FlinkSQL 动态加载 UDF 实现思路( 二 )

修改后 , 我们将 UDF jar 存放到 OSS 中进行管理 。 当 Job 需要依赖某个 UDF 时 , 只需要通过动态加载就可以完成 。 动态加载使用 URLClassLoader 实现 , 使用被管理于 OSS 的 UDF Jar 的 URL 将 Jar 加载进 JVM 中 , 并取得 returnSelf 类 。
代码调整后存在的问题
FlinkSQL 动态加载 UDF 实现思路文章插图
运行结果:代码调整后 , 在本地 IDEA 运行程序(即 , 启动了 Mini Cluster集群)是可以成功运行的 。 但是当发布到远程 Flink 集群上时(采用 Flink on K8S,Session Cluster 部署模式) , 会出现找不到 UDF 异常 , 如下:
Caused by: java.lang.ClassNotFoundException: flinksql.function.udf.ReturnSelf分析:这是由于 Flink 的部署方式有多种 。 在本地运行的启动的是 MiniCluster , 即 JobManager 和 TaskManager 在同一个JVM 进程中 。 而我们在远程部署 Flink on Kubernetes 的 Session Cluster 集群 JobManager 和 TaskManager 是不同的 JVM 进程 。
FlinkSQL 动态加载 UDF 实现思路文章插图
在 Session 模式下 , 客户端在 main() 方法开始执行直到 env.execute() 方法之前需要完成以下三件事情

  • 获取作业所需的依赖项
  • 通过执行环境分析并取得逻辑计划 , 即StreamGraph→JobGraph
  • 将依赖项和JobGraph上传到集群中
只有在这些都完成之后 , 才会通过env.execute() 方法触发 Flink 运行时真正地开始执行作业 。 所以在本地运行的 Mini Cluster , 因为都处于同一个 JVM 进程 , 客户端运行 main() 方法进行动态加载后将依赖项和 JobGraph 提交给 JobMananger 再由 TaskManager 执行 Job 。
而当在远程集群时 , 客户端实现动态加载 Jar 后将依赖项和 JobGraph 提交给 JobMananger , 但是由于 JobMananger 和 TaskMananger 是处于不同的 JVM进程中 , 且没有对自定义 UDF Jar URL 进行分发 , 这会让 TaskMananger 在运行任务时出现 Class Not Found 异常 , 这是因为 TaskMananger 没有进行类加载 , JVM 中没有 returnSelf 类所导致 。
解决 UDF Jar 分发的思路基于以上问题我们查阅了一些相关资料及阅读源码 , 以以下三点为条件
  • 基于采用 Session 模式部署
  • 基于 REST API 提交 Job 而不采用命令行方式
  • 不改动 Flink 源码
分析:官网提供了一个 -C 参数 , 大致用法就是把用户自定义 Jar 放到一个 JobMananger 和 TaskMananger 都能访问到的存储地方 , 然后通过命令行方式启动 Job 时使用 -C 参数 , 后面加上自定义 Jar 的URLs 就可以实现分发 。
FlinkSQL 动态加载 UDF 实现思路文章插图
但是我们平台由于采用 REST API , 而提交 Job 的 API 并没有提供该参数 , 所以在不改变 Flink 源码的前提下进行源码研究 , 最后发现可以在 main 中将 UDF Jar 的 URL 加到配置项 pipeline.classpaths 中 , 也就是曲线救国实现了 -C 的效果 。 在 main 中增加以下代码片段:
Field configurationField = StreamExecutionEnvironment.class.getDeclaredField("configuration");configurationField.setAccessible(true);Configuration o = (Configuration)configurationField.get(bsEnv);Field confData = http://kandian.youth.cn/index/Configuration.class.getDeclaredField("confData");confData.setAccessible(true);Map temp = (Map)confData.get(o);List jarList = new ArrayList<>();jarList.add(funJarPath);temp.put("pipeline.classpaths",jarList);


推荐阅读