前言:
在做flink实时计算平台的时候,我遇到过这样一个问题,在执行sql任务的时候,我们需要预先创建一些Table,View,甚至是functions。这些其实flink-sql-client已经提供了,但不支持yarn的per-job模式。所以我就弄了一个任务,专门执行ddl的sql,但是这过程中有一个问题,其实这个job不需要借助于yarn的资源,直接本地跑就行了,只要能连接到你的catalog。
MysqlCatalog
Flink官方是没有实现基于Mysql的Catalog的,最新版本的Flink1.11中,虽然有Jdbc的Catalog,但它的实现的本意并不是一个元数据管理,而是把flink的schema映射到数据的对应的表中,从而实现可以直接往表里写数据,显然不是我想要的。
如何实现一个Mysql的Catalog不是本文的重点,后续我会专门写一篇基于Mysql的Catalog的实现的文章,敬请期待。
DDL执行任务
Flink1.11开始已经全面支持 Flink DDL 的SQL了,包括创建catalog,创建 database,创建view等。使用streamTableEnv.executeSql
就能轻松搞定,无需再去自己解析。下面是我用于专门执行ddl的 flink任务的代码
public static void executeSql(String sqlContent) throws Exception {
ParameterTool active = ParameterTool.fromPropertiesFile(LocalStreamingJob.class.getClassLoader().getResourceAsStream("app.properties"));
String activeFile = String.format("app-%s.properties", active.get("profiles.active"));
ParameterTool tool = ParameterTool.fromPropertiesFile(LocalStreamingJob.class.getClassLoader().getResourceAsStream(activeFile));
Configuration conf = tool.getConfiguration();
StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings settings = EnvironmentSettings.newInstance()
.inStreamingMode()
.useBlinkPlanner()
.build();
StreamTableEnvironment streamTableEnv = StreamTableEnvironment.create(streamEnv, settings);
// 注册默认的catalog
MysqlCatalog catalog = new MysqlCatalog(
conf.get(CATALOG_DEFAULT_NAME),
conf.get(CATALOG_DEFAULT_DATABASE),
conf.get(CATALOG_DEFAULT_USER),
conf.get(CATALOG_DEFAULT_PASSWORD),
conf.get(CATALOG_DEFAULT_URL));
streamTableEnv.registerCatalog(conf.get(CATALOG_DEFAULT_NAME), catalog);
//使用默认的catalog,在sql里显示 使用'use catalog xxx'语句,可以覆盖
streamTableEnv.executeSql(String.format("USE CATALOG `%s`", conf.get(CATALOG_DEFAULT_NAME)));
// 使用默认的database,在sql里显示 使用'use xxx'语句,可以覆盖
streamTableEnv.executeSql(String.format("USE `%s`", conf.get(CATALOG_DEFAULT_DATABASE)));
String[] contentArr = StringUtils.split(sqlContent, ";");
List<String> sqls = new ArrayList<>(contentArr.length);
Collections.addAll(sqls, contentArr);
for(String sql : sqls) {
streamTableEnv.executeSql(sql).print();
}
}
如何本地执行
所谓的本地执行就是在线上执行时,无需提交到集群;就像在IDEA里直接运行一样,首先假设你的平台实现了一个提交ddl的rest接口,通过调用这个接口,传入待执行的sql,就能在catalog中创建一张表。那如何能做到?在我阅读Flink-Clients的源代码的时候,我发现ClientUtils里有这样一段代码:
public static void executeProgram(
PipelineExecutorServiceLoader executorServiceLoader,
Configuration configuration,
PackagedProgram program,
boolean enforceSingleJobExecution,
boolean suppressSysout) throws ProgramInvocationException {
checkNotNull(executorServiceLoader);
// jar包的classloader
final ClassLoader userCodeClassLoader = program.getUserCodeClassLoader();
//当前线程的classloader
final ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
try {
//把当前的classloader设置成jar包的classloader
Thread.currentThread().setContextClassLoader(userCodeClassLoader);
LOG.info("Starting program (detached: {})", !configuration.getBoolean(DeploymentOptions.ATTACHED));
ContextEnvironment.setAsContext(
executorServiceLoader,
configuration,
userCodeClassLoader,
enforceSingleJobExecution,
suppressSysout);
StreamContextEnvironment.setAsContext(
executorServiceLoader,
configuration,
userCodeClassLoader,
enforceSingleJobExecution,
suppressSysout);
try {
program.invokeInteractiveModeForExecution();
} finally {
ContextEnvironment.unsetAsContext();
StreamContextEnvironment.unsetAsContext();
}
} finally {
//最后还原classloader
Thread.currentThread().setContextClassLoader(contextClassLoader);
}
}
显然,当你需要在一个线程里执行其它classloader里的代码时,只需要设置成代码的classloader,执行完后,再还原classloader就可以了,使用套路就是:
try {
Thread.currentThread().setContextClassLoader(userCodeClassLoader);
.....
} finally {
Thread.currentThread().setContextClassLoader(contextClassLoader);
}
有了上面的思路,那么本地执行executeSql就完美解决了,代码如下:
public void executeSql(String sqlContent) {
File flinkFile = new File(flinkHome + "/lib");
if(!flinkFile.exists()) {
throw new ServiceException(String.format("file:[%s] not exists", flinkHome + "/lib"));
}
if(!flinkFile.isDirectory()) {
throw new ServiceException(String.format("file:[%s] is not a directory", flinkHome + "/lib"));
}
List<URL> urls = new ArrayList<>();
File[] files = flinkFile.listFiles();
try {
for (File file : files) {
urls.add(file.toURI().toURL());
}
File localEnvFile = new File(sqlJarFile);
urls.add(localEnvFile.toURI().toURL());
} catch (MalformedURLException e) {
throw new ServiceException(e.getMessage());
}
ClassLoader flinkClassLoader = new URLClassLoader(urls.toArray(new URL[urls.size()]));
Objects.requireNonNull(flinkClassLoader, "flink classloader can not be null");
final ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
try {
Thread.currentThread().setContextClassLoader(flinkClassLoader);
//加载远端的类
Class<?> clazz = flinkClassLoader.loadClass("com.shizhengchao.github.local.env.LocalStreamingJob");
//反射调用执行
Method method = clazz.getMethod("executeSql", String.class);
method.invoke(null, sqlContent);
} catch (Exception e) {
if(e instanceof InvocationTargetException) {
throw new ServiceException(e.getCause());
} else {
throw new ServiceException(e.getMessage());
}
} finally {
Thread.currentThread().setContextClassLoader(contextClassLoader);
}
}
最后,数据库里也有对应的ddl的信息了:
小插曲
在这期间出一了一个小问题:我的平台使用了ebean作为我的ORM框架,而我的mysqlcatalo的实现,我最初也是用ebean。这会导致在设置ClassLoader时,两边的Mapper冲突,出现catalog那端的mapper直接把实时平台的mapper覆盖掉了,导致一直报相应的Bean没有注册到EbeanServer。最后,我不得不把Catalog的实现换成了原生JDBC。