Kafka到底会不会丢数据?( 三 )

  • kafka 通过「多 Partition (分区)多 Replica(副本)机制」已经可以最大限度保证数据不丢失,如果数据已经写入 PageCache 中但是还没来得及刷写到磁盘,此时如果所在 Broker 突然宕机挂掉或者停电,极端情况还是会造成数据丢失 。
  • 3.Consumer 端丢失场景剖析
    接下来我们来看看 Consumer 端消费数据丢失场景,对于不了解 Consumer 的读者们,可以先看看 聊聊 Kafka Consumer 那点事, 我们先来看看消费流程:
    Kafka到底会不会丢数据?

    文章插图

    Kafka到底会不会丢数据?

    文章插图
    • Consumer 拉取数据之前跟 Producer 发送数据一样, 需要通过订阅关系获取到集群元数据, 找到相关 Topic 对应的 Leader Partition 的元数据 。
    • 然后 Consumer 通过 Pull 模式主动的去 Kafka 集群中拉取消息 。
    • 在这个过程中,有个消费者组的概念(不了解的可以看上面链接文章),多个 Consumer 可以组成一个消费者组即 Consumer Group,每个消费者组都有一个Group-Id 。同一个 Consumer Group 中的 Consumer 可以消费同一个 Topic 下不同分区的数据,但是不会出现多个 Consumer 去消费同一个分区的数据 。
    • 拉取到消息后进行业务逻辑处理,待处理完成后,会进行 ACK 确认,即提交 Offset 消费位移进度记录 。
    • 最后 Offset 会被保存到 Kafka Broker 集群中的 __consumer_offsets 这个 Topic 中,且每个 Consumer 保存自己的 Offset 进度 。
    根据上图以及消息消费流程可以得出消费主要分为两个阶段:
    • 获取元数据并从 Kafka Broker 集群拉取数据 。
    • 处理消息,并标记消息已经被消费,提交 Offset 记录 。
    既然 Consumer 拉取后消息最终是要提交 Offset, 那么这里就可能会丢数据的!!!
    • 可能使用的「自动提交 Offset 方式」
    • 拉取消息后「先提交 Offset,后处理消息」,如果此时处理消息的时候异常宕机,由于 Offset 已经提交了, 待 Consumer 重启后,会从之前已提交的 Offset 下一个位置重新开始消费, 之前未处理完成的消息不会被再次处理,对于该 Consumer 来说消息就丢失了 。
    • 拉取消息后「先处理消息,再进行提交 Offset」, 如果此时在提交之前发生异常宕机,由于没有提交成功 Offset, 待下次 Consumer 重启后还会从上次的 Offset 重新拉取消息,不会出现消息丢失的情况, 但是会出现重复消费的情况,这里只能业务自己保证幂等性 。
    四、消息丢失解决方案
    上面带你从 Producer、Broker、Consumer 三端剖析了可能丢失数据的场景,下面我们就来看看如何解决才能最大限度保证消息不丢失 。
    1.Producer 端解决方案
    在剖析 Producer 端丢失场景的时候, 我们得出其是通过「异步」方式进行发送的,所以如果此时是使用「发后即焚」的方式发送,即调用 Producer.send(msg) 会立即返回,由于没有回调,可能因网络原因导致 Broker 并没有收到消息,此时就丢失了 。
    因此我们可以从以下几方面进行解决 Producer 端消息丢失问题:
    1)更换调用方式:
    弃用调用发后即焚的方式,使用带回调通知函数的方法进行发送消息,即 Producer.send(msg, callback), 这样一旦发现发送失败, 就可以做针对性处理 。
    Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback);
    public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
    // intercept the record, which can be potentially modified; this method does not throw exceptions
    ProducerRecord<K, V> interceptedRecord = this.interceptors == null ? record : this.interceptors.onSend(record);
    return doSend(interceptedRecord, callback);
    }
    • 网络抖动导致消息丢失,Producer 端可以进行重试 。
    • 消息大小不合格,可以进行适当调整,符合 Broker 承受范围再发送 。
    通过以上方式可以保证最大限度消息可以发送成功 。
    2)ACK 确认机制:
    该参数代表了对"已提交"消息的定义 。
    需要将 request.required.acks 设置为 -1/ all,-1/all 表示有多少个副本 Broker 全部收到消息,才认为是消息提交成功的标识 。
    针对 acks = -1/ all , 这里有两种非常典型的情况:
    • 数据发送到 Leader Partition, 且所有的 ISR 成员全部同步完数据, 此时,Leader Partition 异常 Crash 掉,那么会选举新的 Leader Partition,数据不会丢失, 如下图所示:

    Kafka到底会不会丢数据?


    推荐阅读