沙龙回顾|ClickHouse 在实时场景的应用和优化

此次分享分为三部分内容 , 第一部分通过讲解推荐和广告业务的两个典型案例 , 穿插介绍字节内部相应的改进 。 第二部分会介绍典型案例中未覆盖到的改进和经验 。 第三部分会提出目前的不足和未来的改进计划 。
(文末附ClickHouse沙龙第四场:《ClickHouse在A/B实验和模型训练的使用》报名方式)
沙龙视频ClickHouse在实时场景的应用和优化
讲师:郭映中字节跳动ClickHouse研发工程师
文字沉淀早期实践 在第一场分享中 , 技术负责人陈星介绍了ClickHouse在字节跳动内部最早支持的两个业务场景 , 用户行为分析平台和敏捷BI平台 。 这两个平台的数据主要由分析师或者数仓同学产出 , 以T+1的离线指标为主 。 考虑到ClickHouse并不支持事务 , 为了保障数据的一致性 , 我们在ClickHouse系统外实现了一套外部事务:
数仓同学一般会在HDFS/Hive准备好原始数据;数据就绪后 , 会执行一个基于Spark的ETL服务 , 将数据切成N份再存回HDFS(必要的话也会做一些数据的预处理);再发起INSERTQuery给ClickHouse集群的每一个shard , 将对应的数据文件从HDFS中直接导入到MergeTree表中 , 需要注意的是 , 这里没有把数据写入分布式表(i.e.Distributedtable)中;每个节点上的MergeTree表写入成功之后 , 会由外部事务校验整个集群的数据是否写入成功:如果部分节点导入失败 , 外部的导入服务会将部分写入的数据回滚并重新执行导入任务 , 直到数据完全导入成功 , 才允许上层的分析平台查询数据 。 也就是说 , 当ClickHouse中仅有不完整的数据时 , 外部的查询服务不会查询当天的数据 。
这里我解释一下直接写入数据的风险:
直接写入的风险用户写入ClickHouse一般有两种选择:分布式表(i.e.Distributed) , MergeTree表:
写入分布式表:数据写入分布式表时 , 它会将数据先放入本地磁盘的缓冲区 , 再异步分发给所有节点上的MergeTree表 。 如果数据在同步给MergeTree里面之前这个节点宕机了 , 数据就可能会丢失;此时如果在失败后再重试 , 数据就可能会写重 。 因而 , 直接将数据写入用分布式表时 , 不太好保证数据准确性的和一致性 。
当然这个分布式表还有其他问题 , 一般来说一个ClickHouse集群会配置多个shard , 每个shard都会建立MergeTree表和对应的分布式表 。 如果直接把数据写入分布式表 , 数据就可能会分发给每个shard 。 假设有N个节点 , 每个节点每秒收到一个INSERTQuery , 分发N次之后 , 一共就是每秒生成NxN个part目录 。 集群shard数越多 , 分发产生的小文件也会越多 , 最后会导致你写入到MergeTree的Part的数会特别多 , 最后会拖垮整个文件的系统 。
写入MergeTree表:直接写入MergeTree表可以解决数据分发的问题 , 但是依然抗不住高频写入 , 如果业务方写入频次控制不好 , 仍然有可能导致ClickHouse后台合并的速度跟不上写入的速度 , 最后会使得文件系统压力过大 。
所以一段时间内 , 我们禁止用户用INSERTQuery把数据直接写入到ClickHouse 。
典型案例-推荐系统 最初 , 公司内部专门的AB实验平台已经提供了T+1的离线实验指标 , 而推荐系统的算法工程师们希望能更快地观察算法模型、或者某个功能的上线效果 , 因此需要一份能够实时反馈的数据作为补充 。 他们大致有如下需求:
研发同学有debug的需求 , 他们不仅需要看聚合指标 , 某些时间还需要查询明细数据 。 推荐系统产生的数据 , 维度和指标多达几百列 , 而且未来可能还会增加 。 每一条数据都命中了若干个实验 , 使用Array存储 , 需要高效地按实验ID过滤数据 。 需要支持一些机器学习和统计相关的指标计算(比如AUC) 。 对于明细数据这个需求:ClickHouse>Druid 。 对于维度、指标多的问题 , 可能经常变动 , 我们可以用Map列的功能 , 很方便支持动态变更的维度和指标 。 按实验ID过滤的需求 , 则可以用Bloomfilter索引 。 AUC之前则已经实现过 。这些需求我们当时刚好都能满足 。
优点:各个组件职责划分清楚、潜在扩展性强
缺点:需要额外资源、写入频次不好控制、难以处理节点故障、维护成本较高
关键是后面两点:由于缺少事务的支持 , 实时导入数据时难以处理节点故障;ClickHouse组技术栈以C++为主 , 维护Flink潜在的成本比较高 。
这样的架构相对于使用了Flink的方案来说更简单一些 , 由于少了一次数据传输 , 整体而言开销会相对小一些 , 对我们来说也算是补齐了ClickHouse的一部分功能(比如Druid也支持直接消费Kafkatopic)缺点就是未来可扩展性会更差一些 , 也略微增加了引擎维护负担 。
数据由推荐系统直接产生 , 写入Kafka 。 这里推荐系统做了相应配合 , 修改KafkaTopic的消息格式适配ClickHouse表的schema 。 敏捷BI平台也适配了一下实时的场景 , 可以支持交互式的查询分析 。 如果实时数据有问题 , 也可以从Hive把数据导入至ClickHouse中 , 不过这种情况不多 。 除此之外 , 业务方还会将1%抽样的离线数据导入过来做一些简单验证 , 1%抽样的数据一般会保存更久的时间 。 改进一:异步构建索引第一做的改进是将辅助索引的构建异步化了:在社区实现中 , 构建一个Part分为三步:(1)解析输入数据生成内存中数据结构的Block;(2)然后切分Block , 并按照表的schema构建columns数据文件;(3)最后扫描根据skipindexschema去构建skipindex文件 。 三个步骤完成之后才会算Part文件构建完毕 。
目前字节内部的ClickHouse并没有使用社区版本的skipindex , 不过也有类似的辅助索引(e.g.BloomFilterIndex,BitmapIndex) 。 构建part的前两步和社区一致 , 我们构建完columns数据之后用户即可正常查询 , 不过此时的part不能启用索引 。 此时 , 再将刚构建好数据的part放入到一个异步索引构建队列中 , 由后台线程构建索引文件 。 这个改进虽然整体的性能开销没有变化 , 但是由于隐藏了索引构建的时间开销 , 整体的写入吞吐量大概能提升20%
目前实现的Kafka表 , 内部默认只会有一个消费者 , 这样会比较浪费资源并且性能达不到性能要求 。 一开始我们可以通过增大消费者的个数来增大消费能力 , 但社区的实现一开始是由一个线程去管理多个的消费者 , 多个的消费者各自解析输入数据并生成的InputStream之后 , 会由一个UnionStream将多个InputStream组合起来 。 这里的UnionStream会有潜在的性能瓶颈 , 多个消费者消费到的数据最后仅能由一个输出线程完成数据构建 , 所以这里没能完全利用上多线程和磁盘的潜力 。
为什么呢?因为一旦节点故障 , 会带来一系列不好处理的问题 。 (1)首先当出现故障节点的时候 , 一般会替换一个新的节点上来 , 新替换的节点为了恢复数据 , 同步会占用非常大的网络和磁盘IO , 这种情况 , 如果原来主备有两个消费者就剩一个 , 此时消费性能会下降很大(超过一倍) , 这对于我们来说是不太能接受的 。 (2)早先ClickHouseKafkaengine对Kafkapartition的动态分配支持不算好 , 很有可能触发重复消费 , 同时也无法支持数据分片 。 因此我们默认使用静态分配 , 而静态分配不太方便主备节点同时消费 。 (3)最重要的一点 , ClickHouse通过分布式表查询ReplicatedMergeTree时 , 会基于logdelay来计算Query到底要路由到哪个节点 。 一旦在主备同时摄入数据的情况下替换了某个节点 , 往往会导致查询结果不准 。
也就是说 , 我们新写入的数据并不会写入到缺少数据的节点 , 对于查询而言 , 由于查询路由机制的原因也不会把Query路由到缺少数据的节点上 , 所以一直能查询到最新的数据 。 这个机制设计其实和分布式表的查询写入是类似的 , 但由于分布表性能和稳定原因不好在线上使用 , 所以我们用这个方式解决了数据完整性的问题 。
小结一下上面说的主备只有一个节点消费的问题
配置两副本情况下的Kafkaengine , 主备仅有一个节点消费 , 另一个节点待机 。
如果有故障节点 , 则自动切换到正常节点消费;如果有新替换的节点无法正常服务 , 也切换到另一个节点;如果不同机房 , 则由离Kafka更近的节点消费 , 减少带宽消耗;否则 , 由类似ReplicatedMergeTree的ZooKeeperLeader决定 。 典型案例-广告投放实时数据 难点一:产生的实时数据由于涉及到较多的时间分区 , 对于Druid来说可能会产生很多segment , 如果写入今天之前的数据它需要执行一些MR的任务去把数据合并在一起 , 然后才能查历史的数据 , 这个情况下可能会导致今天之前的数据查询并不及时 。
难点二:业务数据的维度也非常多 , 这种场景下使用Druid预聚合的效率并不高 。
对比Druid和ClickHouse的特点和性能后 , 我们决定将该系统迁移到ClickHouse+自研敏捷BI 。 最后由于维度比较多 , 并没有采用预聚合的方式 , 而是直接消费明细数据 。
因为业务产生的数据由(1)大量的当天数据和(2)少量的历史数据组成 。 历史数据一般涉及在3个月内 , 3个月外的可以过滤掉 , 但是即便是3个月内的数据 , 在按天分区的情况下 , 也会因为单批次生成的parts太多导致写入性能有一定下降 。 所以我们一开始是把消费的block_size调的非常大 , 当然这样也有缺点 , 虽然整个数据吞吐量会变大 , 但是由于数据落盘之前是没法查到数据的 , 会导致整体延时更大 。
(1)我们选择将Kafka/Buffer/MergeTree三张表结合起来 , 提供的接口更加易用;
(2)把Buffer内置到Kafkaengine内部 , 作为Kafkaengine的选项可以开启/关闭;
(3)最重要的是支持了ReplicatedMergeTree情况下的查询;
(4)Buffertable内部类似pipeline模式处理多个Block 。
对于任何发送到Replica#1的查询 , 数据肯定是完整的;而对于发送到Replica#2的查询则会额外构建一个特殊的查询逻辑 , 从另一个副本的Buffer#1读取数据 。 这样发送到Replica#2的查询 , 获取到数据就是绿框部分也就是Replica#2的MergeTree再加上Replica#1的Buffer , 它的执行效果是等价于发送到Replica#1的查询 。
由于ClickHouse没有事务的支持 , 所以重启服务后再消费时 , 要么会丢失数据{3,6} , 要么会重复消费{1,4,2,5} 。 对于这个问题我们参考了Druid的KIS方案自己管理KafkaOffset,实现单批次消费/写入的原子语义:实现上选择将Offset和Parts数据绑定在一起 , 增强了消费的稳定性 。
实践&经验 这里解释一下框起来的两个部分:
【沙龙回顾|ClickHouse 在实时场景的应用和优化】首先是查询粒度 , 如果大家有听第一场分享就大概知道 , 我们目前对物化视图做了一些改进 , 假如查询粒度选了5分钟或10分钟 , 那消费数据时数据会像Druid一样提前对数据预聚合 。 而查询的时候也会做一些查询改写 , 用来达到类似Druid的效果 , 目的是为了覆盖公司内部一些用Druid的场景 。
为简化用户的使用成本 , 用户也不用挨个填写Table的Schema , 而是从Kafka的数据里直接推断出schema 。
未来展望和计划小结简单来说在字节内部的应用场景主要分为四类:AB实验、业务实时数据、服务的后台日志数据、机器的监控数据 。 KafkaEngine的改进主要以稳定性改进为主 , 同时也做了部分性能上的改进 。 为了方便业务接入 , 我们也提供了配套的平台和接口 , 除了自己运维的平台 , 也和字节内部其他服务做了集成 。 运维层面则增加了systemtable和systemquery等一系列工具来辅助诊断 , 简化操作 。
(1)第一步是实现在ClickHouse上分布式事务 , 以此解决Kafkaengine消费以及INSERTQuery不稳定的问题 。
(2)之后会尝试实现读写分离 , 将消费数据的节点与查询节点隔离;再基于读写分离做消费节点的动态伸缩 。
(3)分开的消费/写入节点做为专门的写入层 , 后续会引入WAL和Buffer解决高频写入的问题 。 如果有必要的话 , 也会在写入层实现类似分布式表分发数据的功能 。
一旦上面的功能实现成熟 , 会考虑重新开放业务直接使用INSERTQuery写入数据 。
沙龙预告ClickHouse在A/B实验和模型训练的使用
时间:2020年8月28日19:00-20:00
讲师:吴健字节跳动ClickHouse研发工程师
报名链接字节跳动技术沙龙第7期:ClickHouse在A/B实验和模型训练的使用
参与方式: 字节跳动技术沙龙字节跳动技术沙龙是由字节跳动技术学院发起 , 字节跳动技术学院、掘金技术社区联合主办的技术交流活动 。
字节跳动技术沙龙邀请来自字节跳动及业内互联网公司的技术专家 , 分享热门技术话题与一线实践经验 , 内容覆盖架构、大数据、前端、测试、运维、算法、系统等技术领域 。
字节跳动技术沙龙旨在为技术领域人才提供一个开放、自由的交流学习平台 , 帮助技术人学习成长 , 不断进阶 。


    推荐阅读