生产者,Broker,消费者都是有可能丢数据的

生产端

生产者丢数据,即发送的数据根本没有保存到 Broker 端。出现这个情况的原因可能是,网络抖动,导致消息压根就没有发送到 Broker 端;也可能是消息本身不合格导致 Broker 拒绝接收(比如消息太大了,超过了 Broker 的承受能力)等等。

上面所说比如网络原因导致消息没有成功发送到 broker 端,常见,也并不可怕。可怕的不是没发送成功,而是发送失败了你不做任何处理。

很简单的一个重试配置,基本就可以解决这种网络瞬时抖动问题。

1
props.put("retries", 10);

当然还有很多其他原因导致的,不能只依靠 kafka 的配置来做处理,我们看一下 kafka 发送端的源码,其实人家是提供了两个方法的,通常会出问题的方法是那个简单的 send,没有 callback(回调)。简单的 send发送后不会去管它的结果是否成功,而 callback 能准确地告诉你消息是否真的提交成功了。一旦出现消息提交失败的情况,你就可以有针对性地进行处理。

因此,一定要使用带有回调通知的 send 方法。

我们知道,broker 一般不会有一个,我们就是要通过多 Broker 达到高可用的效果,所以对于生产者程序来说,也不能简单的认为发送到一台就算成功,如果只满足于一台,那台机器如果损坏了,那消息必然会丢失。设置 acks = all,表明所有副本 Broker 都要接收到消息,该消息才算是“已提交”,这样可以达到高可用的效果。

Broker 端

数据已经保存在 broker 端,但是数据却丢失了。出现这个的原因可能是,Broker 机器 down 了,当然broker 是高可用的,假如你的消息保存在 N 个 Kafka Broker 上,那么至少有 1 个存活就不会丢。

解决

  1. 消息冗余

    前面我们说到,kafka 是有限度的保证消息不丢失,这里的限度,指至少要有一台 broker 可以正常提供服务。至少一台,这种说法可并不准确,应该说至少一台存储了你消息的的 broker。我们知道分区可以设置副本数,假如你只设置副本为1,只要挂的刚好是你副本的那台,即使你有1000台broker,也无济于事。

    因此,副本的设置尤为重要,一般设置 replication.factor >= 3**,毕竟目前防止消息丢失的主要机制就是冗余**。

    但仅仅设置副本数就有用吗?并不能保证 broker 端一定存储了三个副本呀。假如共有三个broker,发送一条消息的时候,某个 broker 刚好宕机了,即使你配置了replication.factor = 3,也最多只会有2台副本。因此,我们还要确认,至少要被写入到多少个副本才算是“已提交”。

    min.insync.replicas > 1 , 控制的是消息至少要被写入到多少个副本才算是“已提交”。设置成大于 1 可以提升消息持久性。

    说到这,可能会有疑问,上面生产端不是已经配置 acks=all了,和这个参数不是冲突了吗??注意 acks = all 是针对所有副本 Broker 都要接收到消息,假如 ISR中只有1个副本了,acks=all 也就相当于 acks=1 了,引入 min.insync.replicas 的目的就是为了做一个下限的限制,不能只满足于 ISR 全部写入,还要保证ISR 中的写入个数不少于 min.insync.replicas

    对了,请确保 replication.factor > min.insync.replicas。一般设置为replication.factor = min.insync.replicas + 1。如果两者相等,有一个副本挂机,整个分区就无法正常工作了。我们不仅要考虑消息的可靠性,防止消息丢失,更应该考虑可用性问题。

  2. leader 选举

    我们知道kafka中有领导者副本(Leader Replica)和追随者副本(Follower Replica),而follower replica存在的唯一目的就是防止消息丢失,并不参与具体的业务逻辑的交互。只有leader 才参与服务,follower的作用就是充当leader的候补,平时的操作也只有信息同步。ISR也就是这组与leader保持同步的replica集合,我们要保证不丢消息,首先要保证ISR的存活(至少有一个备份存活),那存活的概念是什么呢,不仅需要机器正常,还需要跟上leader的消息进度,当达到一定程度的时候就会认为“非存活”状态。

    假设这么一种场景,有Leader,Follow1,Follow2;其中Follow2落后于Leader太多,因此不在leader副本和follower1副本所在的ISR集合之中。此时Leader,Follow1都宕机了,只剩下Follow2了,Follow2还在,就会进行新的选举,不过在选举之前首先要判断unclean.leader.election.enable参数的值。如果unclean.leader.election.enable参数的值为false,那么就意味着非ISR中的副本不能够参与选举,此时无法进行新的选举,此时整个分区处于不可用状态。如果unclean.leader.election.enable参数的值为true,那么可以从非ISR集合中选举follower副本称为新的leader。如果让非ISR中的Follow2成为Leader会有什么后果呢?

    img

    我们说Follow2已经落后之前的Leader很多,他成为新的Leader后从客户端继续收取消息,此时,原来的leader副本恢复,成为了新的follower副本,准备向新的leader副本同步消息,但是它发现自身的LEO(LEO是Log End Offset的缩写,它表示了当前日志文件中下一条待写入消息的offset)比leader副本的LEO还要大。Kafka中有一个准则,follower副本的LEO是不能够大于leader副本的,所以新的follower副本就需要截断日志至leader副本的LEO处,截断日志,不就丢失了之前的消息吗?即图中所示,丢失了3和4两条数据,并且新的Follow和新Leader之间的消息也不一致。

因此,如果要保证消息不丢失,需设置:

unclean.leader.election.enable=false,但是Kafka的可用性就会降低,具体怎么选择需要读者根据实际的业务逻辑进行权衡,可靠性优先还是可用性优先。从Kafka 0.11.0.0版本开始将此参数从true设置为false,可以看出Kafka的设计者偏向于可靠性。

消费端丢数据

Consumer 程序有个“位移”的概念,表示的是这个 Consumer 当前消费到的 Topic 分区的位置。Kafka默认是自动提交位移的,这样可能会有个问题,假如你在pull(拉取)30条数据,处理到第20条时自动提交了offset,但是在处理21条的时候出现了异常,当你再次pull数据时,由于之前是自动提交的offset,所以是从30条之后开始拉取数据,这也就意味着21-30条的数据发生了丢失。

消费端保证不丢数据,最重要就是保证offset的准确性。我们能做的,就是确保消息消费完成再提交。Consumer 端有个参数 ,设置 enable.auto.commit= false, 并且采用手动提交位移的方式。如果在处理数据时发生了异常,那就把当前处理失败的offset进行提交(放在finally代码块中)注意一定要确保offset的正确性,当下次再次消费的时候就可以从提交的offset处进行再次消费。consumer在处理数据的时候失败了,其实可以把这条数据给缓存起来,可以是redis、DB、file等,也可以把这条消息存入专门用于存储失败消息的topic中,让其它的consumer专门处理失败的消息。