1.消费者和消费者组

1.1. 概述

Kafka 消费者从属于消费者组。一个消费者组里的消费者订阅的为同一主题。每个消费者接受主题一部分分区的消息。

向消费者组里增加消费者是横向伸缩消费能力的主要方式。Kafka 消费者在做一些高延迟的操作,如向HDFS或数据库中写入数据,或者使用数据进行一些耗时的计算操作。在这些情况下,单个消费者无法跟上数据的生成速度,此时我们可以增加消费者,以分担负载,每个消费者只处理部分分区上的消息,这就是横向伸缩的主要手段。我们有必要为主题创建大量分区,负载增长时可以增加更多的消费者

注意: 一个主题的同一个分区同时只能供同一个消费者组里的一个消费者消费数据,因此不要让消费者的数量超过主题分区的数量,多于的消费者只会被闲置。

1.2. 消费方式

由于 Push 模式 很难适应消费速率不同的消费者,因此消息发送速率是由 broker 决定的。它的目标是尽可能的以最快的速度传递消息,但是这样容易造成 consumer 来不及处理消息,典型的表现就是拒绝服务以及网络拥塞。 所以 kafka 采用 pull 模式,根据消费者的能力以适当的速率消费消息。==

Pull 模式的不足之处是如果 kafka 中没有数据,消费者可能会陷入循环中,一直返回空数据,针对这一点,Kafka 的消费者在消费数据时会传入一个时长参数 timeout,如果当前没有数据可供消费,consumer 会等待一段时间在返回,这段时长即为 timeout。

1
2
3
4
@Override
public ConsumerRecords<K, V> poll(final Duration timeout) {
return poll(timeout.toMillis(), true);
}

1.3. 特定偏移处消费数据

使用 poll() 开始消费每个分区中最后一个已经发生的偏移量的消息,并继续按顺序处理所有消息。但是,有时我们肯想要以不同的偏移量开始阅读。如果你想从分区的开头读取所有消息,或者你想要一直跳到分区的末尾并开始只消费新消息,那么可以使用有一些专门的API:seekToBeginning(TopicPartition tp)和seekToEnd(TopicPartition tp)。

1.4. 独立消费者

单一的消费者总是需要从主题中的所有分区读取数据,或者从一个主题特定分区读取数据。在这种情况下没有理由需要组或负载均衡,只是订阅特定的主题或分区,偶尔使用消息和提交偏移量。

1.5. 多线程消费者

KafkaProducer是线程安全的,而 KafkaConsumer 是非线程安全的,多线程需要处理好线程同步,多线程的实现方式有多种,这里介绍一种:每个线程各自实例化一个KakfaConsumer对象,这种方式的缺点是:当这些线程属于同一个消费组时,线程的数量受限于分区数,当消费者线程的数量大于分区数时,就有一部分消费线程一直处于空闲状态

1.6. 退出

如果确定要退出循环,需要通过另一个线程调用 consumer.wakeup() 方法。

如果循环运行在主线程里,可以在 ShutdownHook 里调用该方法。consumer.wakeup() 是消费者唯一一个可以从其他线程里安全调用的方法。

ShutdownHook运行在单独的线程里,所以退出循环最安全的方式只能是调用 consumer.wakeup()

