在定位一些实时数据的Case时,如果没有对实时数据进行历史归档,在排查问题时,没有日志追述,会很难定位是哪个环节的问题 。因此,我们需要对处理的这些实时数据进行记录归档并存储 。
文章插图
一、背景Kafka中的实时数据是以Topic的概念进行分类存储,而Topic的数据是有一定时效性的,比如保存24小时、36小时、48小时等 。
二、内容2.1 案例分析这里以i视频和vivo短视频实时数据为例,之前存在这样的协作问题:
数据上游内容方提供实时Topic(存放i视频和vivo短视频相关实时数据),数据侧对实时数据进行逻辑处理后,发送给下游工程去建库实时索引,当任务执行一段时间后,工程侧建索引偶尔会提出数据没有发送过去的Case,前期由于没有对数据做存储,在定位问题的时候会比较麻烦,经常需求查看实时日志,需要花费很长的时间来分析这些Case是出现在哪个环节 。
为了解决这个问题,我们可以将实时Topic中的数据,在发送给其他Topic的时候,添加跟踪机制,进行数据分流,Sink到存储介质(比如HDFS、Hive等) 。这里,我们选择使用Hive来进行存储,主要是查询方便,支持SQL来快速查询 。如下图所示:
文章插图
在实现优化后的方案时,有两种方式可以实现跟踪机制,它们分别是Flink SQL写Hive、Flink DataStream写Hive 。接下来,分别对这两种实现方案进行介绍和实践 。
2.2 方案一:Flink SQL写Hive这种方式比较直接,可以在Flink任务里面直接操作实时Topic数据后,将消费后的数据进行分流跟踪,作为日志记录写入到Hive表中,具体实现步骤如下:
- 构造Hive Catalog;
- 创建Hive表;
- 写入实时数据到Hive表 。
在构造Hive Catalog时,需要初始化Hive的相关信息,部分代码片段如下所示:
// 设置执行环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().build(); StreamTableEnvironment tEnv = StreamTableEnvironment.create(env,settings); // 构造 Hive Catalog 名称 String name = "video-hive-catalog"; // 初始化数据库名 String defaultDatabase = "comsearch"; // Hive 配置文件路径地址 String hiveConfDir = "/Appcom/hive/conf"; // Hive 版本号 String version = "3.1.2"; // 实例化一个 HiveCatalog 对象 HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir, version); // 注册HiveCatalog tEnv.registerCatalog(name, hive); // 设定当前 HiveCatalog tEnv.useCatalog(name); // 设置执行SQL为Hive tEnv.getConfig().setSqlDialect(SqlDialect.HIVE); // 使用数据库 tEnv.useDatabase("db1");
在以上代码中,我们首先设置了 Flink 的执行环境和表环境,然后创建了一个 HiveCatalog,并将其注册到表环境中 。2.2.2 创建Hive表
如果Hive表不存在,可以通过在程序中执行建表语句,具体SQL见表语句代码如下所示:
-- 创建表语句 tEnv.executeSql("CREATE TABLE IF NOT EXISTS TABLE `xxx_table`(`content_id` string,`status` int)PARTITIONED BY (`dt` string,`h` string,`m` string)stored as ORCTBLPROPERTIES ('auto-compaction'='true','sink.partition-commit.policy.kind'='metastore,success-file','partition.time-extractor.timestamp-pattern'='$dt $h:$m:00')")
在创建Hive表时我们使用了IF NOT EXISTS关键字,如果Hive中该表不存在会自动在Hive上创建,也可以提前在Hive中创建好该表,Flink SQL中就无需再执行建表SQL,因为用了Hive的Catalog,Flink SQL运行时会找到表 。这里,我们设置了auto-compaction属性为true,用来使小文件自动合并,1.12版的新特性,解决了实时写Hive产生的小文件问题 。同时,指定metastore值是专门用于写入Hive的,也需要指定success-file值,这样CheckPoint触发完数据写入磁盘后会创建_SUCCESS文件以及Hive metastore上创建元数据,这样Hive才能够对这些写入的数据可查 。2.2.3 写入实时数据到Hive表
在准备完成2.2.1和2.2.2中的步骤后,接下来就可以在Flink任务中通过SQL来对实时数据进行操作了,具体实现代码片段如下所示:
// 编写业务SQL String insertSql = "insert intoxxx_table SELECT content_id, status, " +" DATE_FORMAT(ts, 'yyyy-MM-dd'), DATE_FORMAT(ts, 'HH'), DATE_FORMAT(ts, 'mm') FROM xxx_rt"; // 执行 Hive SQL tEnv.executeSql(insertSql); // 执行任务 env.execute();
推荐阅读
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
- 用.NET爬虫轻松获取招标网站数据
- 量化交易—Python基础语法与数据结构
- 数据结构与算法 --- “哨兵”思想
- |剑宗,重剑轻剑皆有神器,数据远超全职业,成为版本热门选择!
- |胡胖单飞后,“青春老男孩”数据下滑明显,团队分裂的真相是什么
- 苹果|销毁核心数据,裁员700人!摆了9年架子后,260亿巨头扛不住了
- |郭德纲严重警告!曹云金用数据强势反击:30万人在线,6400万点赞!
- 5个等级的数据分析,哪个最深入?
- 求职|在美国,大数据助力大学生职业选择
- Apache Doris 极速数据湖分析技术细节公开!