![Kafka实时数据即席查询应用与实践](http://img.jiangsulong.com/230525/161102L08-2.jpg)
文章插图
具体核心实现步骤如下:
- 消费内容方Topic实时数据;
- 生成数据预处理策略;
- 加载数据;
- 使用Hive SQL对Kafka数据进行即席分析 。
编写消费Topic的Flink代码,这里不对Topic中的数据做逻辑处理,在后面统一交给MapReduce来做数据预处理,直接消费并存储到HDFS上 。具体实现代码如下所示:
public class Kafka2Hdfs {public static void main(String[] args) {// 判断参数是否有效if (args.length != 3) {LOG.error("kafka(server01:9092), hdfs(hdfs://cluster01/data/), flink(parallelism=2) must be exist.");return;}// 初始化Kafka连接地址和HDFS存储地址以及Flink并行度String bootStrapServer = args[0];String hdfsPath = args[1];int parallelism = Integer.parseInt(args[2]);// 实例化一个Flink任务对象StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.enableCheckpointing(5000);env.setParallelism(parallelism);env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);// Flink消费Topic中的数据DataStream<String> transction = env.addSource(new FlinkKafkaConsumer010<>("test_bll_topic", new SimpleStringSchema(), configByKafkaServer(bootStrapServer)));// 实例化一个HDFS存储对象BucketingSink<String> sink = new BucketingSink<>(hdfsPath);// 自定义存储到HDFS上的文件名,用小时和分钟来命名,方便后面算策略sink.setBucketer(new DateTimeBucketer<String>("HH-mm"));// 设置存储HDFS的文件大小和存储文件时间频率sink.setBatchSize(1024 * 1024 * 4);sink.setBatchRolloverInterval(1000 * 30);transction.addSink(sink);env.execute("Kafka2Hdfs");}// 初始化Kafka对象连接信息private static Object configByKafkaServer(String bootStrapServer) {Properties props = new Properties();props.setProperty("bootstrap.servers", bootStrapServer);props.setProperty("group.id", "test_bll_group");props.put("enable.auto.commit", "true");props.put("auto.commit.interval.ms", "1000");props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");return props;}}
注意事项:- 这里我们把时间窗口设置小一些,每30s做一次Checkpoint,如果该批次的时间窗口没有数据过来,就生成一个文件落地到HDFS上;
- 另外,我们重写了Bucketer为DateTimeBucketer,逻辑并不复杂,在原有的方法上加一个年-月-日/时-分的文件生成路径,例如在HDFS上的生成路径:xxxx/2022-02-26/00-00 。
public class DateMinuteBucketer implements Bucketer<String> {private SimpleDateFormat baseFormatDay = new SimpleDateFormat("yyyy-MM-dd");private SimpleDateFormat baseFormatMin = new SimpleDateFormat("HH-mm");@Overridepublic Path getBucketPath(Clock clock, Path basePath, String element) {return new Path(basePath + "/" + baseFormatDay.format(new Date()) + "/" + baseFormatMin.format(new Date()));}}
2.3.2 生成数据预处理策略这里,我们需要对落地到HDFS上的文件进行预处理,处理的逻辑是这样的 。比如,现在是2022-02-26 14:00,那么我们需要将当天的13:55,13:56,13:57,13:58,13:59这最近5分钟的数据处理到一起,并加载到Hive的最近5分钟的一个分区里面去 。那么,我们需要生成这样一个逻辑策略集合,用HH-mm作为key,与之最近的5个文件作为value,进行数据预处理合并 。具体实现代码步骤如下:
- 步骤一:获取小时循环策略;
- 步骤二:获取分钟循环策略;
- 步骤三:判断是否为5分钟的倍数;
- 步骤四:对分钟级别小于10的数字做0补齐(比如9补齐后变成09);
- 步骤五:对小时级别小于10的数字做0补齐(比如1补齐后变成01);
- 步骤六:生成时间范围;
- 步骤七:输出结果 。
推荐阅读
- 用.NET爬虫轻松获取招标网站数据
- 量化交易—Python基础语法与数据结构
- 数据结构与算法 --- “哨兵”思想
- |剑宗,重剑轻剑皆有神器,数据远超全职业,成为版本热门选择!
- |胡胖单飞后,“青春老男孩”数据下滑明显,团队分裂的真相是什么
- 苹果|销毁核心数据,裁员700人!摆了9年架子后,260亿巨头扛不住了
- |郭德纲严重警告!曹云金用数据强势反击:30万人在线,6400万点赞!
- 5个等级的数据分析,哪个最深入?
- 求职|在美国,大数据助力大学生职业选择
- Apache Doris 极速数据湖分析技术细节公开!