FlinkSQL 动态加载 UDF 实现思路
导读: 最近在对 Flink 进行平台化 , 基于 REST API 构建一个平台实现通过纯 SQL 化编写和管理 Job 。 尽管 Flink官方希望用户将所有的依赖和业务逻辑打成一个fat jar , 这样方便提交 。 但我们在开发的过程中想对用户自定义 UDF Jar 进行管理 , 想将 UDF Jar 存储管理在阿里云 OSS, 在 Job 中通过动态加载的方式将 UDF Jar 加载进来 , 取代之前将 UDF 和 Job 打成一个 fat jar 的方式 。 下面将从几点展开讨论:
- 将 UDF 写到 Job 中并打成一个 fat jar 的实现方式
- 动态加载 UDF Jar 代码调整
- 代码调整后存在的问题
- 解决 UDF Jar URL 分发的思路
- Flink 1.11.2
- 部署方式:Flink on Kubernetes
- 部署模式: Session Cluster
public static void main(String[] args) throws Exception {//创建流运行时环境StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();//采用BlinkPlannerEnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();//创建StreamTable环境StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings);bsEnv.setParallelism(1);bsTableEnv.executeSql("CREATE FUNCTION returnSelf AS 'flinksql.function.udf.ReturnSelf'");bsTableEnv.executeSql("CREATE TABLE sourceTable (" +"f_sequence INT," +"f_random INT," +"f_random_str STRING," +"ts AS localtimestamp," +"WATERMARK FOR ts AS ts" +") WITH (" +"'connector' = 'datagen'," +"'rows-per-second'='5'," +"'fields.f_sequence.kind'='sequence'," +"'fields.f_sequence.start'='1'," +"'fields.f_sequence.end'='1000'," +"'fields.f_random.min'='1'," +"'fields.f_random.max'='1000'," +"'fields.f_random_str.length'='10'" +")");bsTableEnv.executeSql("CREATE TABLE sinktable (" +"f_random_str STRING" +") WITH (" +"'connector' = 'print'" +")");bsTableEnv.executeSql("insert into sinktable select returnSelf(f_random_str) from sourceTable");}
要将该 Job 提交给远程 Flink 集群时 , 我们需要将 Job(包括自定义 UDF) 打成一个 fat Jar 。 但这并不是我们期望的操作 , 由于打成 fat jar 会显得比较臃肿 , 同时不方便管理 UDF Jar, 有些 UDF 具有通用性 , 可复用 。 所以我们希望将自定义的UDF Jar 独立出来保存管理 , 并在 Job 中通过动态加载的方式使用 , 如下图:文章插图
动态加载 UDF Jar 代码调整
- 将 returnSelf 并独立打成一个 UDF Jar 上传到阿里云OSS 。
- 在 Job 的 main() 方法中新增动态加载的代码
public static void main(String[] args) throws Exception {//创建流运行时环境StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();//采用BlinkPlannerEnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();//创建StreamTable环境StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings);bsEnv.setParallelism(1);// 动态加载String funJarPath = "UDF jar 在 OSS 中所在的 URL 路径";loadJar(new URL(funJarPath));bsTableEnv.executeSql("CREATE FUNCTION returnSelf AS 'flinksql.function.udf.ReturnSelf'");bsTableEnv.executeSql("CREATE TABLE sourceTable (" +"f_sequence INT," +"f_random INT," +"f_random_str STRING," +"ts AS localtimestamp," +"WATERMARK FOR ts AS ts" +") WITH (" +"'connector' = 'datagen'," +"'rows-per-second'='5'," +"'fields.f_sequence.kind'='sequence'," +"'fields.f_sequence.start'='1'," +"'fields.f_sequence.end'='1000'," +"'fields.f_random.min'='1'," +"'fields.f_random.max'='1000'," +"'fields.f_random_str.length'='10'" +")");bsTableEnv.executeSql("CREATE TABLE sinktable (" +"f_random_str STRING" +") WITH (" +"'connector' = 'print'" +")");bsTableEnv.executeSql("insert into sinktable select returnSelf(f_random_str) from sourceTable");}//动态加载Jarpublic static void loadJar(URL jarUrl) {//从URLClassLoader类加载器中获取类的addURL方法Method method = null;try {method = URLClassLoader.class.getDeclaredMethod("addURL", URL.class);} catch (NoSuchMethodException | SecurityException e1) {e1.printStackTrace();}// 获取方法的访问权限boolean accessible = method.isAccessible();try {//修改访问权限为可写if (accessible == false) {method.setAccessible(true);}// 获取系统类加载器URLClassLoader classLoader = (URLClassLoader) ClassLoader.getSystemClassLoader();//jar路径加入到系统url路径里method.invoke(classLoader, jarUrl);} catch (Exception e) {e.printStackTrace();} finally {method.setAccessible(accessible);}}
推荐阅读
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
- 动态降噪+双设备连接,华为FreeBuds Pro上手评
- 网络比15年前更慢错误更多?开发者加载了100万个网站实测
- 算法萌新如何学好动态规划(3)
- 大神已提取出一加8T的动态壁纸:Android 8.0+设备均可使用
- 关于边缘计算与网络动态加速
- PS5系统更新带来动态调整游戏机的风扇速度特性 以提升散热
- “会员配送费更贵”,美团外卖回应了
- Google Photos丰富Memories功能:新增循环显示图片的动态壁纸功能
- Firefox 83将默认启用Warp更新:大幅提升响应时间和加载速度
- 当“一兆难求”遇上动态频谱共享,爱立信与四川电信的5G实践