生产者即数据的发布者,该角色将消息发布到 Kafka 的 topic 中。broker 接收到生产者发送的消息后,broker 将该消息追加到当前用于追加数据的 segment 文件中。生产者发送的消息,存储到一个partition 中,生产者也可以指定数据存储的 partition。

1.生产者概览

生产者即数据的发布者,该角色将消息发布到 Kafka 的 topic 中。broker 接收到生产者发送的消息后,将该消息追加到当前用于追加数据的 segment 文件中。生产者发送的消息,存储到一个partition 中,生产者也可以指定数据存储的 partition。

1.1.创建生产者

ProductRecord 对象还可以指定键或分区。

在发送 ProductRecord 对象时,生产者要把键和值对象序列化为字节数组,这样才可以在网络上进行传输。接下来,数据被传给分区器。如果在 ProductRecord 对象里指定了分区,分区器直接将指定的分区返回。如果没有指定分区,分区器会根据 ProductRecord 对象的键选择一个分区。如果选定分区以后,生产者就知道向哪个主题和分区发送这条记录。
紧接着,这条记录被添加到一个记录批次里,这个批次里的所有消息被发送到相同主题和分区。(有一个独立的线程负责把这些记录批次发送到相应的 broker 上)

1
2
3
4
5
6
7
8
9
/**
* Create a record with no key
*
* @param topic The topic this record should be sent to
* @param value The record contents
*/
public ProducerRecord(String topic, V value) {
this(topic, null, null, null, value, null);
}

服务器在收到这些消息时会返回一个响应。如果消息成功写入 Kafka ,返回一个 RecordMetaData对象,包含了主题和分区信息,以及记录在分区的偏移量。如果写入失败,则会返回一个错误。生产者收到错误之后会尝试重新发送消息,几次之后如果还是失败,就返回错误信息。

2.创建生产者

向 Kafka 写入消息,首先要创建一个生产者对象,并设置一些属性。Kafka 生产者有3个必选的属性

  • bootstrap.servers

    指定 broker 的地址清单

    1
    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "master:9092,data1:9092,data2:9092,data3:9092,data4:9092,data5:9092");
  • key.serializer

    broker 希望接受到的消息的键和值都是字节数组。生产者接口允许使用参数化类型。因此可以把 Java 对象作为键和值发送给 broker(这样代码具有良好的可读性)

  • value.serializer

3.拦截器

3.1.概述

对于生产者而言,拦截器使用户在消息发送前以及 Producer 回调逻辑前有机会对消息做一些定制化需求,比如修改消息等。同时,生产者允许用户指定多个拦截器按序作用于同一条消息从而形成一个拦截链(interceptor chain)。
Intercetpor 的实现接口是org.apache.kafka.clients.producer.ProducerInterceptor,其定义的方法包括:

1
2
3
4
5
/**
* A plugin interface that allows you to intercept (and possibly mutate) the records received by the producer before
* they are published to the Kafka cluster.
*/
public interface ProducerInterceptor<K, V> extends Configurable
  • onSend(ProducerRecord)

    生产者确保消息被序列化以及计算分区前调用该方法。用户可以在该方法中对消息做任何操作,但最好保证不要修改消息所属的topic和分区,否则会影响目标分区的计算。

  • onAcknowledgement(RecordMetadata, Exception)

    该方法会在消息被应答之前或消息发送失败时调用,并且通常都是在生产者回调逻辑触发之前。onAcknowledgement运行在生产者的IO线程中,因此不要在该方法中放入很重的逻辑,否则会拖慢生产者的消息发送效率

  • close

    关闭interceptor,主要用于执行一些资源清理工作

3.2. 拦截器实现

实现一个简单的双 interceptor组成的拦截链。

  • 第一个interceptor会在消息发送前将时间戳信息加到消息前面;
  • 第二个interceptor会在消息发送后更新成功发送消息数或失败发送消息数。

3.2.1. 时间拦截器实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public class TimerInterceptor implements ProducerInterceptor<String, String> {

@Override
public void configure(Map<String, ?> configs) {}
@Override
public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
String value = System.currentTimeMillis() + "---" + record.value();
return new ProducerRecord<>(record.topic(), record.partition(), record.key(), value);
}
@Override
public void onAcknowledgement(RecordMetadata metadata, Exception exception) {}
@Override
public void close() {}
}