1
2
3
4
5
6
7
8
9
10
11
Runtime.getRuntime().addShutdownHook(new Thread() {
public void run() {
System.out.println("Starting exit...");
consumer.wakeup();
try {
mainThread.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});

调用 consumer.wakeup() 可以退出 poll() ,并抛出 WakeupException异常,或者如果调用consumer.wakeup() 时线程没有等待轮询,那么异常将在下一轮调用 poll() 时抛出。不需要处理WakeupException,因为它只是用于跳出循环的一种方式。不过,在退出线程之前调用 consumer.close()是很有必要的,它会提交任何还没有提交的东西,并向群组协调器发送消息,告知自己要离开群组,接下来就会触发再均衡,而不需要等待会话超时。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
try{
while(true) {
ConsumerRecords records = movingAvg.consumer.poll(1000);
System.out.println(System.currentTimeMillis() + " -- waiting for data...");
for(ConsumerRecord record : records) {
System.out.printf("offset = %d, key = %s, value = %s\n",
record.offset(), record.key(), record.value());
}
for(TopicPartition tp: consumer.assignment()){
System.out.println("Committing offset at position:" + consumer.position(tp));
}
movingAvg.consumer.commitSync();
}
} catch(WakeupException e) {
// ignore for shutdown
} finally{
//在退出之前,确保你已经完全关闭了消费者
consumer.close();
System.out.println("Closed consumer and we are done");
}

2. 分区分配

群组里的消费者共同读取主题的分区,一个新的消费者加入群组时,它读取的是原本由其他消费者读取的消息。

当一个消费者被关闭或发生崩溃时,它就离开群组,原本由它读取的分区将由群组里的其他消费者来读取。在主题发生变化时,比如管理员添加了新的分区,会发生分区重分配。

3.1. 再均衡

3.1.1. 概述

分区的所有权从一个消费者转移到另一个消费者,这样的行为被称为再均衡

再均衡为消费者群组带来了高可用伸缩性。

再均衡期间,消费者无法读取消息,造成整个群组一小段时间内的不可用。此外,当分区被重新分配给另一个消费者时,消费者当前的读取状态会丢失,它还有可能还需要去刷新缓存,在它重新恢复状态之前会拖慢应用程序。
当消费者要加入群组时,会向群组协调器发送一个JoinGroup请求。第一个加入群组的消费者将成为群主。群主从协调器那里获取群组的成员列表,列表包含了最近发送过心跳的消费者,并负责给每一个消费者分配分区。
PartitionAssignor 接口的类决定哪些分区被分配给哪些消费者

在Kafka中,当有新消费者加入或者订阅的topic数发生变化时,会触发Rebalance(再均衡:在同一个消费者组当中,分区的所有权从一个消费者转移到另外一个消费者)机制,Rebalance顾名思义就是重新均衡消费者消费。Rebalance的过程如下:

第一步:所有成员都向coordinator发送请求,请求入组。一旦所有成员都发送了请求,coordinator会从中选择一个consumer担任leader的角色,并把组成员信息以及订阅信息发给leader。
第二步:leader开始分配消费方案,指明具体哪个consumer负责消费哪些topic的哪些partition。一旦完成分配,leader会将这个方案发给coordinator。coordinator接收到分配方案之后会把方案发给各个consumer,这样组内的所有成员就都知道自己应该消费哪些分区了。

3.1.2. 条件

  1. 消费者组中新添加消费者读取到原本是其他消费者读取的消息

  2. 消费者关闭或崩溃之后离开群组,原本由他读取的 partition 将由群组里其他消费者读取

  3. 当向一个 Topic 添加新的 partition,会发生 partition 在消费者中的重新分配

3.1.3. 再均衡监听器

在为消费者分配新的partition或者移除旧的partition时,可以通过消费者API执行一些应用程序代码,在使用subscribe()方法时传入一个ConsumerRebalanceListener实例。

ConsumerRebalanceListener需要实现的两个方法

  1. public void onPartitionRevoked(Collection partitions)

    该方法会在再均衡开始之前消费者停止读取消息之后被调用。如果在这里提交偏移量,下一个接管partition的消费者就知道该从哪里开始读取了。

  2. public void onPartitionAssigned(Collection partitions)

    该方法会在重新分配partition之后消费者开始读取消息之前被调用。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
private Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>();
private class HandleRebalance implements ConsumerRebalanceListener {
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
}
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
//如果发生再均衡,要在即将失去partition所有权时提交偏移量。
//调用commitSync方法,确保在再均衡发生之前提交偏移量
consumer.commitSync(currentOffsets);
}
}


try{
consumer.subscribe(topics, new HandleRebalance());
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
currentOffsets.put(new TopicPartition(record.topic(), record.partition()),
new OffsetAndMetadata(record.offset() + 1, “no matadata”));
}
consumer.commitAsync(currentOffsets, null);
} catch(WakeupException e) {
//忽略异常,正在关闭消费者
} catch (Exception e) {
log.error("unexpected error", e);
} finally {
try{
consumer.commitSync(currentOffsets);
} finally {
consumer.close();
}
}

3.2. 分区分配

概述

在 Kafka 中,存在着三种分区分配策略。一种是 RangeAssignor 分配策略(范围分区),另一种是 RoundRobinAssignor 分配策略(轮询分区)。默认采用 Range 范围分区。

Kafka 提供了消费者客户端参数 partition.assignment.strategy 用来设置消费者与订阅主题之间的分区分配策略。默认情况下,此参数的值为:org.apache.kafka.clients.consumer.RangeAssignor,即采用 RangeAssignor 分配策略。

