揭秘字节跳动解决ClickHouse复杂查询问题的技术方案( 二 )


文章插图
 
02
技术方案
1. 设计思想
基于 ClickHouse 的复杂查询的实现采用分Stage的方式,替换目前 ClickHouse的两阶段执行方式 。类似其他分布式数据库引擎(如 Presto、Impala 等),将一个复杂的Query按照数据交换情况切分成多个Stage,Stage和Stage之间通过 exchange完成数据的交换,单个Stage内不存在数据交换 。Stage间的数据交换主要有以下三种形式:
①按照单(多)个 key 进行 Shuffle(shuffle)
②由1个或者多个节点汇聚到一个节点(我们称为 gather)
③同一份数据复制到多个节点(也称为 broadcast 或者说广播)
按照不同的功能切分不同的模块,设计目标如下:
①各个模块约定好接口,尽量减少彼此的依赖和耦合 。一旦某个模块有变动不会影响别的模块,例如Stage生成逻辑的调整不影响调度的逻辑 。
②模块采用插件的架构,允许模块根据配置灵活支持不同的策略 。
2. 相关术语
 

  • ExchangeNode 在语法树中表示数据交换的节点
  • PlanSegment 单个 Stage 对应的执行的计划片段
  • ExchangeManager 管理数据的 exchange,负责不同 Stage 节点之间的数据交换
  • SegmentScheduler 计划片段调度器,负责下发计划片段给 worker,由 Coordinator 节点调用
  • InterpreterPlanSegment 计划片段执行器,执行一个具体的计划片段
 
揭秘字节跳动解决ClickHouse复杂查询问题的技术方案

文章插图
 
3. 执行流程
①Coordinator 接受复杂查询后,在目前 ClickHouse 语法树的基础上,根据节点类型和数据分布情况插入 Exchange 节点并生成分布式 Plan 。
②Coordinator 根据 Exchange Node 类型,切分分布式 Plan 生成每个 Stage 的执行片段 PlanSegment 。
【揭秘字节跳动解决ClickHouse复杂查询问题的技术方案】③Coordinator 调用 SegmentScheduler 将各阶段的 PlanSegment 发送到 Worker 节点 。
④Worker 节点接受 PlanSegment 通过 InterpreterPlanSegment 完成数据的读取和执行,通过 ExchangeManager 完成数据的交互 。
⑤Coordinator 从最后一轮 Stage 对应节点的 ExchangeManager 读取数据后处理后返回给 client 。
4. Plan切分
下面是一个Plan切分的例子,这是1个2表Join的查询场景,根据Exchange信息,将整个分布式 Plan切分成4个Stage 。
揭秘字节跳动解决ClickHouse复杂查询问题的技术方案

文章插图
 
5. 查询片段调度器(SegmentScheduler)
查询片段调度器SegmentScheduler 根据上下游依赖关系和数据分布,以及 Stage 并行度和worker 分布和状态信息,按照一定的调度策略,将 PlanSemgent 发给不同的 Worker 节点 。
揭秘字节跳动解决ClickHouse复杂查询问题的技术方案

文章插图
 
目前支持的2种策略是:
①依赖调度:根据 Stage 依赖关系定义拓扑结构,产生 DAG 图,根据 DAG 图调度 stage,类似于拓扑排序,等到依赖的 Stage 启动后再启动新的 Stage 。例如刚才的两表 join,会先调度左右表读取 stage,再调度 join stage 。
②AllAtOnce:类似于Presto的AllAtOnce策略,会先计算每一个 Stage 的相关信息,一次性调度所有的Stage 。
相比而言,这两种策略是在容错、资源使用和延时上做取舍 。
第一种调度策略可以实现更好的容错,由于 ClickHouse 可以有多个副本,当前一个 Stage 部分节点连接失败时可以尝试切换到副本节点,对后续依赖 stage 无感知 。这里指的是读数据的 Stage,我们称为 Source Stage,非 Source Stage 因为没有数据依赖,容错能力会更强,只要保证并行度的节点数即可,甚至极端情况下可以降低 stage 并行度来支持更好的容错 。缺点是调度有依赖,不能完全并行,会增加调度时长,对于一些数据量和计算量小,但是 stage 多的节点调度延时可能会占 SQL 整体时间不小的比例 。我们也做了一些针对性的优化,对于无依赖关系的尽可能支持并行 。
第二种调度策略通过并行可以极大降低调度延时,为防止大量网络 io 线程,我们通过异步化并且控制线程数目;这种策略的缺点是容错性没有依赖调度好,因为每一个 stage 的 worker 在调度前就已经确定,如果有一个 worker 出现连接异常则整个查询会直接失败 。并且可能有一些 Stage 上游数据还没有 Ready 就被调度执行了,需要长时间等数据 。例如 final agg stage,需要等 partial agg 完成后才能拿到数据 。虽然我们做了一些优化,并不会长时间空跑浪费 CPU 资源,但是毕竟也消耗了一部分资源,比如创建了执行的线程 。


推荐阅读