3.2.2. 计数拦截器实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
import java.util.Map;

public class CountInterceptor implements ProducerInterceptor<String, String> {

int success;
int error;
@Override
public void configure(Map<String, ?> configs) {}
@Override
public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
return record;
}
@Override
public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
if(metadata != null){
success ++;
}else {
error++;
}
}
@Override
public void close() {
System.out.println("success: " + success);
System.out.println("error: " + error);
}
}

3.2.3. 配置拦截器

1
2
3
4
5
ArrayList<String> interceptors = new ArrayList<>();
interceptors.add("api.interceptor.TimerInterceptor");
interceptors.add("api.interceptor.CountInterceptor");

properties.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors);

4. 序列化器

创建Kafka 生产者时必须指定序列化器。Kafka 除提供默认的字符串序列化器 org.apache.kafka.common.serialization.StringSerializer,还提供了整形和字节数组序列化器等。

5.分区器

5.1.分区原因

  • 方便在集群中扩展,每个 Partition 可以通过调整以适应它所在的机器,而一个 topic 又可以存在多个Partition
  • 提高并发。

5.2.分区原则

kafka 中默认分区器为 org.apache.kafka.clients.producer.internals.DefaultPartitioner,其实现了 org.apache.kafka.clients.producer.Partitioner 接口。默认分区原则为:

1
2
3
4
5
* The default partitioning strategy:
* <ul>
* <li>If a partition is specified in the record, use it
* <li>If no partition is specified but a key is present choose a partition based on a hash of the key
* <li>If no partition or key is present choose a partition in a round-robin fashion
  1. 指明 partition 的情况下

    直接将指明的值直接作为 partition 的值

  2. 没有指明 partition 值但是有 key

    将 key 的 hash 值 与 topic 的 partition 数 进行取余 得到 partition 值

    1
    2
    // hash the keyBytes to choose a partition
    return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
    1
    2
    3
    4
    5
    6
    7
    8

    // 将 number 转换为正数。
    // 当 number 为正时,返回原始值。
    // 当 number 为负数时,返回原始值位与0x7fffffff的绝对值之和。 0x7FFFFFFF 的二进制表示就是除了首位是 0,其余都是1,
    // 即最大的整型数 int
    public static int toPositive(int number) {
    return number & 0x7fffffff;
    }
  3. 没有指定 partition 值又没有 key 值

    round-robin 既第一次调用时随即生成一个整数(后面每次调用在这个整数上进行自增),将这个值与 topic 可用的 partition 总数取余,得到 partition 值。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
     int nextValue = nextValue(topic);
    // broker 集群中 topic 主题可以利用的分区数
    List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
    if (availablePartitions.size() > 0) {
    int part = Utils.toPositive(nextValue) % availablePartitions.size();
    return availablePartitions.get(part).partition();
    } else {
    // no partitions are available, give a non-available partition
    return Utils.toPositive(nextValue) % numPartitions;
    }
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    private final ConcurrentMap<String, AtomicInteger> topicCounterMap = new ConcurrentHashMap<>();

    private int nextValue(String topic) {
    AtomicInteger counter = topicCounterMap.get(topic);
    // 如果是第一次分区,随机生成一个数
    if (null == counter) {
    counter = new AtomicInteger(ThreadLocalRandom.current().nextInt());
    AtomicInteger currentCounter = topicCounterMap.putIfAbsent(topic, counter);
    if (currentCounter != null) {
    counter = currentCounter;
    }
    }
    // 不是第一次分区, 则++
    return counter.getAndIncrement();
    }

6.原理

6.1.整体架构

在生产者将消息发往 Kafka之前,有可能需要经历拦截器、序列化器和分区器等一系列的作用,随后才真正进入消息发送流程。

6.1.1.消息累加器 RecordAccumulator

整个生产者客户端由两个线程协调运行,这两个线程分别为 主线程Sender线程[发送线程]。

