Apache Hudi与Apache Flink集成( 二 )
这批操作最容易想到的是通过使用时间窗口来实现 , 然而 , 使用窗口 , 在某个窗口没有数据流入时 , 将没有输出数据 , Sink端难以判断同一批数据是否已经处理完 。 因此我们使用flink的检查点机制来攒批 , 每两个barrier之间的数据为一个批次 , 当某个子任务中没有数据时 , mock结果数据凑数 。 这样在Sink端 , 当每个子任务都有结果数据下发时即可认为一批数据已经处理完成 , 可以执行commit 。
DAG如下:
文章插图
- source 接收kafka数据 , 转换成List;
- InstantGeneratorOperator 生成全局唯一的instant.当上一个instant未完成或者当前批次无数据时 , 不创建新的instant;
- KeyBy partitionPath 根据 partitionPath分区 , 避免多个子任务写同一个分区;
- WriteProcessOperator 执行写操作 , 当当前分区无数据时 , 向下游发送空的结果数据凑数;
- CommitSink 接收上游任务的计算结果 , 当收到 parallelism个结果时 , 认为上游子任务全部执行完成 , 执行commit.
5. 实现示例1) HoodieTable
/** * Abstract implementation of a HoodieTable. * * @paramSub type of HoodieRecordPayload * @param Type of inputs * @param Type of keys * @param Type of outputs */public abstract class HoodieTable implements Serializable {protected final HoodieWriteConfig config;protected final HoodieTableMetaClient metaClient;protected final HoodieIndex index;public abstract HoodieWriteMetadata upsert(HoodieEngineContext context, String instantTime,I records);public abstract HoodieWriteMetadata insert(HoodieEngineContext context, String instantTime,I records);public abstract HoodieWriteMetadata bulkInsert(HoodieEngineContext context, String instantTime,I records, Option> bulkInsertPartitioner);......}
HoodieTable 是 hudi的核心抽象之一 , 其中定义了表支持的insert,upsert,bulkInsert等操作 。 以 upsert 为例 , 输入数据由原先的 JavaRDD inputRdds 换成了 I records, 运行时 JavaSparkContext jsc 换成了 HoodieEngineContext context.从类注释可以看到 T,I,K,O分别代表了hudi操作的负载数据类型、输入数据类型、主键类型以及输出数据类型 。 这些泛型将贯穿整个抽象层 。
2) HoodieEngineContext
/** * Base class contains the context information needed by the engine at runtime. It will be extended by different * engine implementation if needed. */public abstract class HoodieEngineContext {public abstract List map(List data, SerializableFunction func, int parallelism);public abstract List flatMap(List data, SerializableFunction> func, int parallelism);public abstract void foreach(List data, SerializableConsumer consumer, int parallelism);......}
【Apache Hudi与Apache Flink集成】HoodieEngineContext 扮演了 JavaSparkContext 的角色 , 它不仅能提供所有 JavaSparkContext能提供的信息 , 还封装了 map,flatMap,foreach等诸多方法 , 隐藏了JavaSparkContext#map(),JavaSparkContext#flatMap(),JavaSparkContext#foreach()等方法的具体实现 。
推荐阅读
- FlinkSQL 动态加载 UDF 实现思路
- 万字干货还原美团Flink实时数仓建设
- 网易云音乐基于Flink实时数仓实践
- flink消费kafka的offset与checkpoint
- 唯品会实时平台架构-Flink、Spark、Storm
- Flink的DataSet基本算子总结
- Flink中parallelism并行度和slot槽位的理解
- Flink到底能不能实现exactly-once语义?
- Flink流处理应用在IDEA中的执行流程分析
- 在IDEA中执行Flink应用时如何访问Dashboard?