在这里插入图片描述

  1. RangeAssignor

    RangeAssignor 策略的原理是按照消费者总数和分区总数进行整除运算来获得一个跨度,然后将分区按照跨度进行平均分配,以保证分区尽可能均匀地分配给所有的消费者。对于每一个topic,RangeAssignor 策略会将消费组内所有订阅这个 topic 的消费者按照名称的字典序排序,然后为每个消费者划分固定的分区范围,如果不够平均分配,那么字典序靠前的消费者会被多分配一个分区。

    • topic下的所有有效分区平铺
    • 消费者按照字典排序
    • 分区数除以消费者数,得到 n
    • 分区数对消费者数取余,得到 m
    • 消费者集合中,前 m 个消费者能够分配到 n+1 个分区,而剩余的消费者只能分配到 n 个分区
  2. RoundRobinAssignor

    RoundRobin 轮询分区策略,是把所有的 partition 和所有的 consumer 都列出来,然后按照 hashcode 进行排序,最后通过轮询算法来分配 partition 给到各个消费者。

    • 消费者按照字典排序,例如C0, C1, C2… …,并构造环形迭代器。
    • topic 名称按照字典排序,并得到每个 topic 的所有分区,从而得到所有分区集合。
    • 遍历第2步所有分区集合,同时轮询消费者。
    • 如果轮询到的消费者订阅的topic不包括当前遍历的分区所属topic,则跳过;否则分配给当前消费者,并继续第3步。
  3. StickyAssignor

    Kafka 从 0.11.x 版本开始引入 StickyAssignor 分配策略,它主要有两个目的:分区的分配要尽可能的均匀和分区的分配尽可能的与上次分配的保持相同。当两者发生冲突时,第一个目标优先于第二个目标。鉴于这两个目标,StickyAssignor 策略的具体实现要比 RangeAssignor 和RoundRobinAssignor 这两种分配策略要复杂很多。

4. 提交和偏移量 Offset

调用 poll() 时,返回由生产者写入 Kafka 但还没有被消费者读取过的记录

Offset 的维护

由于 consumer 在消费过程中可能会出现断电等故障,consumer恢复之后,需要从故障前的位置继续消费,所以 consumer 需要记录自己消费位置,以便故障恢复后继续消费。

Kafka 0.9 版本之前, comsumer 默认将 offset 保存在 Zookeeper中

1
2
3
4
5
6
7
8
[zk: localhost:2181(CONNECTED) 3] ls /
[cluster, controller, brokers, zookeeper, admin, isr_change_notification, dubbo, log_dir_event_notification, controller_epoch, kafka-manager, consumers, hive_zookeeper_namespace_hive, latest_producer_id_block, config, hbase]

[zk: localhost:2181(CONNECTED) 16] ls /consumers/console-consumer-37662/offsets/test_kafka
[44, 45, 46, 47, 48, 49, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43]

[zk: localhost:2181(CONNECTED) 19] ls /consumers/console-consumer-37662/offsets/test_kafka/0
[]

从0.9 版本开始, consumer 默认将 offset 保存在 kafka __consumer_offsets主题中。

在这里插入图片描述

2.2. 自动提交偏移量

将enable.auto.commit 设为true,则每过5秒,消费者会自动把从 poll() 方法接受到的最大偏移量提交上去。提交时间由auto.commit.interval.ms控制,默认值为5秒

  • 自动提交偏移量不足

    假设我们使用默认的5秒提交时间间隔,在最近一次提交之后的3秒发生了再均衡,再均衡之后,消费者从最后一次提交的偏移量位置开始读取消息。这个时候偏移量已经落后了3秒,所以这3秒内到达的消息将会被重复处理。
    **注:可以通过修改提交时间间隔来频繁提交偏移量,减少可能出现重复消息的时间窗,不过这种情况无法完全避免。**

    2.3. 手动提交偏移量

    把 enable.auto.commit 设为 false,让应用程序决定何时提交偏移量。使用 commitSync() 提交偏移量最简单也最可靠。

5.总结

Kafka 如何保证消息有序 ?

kafka 中的每个 partition 中的消息在写入时都是有序的,而且单独一个 partition 只能由一个消费者去消费,可以在里面保证消息的顺序性。但是分区之间的消息是不保证有序的。

两种方案

  1. kafka topic 只设置一个 partition 分区

    kafka 默认保证同一个 partition 分区内的消息是有序的,则可以设置topic只使用一个分区,这样消息就是全局有序,缺点是只能被consumer group里的一个消费者消费,降低了性能,不适用高并发的情况

  2. producer 将消息发送到指定 partition 分区

    既然 kafka 默认保证同一个 partition 分区内的消息是有序的,则 producer 可以在发送消息时可以指定需要保证顺序的几条消息发送到同一个分区,这样消费者消费时,消息就是有序。

Kafka 缺点 ?
  1. 由于是批量发送,数据并非真正的实时;
  2. 对于mqtt协议不支持;
  3. 不支持物联网传感数据直接接入;
  4. 仅支持统一分区内消息有序,无法实现全局消息有序;
  5. 监控不完善,需要安装插件;
  6. 依赖zookeeper进行元数据管理;