在主线程中由 KafkaProducer 创建消息,然后通过可能的拦截器、序列化器和分区器的作用之后缓存到消息累加器 [RecordAccumulator,也称为消息收集器]。

Sender 线程负责从 RecordAccumulator 中获取消息并将其发送到 Kafka 中。

  1. RecordAccumulator 主要用来缓存消息,以便 Sender 线程可以批量发送,进而减少网络传输的资源消耗提升性能。

  2. RecordAccumulator 缓存的大小可以通过生产者客户端参数 buffer.memory 的配置,默认值为32MB。如果生产者发送消息的速度超过发送到服务器的速度,则会导致生产者空间不足,这个时候KafkaProducer的send()方法调用要么被阻塞,要么抛出异常,这个取决于参数max.block.ms的配置,此参数的默认值为60000,即60秒。

6.1.2.ProducerBatch

在 RecordAccumulator 的内部为每个分区都维护了一个双端队列

主线程中发送过来的消息都会被追回到 RecordAccumulator 的某个双端队列 [Deque]中,

队列中的内容就是 ProducerBatch,即 Deque

消息写入缓存时,追回到双端队列的尾部;Sender 读取消息时,从双端队列的头部读取。

  • 注意

    1. ProducerBatch 不是 ProducerRecord

      ProducerBatch 是指一个消息批次,ProducerRecord 会被包含在 ProducerBatch 中,这样可以使字节的使用更加紧凑。与此同时,将较小的 ProducerRecord 拼凑成一个较大的ProducerBatch 可以减少网络请求的次数以提升整体的吞量。如果生产者客户端需要向很多分区发送消息,则可以将 buffer.memory 参数适当调大以增加整体的吞吐量。

    2. ProducerBatch 的大小和 batch.size 参数有着密切的关系。

      当一条消息 [ProducerRecord] 流入 RecordAccumulator 时,会先寻找与消息分区所对应的双端队列,如果没有则创建,再从这个双端队列的尾部获取一个 ProducerBatch[如果没有则创建],查看 ProducerBatch 中是否还可以写入这个 ProdcucerRecord,如果可以则写入,如果不可以则需要创建一个新的 ProducerBatch。

      在新建 ProducerBatch 时评估这条消息的大小是否超过 batch.size 参数的大小,如果不超过,那么就以 batch.size 参数的大小来创建 ProducerBatch,这样在使用完这段内存区域后,可以通过 BufferPool 的管理来进行复用;如果超过,那么就以评估的大小创建ProducerBatch,这段内存区域不会被复用。

6.1.3. BufferPool

消息在网络上都是以字节[Byte]的形式传输的,在发送之前需要创建一块内存区域来保存对应的消息。在Kafka生产者客户端中,通过 java.io.ByteBuffer 实现消息内存的创建和释放。不过频繁的创建和释放是比较耗费资源的,在RecordAccumulator的内部还有一个BufferPool,它主要是用来实现ByteBuffer的复用,以实现缓存的高效利用。

不过 BufferPool 只针对特定大小的 ByteBuffer 进行管理,而其它大小的 ByteBuffer 不会缓存进BufferPool 中,这个特定的大小由 batch.size 参数指定,默认值为64KB,可以适当地调大batch.size 参数以便多缓存一些消息。

Sender 从 RecordAccumulator 中获取缓存的消息之后,会进一步将原本<分区,Deque<ProducerBatch>>的保存形式转变成<Node, List>的形式,其中Node表示Kafka集群的broker节点。对于网络连接来说,生产者客户端是与具体的broker节点建立的连接,就是向具体的broker节点发送消息,而并不关心消息属于哪一个分区;而对于KafkaProducer的应用逻辑而言,我们只关注向哪个分区中发送哪些消息,所以这里需要做一个应用逻辑层面到网络I/O层面的转换。

在转换成<Node, List>的形式之后,Sender还会进一步封装成<Node, Request>的形式,这样就可以将Request请求发往各个Node了,这里的Request是指Kafka的各种协议请求,对于消息发送而言就是指具体的ProducerRequest。

6.1.4. InFlightRequest

请求在从 Sender 线程发往 Kafka 之前还会保存到 InFlightRequests 中,InFlightRequest 保存对象的具体形式为Map<NodeId, Deque>

