Apache Kafka 是一个分布式的消息发布-订阅系统。可以说,任何实时大数据处理工具缺少与 Kafka 整合都是不完整的。本文将介绍如何使用 Spark Streaming 从 Kafka 中接收数据,

这里介绍两种方法

  1. 使用 Receivers 和 Kafka 高层次的 API
  2. 使用 Direct API,这是使用低层次的 KafkaAPI,并没有使用到 Receivers,是 Spark 1.3.0中开始引入的。这两种方法有不同的编程模型,性能特点和语义担保

基于 Receivers 的方法

使用了 Receivers 来接收数据。Receivers 的实现使用到 Kafka 高层次的消费者API。对于所有的Receivers,接收到的数据将会保存在 Spark executors 中,然后由 Spark Streaming 启动的 Job 来处理这些数据。

然而,在默认的配置下,这种方法在失败的情况下会丢失数据,为了保证零数据丢失,你可以在 Spark Streaming 中使用 WAL 日志,这是在 Spark 1.2.0 才引入的功能,这使得我们可以将接收到的数据保存到WAL 中,所以在失败的时候,我们可以从 WAL 中恢复,而不至于丢失数据。

Direct API

和基于 Receiver 接收数据不一样,这种方式定期地从 Kafka 的 topic+partition 中查询最新的偏移量,再根据定义的偏移量范围在每个 batch 里面处理数据。当作业需要处理的数据来临时,Spark 通过调用 Kafka 的简单消费者 API 读取一定范围的数据。

和基于 Receiver 方式相比,这种方式主要有一些几个优点:

  1. 简化并行

    我们不需要创建多个Kafka 输入流,然后union他们。而使用directStream,Spark Streaming将会创建和 Kafka 分区一样的 RDD 分区个数,而且会从 Kafka 并行地读取数据,也就是说Spark分区将会和Kafka分区有一一对应的关系,这对我们来说很容易理解和使用;

  2. 高效

    第一种实现零数据丢失是通过将数据预先保存在 WAL 中,这将会复制一遍数据,这种方式实际上很不高效,因为这导致了数据被拷贝两次:一次是被 Kafka 复制;另一次是写到WAL中。但是本文介绍的方法因为没有Receiver,从而消除了这个问题,所以不需要WAL日志;

  3. 恰好一次语义(Exactly-once semantics)

    文章中通过使用Kafka高层次的API把偏移量写入Zookeeper中,这是读取Kafka中数据的传统方法。虽然这种方法可以保证零数据丢失,但是还是存在一些情况导致数据会丢失,因为在失败情况下通过Spark Streaming读取偏移量和Zookeeper中存储的偏移量可能不一致。而本文提到的方法是通过Kafka低层次的API,并没有使用到Zookeeper,偏移量仅仅被Spark Streaming保存在Checkpoint中。这就消除了Spark Streaming和Zookeeper中偏移量的不一致,而且可以保证每个记录仅仅被Spark Streaming读取一次,即使是出现故障。

但是本方法唯一的坏处就是没有更新 Zookeeper 中的偏移量,所以基于 Zookeeper 的 Kafka 监控工具将会无法显示消费的状况。然而你可以通过 Spark 提供的 API 手动地将偏移量写入到 Zookeeper 中。

总结

Spark Streaming 和 Kafka 整合是如何保证数据零丢失 ?

当我们正确地部署好 Spark Streaming,我们就可以使用 Spark Streaming 提供的零数据丢失机制。为了体验这个关键的特性,你需要满足以下几个先决条件:

  1. 输入的数据来自可靠的数据源和可靠的接收器
  2. 应用程序的 metadata 被 application 的 driver 持久化了(checkpointed)
  3. 启用了 WAL 特性(Write ahead log)