Kafka实时数据即席查询应用与实践( 三 )


Kafka实时数据即席查询应用与实践

文章插图
具体核心实现步骤如下:
  • 消费内容方Topic实时数据;
  • 生成数据预处理策略;
  • 加载数据;
  • 使用Hive SQL对Kafka数据进行即席分析 。
2.3.1 消费内容方Topic实时数据
编写消费Topic的Flink代码,这里不对Topic中的数据做逻辑处理,在后面统一交给MapReduce来做数据预处理,直接消费并存储到HDFS上 。具体实现代码如下所示:
public class Kafka2Hdfs {public static void main(String[] args) {// 判断参数是否有效if (args.length != 3) {LOG.error("kafka(server01:9092), hdfs(hdfs://cluster01/data/), flink(parallelism=2) must be exist.");return;}// 初始化Kafka连接地址和HDFS存储地址以及Flink并行度String bootStrapServer = args[0];String hdfsPath = args[1];int parallelism = Integer.parseInt(args[2]);// 实例化一个Flink任务对象StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.enableCheckpointing(5000);env.setParallelism(parallelism);env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);// Flink消费Topic中的数据DataStream<String> transction = env.addSource(new FlinkKafkaConsumer010<>("test_bll_topic", new SimpleStringSchema(), configByKafkaServer(bootStrapServer)));// 实例化一个HDFS存储对象BucketingSink<String> sink = new BucketingSink<>(hdfsPath);// 自定义存储到HDFS上的文件名,用小时和分钟来命名,方便后面算策略sink.setBucketer(new DateTimeBucketer<String>("HH-mm"));// 设置存储HDFS的文件大小和存储文件时间频率sink.setBatchSize(1024 * 1024 * 4);sink.setBatchRolloverInterval(1000 * 30);transction.addSink(sink);env.execute("Kafka2Hdfs");}// 初始化Kafka对象连接信息private static Object configByKafkaServer(String bootStrapServer) {Properties props = new Properties();props.setProperty("bootstrap.servers", bootStrapServer);props.setProperty("group.id", "test_bll_group");props.put("enable.auto.commit", "true");props.put("auto.commit.interval.ms", "1000");props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");return props;}}注意事项:
  • 这里我们把时间窗口设置小一些,每30s做一次Checkpoint,如果该批次的时间窗口没有数据过来,就生成一个文件落地到HDFS上;
  • 另外,我们重写了Bucketer为DateTimeBucketer,逻辑并不复杂,在原有的方法上加一个年-月-日/时-分的文件生成路径,例如在HDFS上的生成路径:xxxx/2022-02-26/00-00 。
具体DateTimeBucketer实现代码如下所示:
public class DateMinuteBucketer implements Bucketer<String> {private SimpleDateFormat baseFormatDay = new SimpleDateFormat("yyyy-MM-dd");private SimpleDateFormat baseFormatMin = new SimpleDateFormat("HH-mm");@Overridepublic Path getBucketPath(Clock clock, Path basePath, String element) {return new Path(basePath + "/" + baseFormatDay.format(new Date()) + "/" + baseFormatMin.format(new Date()));}}2.3.2 生成数据预处理策略
这里,我们需要对落地到HDFS上的文件进行预处理,处理的逻辑是这样的 。比如,现在是2022-02-26 14:00,那么我们需要将当天的13:55,13:56,13:57,13:58,13:59这最近5分钟的数据处理到一起,并加载到Hive的最近5分钟的一个分区里面去 。那么,我们需要生成这样一个逻辑策略集合,用HH-mm作为key,与之最近的5个文件作为value,进行数据预处理合并 。具体实现代码步骤如下:
  • 步骤一:获取小时循环策略;
  • 步骤二:获取分钟循环策略;
  • 步骤三:判断是否为5分钟的倍数;
  • 步骤四:对分钟级别小于10的数字做0补齐(比如9补齐后变成09);
  • 步骤五:对小时级别小于10的数字做0补齐(比如1补齐后变成01);
  • 步骤六:生成时间范围;
  • 步骤七:输出结果 。
其中,主要的逻辑是在生成时间范围的过程中,根据小时和分钟数的不同情况,生成不同的时间范围,并输出结果 。在生成时间范围时,需要注意前导0的处理,以及特殊情况(如小时为0、分钟为0等)的处理 。最后,将生成的时间范围输出即可 。


推荐阅读