它的主要作用是缓存已经发出去但还没有收到响应的请求[NodeId是一个 String 类型,表示节点的id编号]。

7.总结

Kafka 生产者如何保证不丢失,不重复?

生产者丢数据,即发送的数据根本没有保存到 Broker 端。出现这个情况的原因可能是,网络抖动,导致消息压根就没有发送到 Broker 端;也可能是消息本身不合格导致 Broker 拒绝接收(比如消息太大了,超过了 Broker 的承受能力)等等。

上面所说比如网络原因导致消息没有成功发送到 broker 端,常见,也并不可怕。可怕的不是没发送成功,而是发送失败了你不做任何处理。

很简单的一个重试配置,基本就可以解决这种网络瞬时抖动问题。

1
props.put("retries", 10);

当然还有很多其他原因导致的,不能只依靠 kafka 的配置来做处理,我们看一下 kafka 发送端的源码,其实人家是提供了两个方法的,通常会出问题的方法是那个简单的 send,没有 callback(回调)。简单的 send发送后不会去管它的结果是否成功,而 callback 能准确地告诉你消息是否真的提交成功了。一旦出现消息提交失败的情况,你就可以有针对性地进行处理。

因此,一定要使用带有回调通知的 send 方法。

我们知道,broker 一般不会有一个,我们就是要通过多 Broker 达到高可用的效果,所以对于生产者程序来说,也不能简单的认为发送到一台就算成功,如果只满足于一台,那台机器如果损坏了,那消息必然会丢失。设置 acks = all,表明所有副本 Broker 都要接收到消息,该消息才算是“已提交”,这样可以达到高可用的效果。

考虑到 producer,broker,consumer 之间都有可能造成消息重复,所以我们要求接收端需要支持消息去重的功能,最好借助业务消息本身的幂等性来做。其中有些大数据组件,如 hbase,elasticsearch 天然就支持幂等操作。
举例:
在华泰证券中Kafka的幂等性是如何保证的?在接收端,启动专门的消费者拉取 kafka 数据存入 hbase。hbase 的 rowkey 的设计主要包括 SecurityId(股票id)和 timestamp(行情数据时间)。消费线程从 kafka 拉取数据后反序列化,然后批量插入 hbase,只有插入成功后才往 kafka 中持久化 offset。这样的好处是,如果在中间任意一个阶段发生报错,程序恢复后都会从上一次持久化 offset 的位置开始消费数据,而不会造成数据丢失。如果中途有重复消费的数据,则插入 hbase 的 rowkey 是相同的,数据只会覆盖不会重复,最终达到数据一致。

在0.11之前主要是通过下游系统具有幂等性来保证 Exactly Once。但是这样有几个缺陷:

要求下游系统支持幂等操作,限制了Kafka的适用场景

实现门槛相对较高,需要用户对Kafka的工作机制非常了解

对于Kafka Stream而言,Kafka Producer本身就是“下游”系统,能让Producer具有幂等处理特性,那就可以让Kafka Stream在一定程度上支持Exactly once语义。

0.11之后的版本,引入了 Producer ID(PID)Sequence Number 实现 Producer 的幂等语义。

Producer ID:每个新的Producer在初始化的时候会被分配一个唯一的PID

Sequence Number:对于每个PID,该Producer发送数据的每个<Topic, Partition>都对应一个从0开始单调递增的Sequence Number。

Broker端也会为每个<PID, Topic, Partition>维护一个序号,并且每次Commit一条消息时将其对应序号递增。对于接收的每条消息,如果其序号比Broker维护的序号(即最后一次Commit的消息的序号)大一,则Broker会接受它,否则将其丢弃:

如果消息序号比Broker维护的序号大一以上,说明中间有数据尚未写入,也即乱序,此时Broker拒绝该消息,Producer 抛出InvalidSequenceNumber

如果消息序号小于等于Broker维护的序号,说明该消息已被保存,即为重复消息,Broker直接丢弃该消息,Producer 抛出 DuplicateSequenceNumber

这种机制很好的解决了数据重复和数据乱序的问题。