Spark Streaming Kafka 原理

Spark Streaming Kafka 原理

Kafka作为一个消息队列,具有很高的吞吐量,和Spark Streaming结合起来,可以实现高速实时的流处理。Spark Streaming在以前的版本支持两种方式读取Kafka,一种是通过 receiver 读取的方式,另一种是直接读取的方式。基于 receiver 方式的读取因为不太稳定,已经被最新版遗弃了,所以下面只讲直接读取的方式。

工作流程

consumer 消费的 offset,这里会直接存储到 kafka里(__consumer_offset topic)。

  1. driver 首先会从 kafka 获取消费组的 offset,然后生成 KafkaRDD,KafkaRDD里的每个分区对应了 kafka 中一个 topic partition 的一小段数据。
  2. executor 会根据这个信息,从 kafka 中读取对应的数据,注意到这里consumer使用的是 assign 分配模式,它不会去管理 offset。executor 读取到数据后,会进行相应的处理。
  3. driver 收到所有 executor 的处理成功消息后,会向 kafka 提交 offset。

下面可以看看它的示例代码

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// kafka 配置项,这里只列出一些重要项
Map<String, Object> kafkaParams = new HashMap<>();
kafkaParams.put("bootstrap.servers", "localhost:9092,anotherhost:9092");
kafkaParams.put("group.id", "my_consumer_group");
kafkaParams.put("enable.auto.commit", false);

Collection<String> topics = Arrays.asList("topicA", "topicB");

// 创建 kafka stream
JavaInputDStream<ConsumerRecord<String, String>> stream =
  KafkaUtils.createDirectStream(
    streamingContext,
    LocationStrategies.PreferConsistent(),
    ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams)
  );

// 调用 foreachRDD 来处理
stream.foreachRDD(rdd -> {
  OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();  // 在 driver执行
  rdd.mapPartitions( p -> {
      // .... 处理 
  })
  // 等待executor执行完任务,driver 端负责提交offset
  ((CanCommitOffsets) stream.inputDStream()).commitAsync(offsetRanges);
});

Kafka数据流生成RDD

Kafka数据输入流由DirectKafkaInputDStream类表示,它会定期生成RDD。DirectKafkaInputDStream每次生成RDD的数据,都是读取kafka对应的topic所有分区,自从上次提交的offset一直到最新的offset到的数据。

DirectKafkaInputDStream继承InputDStream,它的compute方法负责生成RDD

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
class DirectKafkaInputDStream[K, V](
    _ssc: StreamingContext,
    locationStrategy: LocationStrategy,
    consumerStrategy: ConsumerStrategy[K, V],
    ppc: PerPartitionConfig
  ) extends InputDStream[ConsumerRecord[K, V]](_ssc) with Logging with CanCommitOffsets {

  // 记录上次提交的offset, key为Kafka的topic分区,value为提交的offset
  protected var currentOffsets = Map[TopicPartition, Long]()
  
  override def compute(validTime: Time): Option[KafkaRDD[K, V]] = {
    // 这里调用了latestOffsets方法,实时获取topic partition的最新offset
    // clamp方法增加了限速的功能,计算出限速条件下,允许获取最大数据的offset
    val untilOffsets = clamp(latestOffsets())
    // 为每一个topic partition生成一个OffsetRange
    val offsetRanges = untilOffsets.map { case (tp, uo) =>
      // 获取上次提交的offset
      val fo = currentOffsets(tp)
      // 这批数据在kafka中的offset范围,是上次提交的offset到现在kafka中最新的offset
      OffsetRange(tp.topic, tp.partition, fo, uo)
    }
    
    val useConsumerCache = context.conf.getBoolean("spark.streaming.kafka.consumer.cache.enabled",
      true)
    // 生成KafkaRDD
    val rdd = new KafkaRDD[K, V](context.sparkContext, executorKafkaParams, offsetRanges.toArray,
      getPreferredHosts, useConsumerCache)
    // 更新currentOffsets
    currentOffsets = untilOffsets
    // 这里commitAll会去提交offset,但是DirectKafkaInputDStream是不管理offset,
    // 只有用户主动调用了它的commitAsync方法,才会提交
    commitAll()
    // 返回 rdd
    Some(rdd)
  }
}

KafkaRDD 原理

KafkaRDD 分区原理

DirectKafkaInputDStream定期生成的RDD的类型是KafkaRDD。我们首先看看 KafkaRDD是如何划分分区的,它会根据从初始化时接收的offset信息参数,生成KafkaRDDPartition分区,每个分区对应着Kafka的一个topic partition 的一段数据。这段数据的信息由OffsetRange表示, 它保存了数据的位置。

1
2
3
4
5
final class OffsetRange private(
    val topic: String,   // Kafka的topic名称
    val partition: Int,  // 该topic的partition
    val fromOffset: Long,  // 起始offset
    val untilOffset: Long);  // 截至offset

KafkaRDD继承RDD,它的getPartitions负责生成分区

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
private[spark] class KafkaRDD[K, V](
    val offsetRanges: Array[OffsetRange] 
) extends RDD[ConsumerRecord[K, V]](sc, Nil) with Logging with HasOffsetRanges {

  override def getPartitions: Array[Partition] = {
    offsetRanges.zipWithIndex.map { case (o, i) =>
        new KafkaRDDPartition(i, o.topic, o.partition, o.fromOffset, o.untilOffset)
    }.toArray
  }
}

接下来看看KafkaRDD的分区数据是如何读取的。它使用KafkaRDDIterator遍历数据,而KafkaRDDIterator的原理,是调用CachedKafkaConsumer的get方法获取数据

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
class KafkaRDD[K, V] {
  override def compute(thePart: Partition, context: TaskContext): Iterator[ConsumerRecord[K, V]] =   {
    // 向下转型为KafkaRDDPartition
    val part = thePart.asInstanceOf[KafkaRDDPartition]
    if (part.fromOffset == part.untilOffset) {
      Iterator.empty
    } else {
      // 返回迭代器
      new KafkaRDDIterator(part, context)
    }
  }

  class KafkaRDDIterator(
      part: KafkaRDDPartition,
      context: TaskContext) extends Iterator[ConsumerRecord[K, V]] {

    logInfo(s"Computing topic ${part.topic}, partition ${part.partition} " +
      s"offsets ${part.fromOffset} -> ${part.untilOffset}")

    // 获取kafka消费者的groupId
    val groupId = kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG).asInstanceOf[String]
    // 当整个spark streaming任务退出时,会调用closeIfNeeded方法关闭 kafka消费者
    context.addTaskCompletionListener{ context => closeIfNeeded() }
    // 获取 kafka消费者
    val consumer = if (useConsumerCache) {
      // 如果支持 kafka消费者缓存,那么实例化CachedKafkaConsumer
      CachedKafkaConsumer.init(cacheInitialCapacity, cacheMaxCapacity, cacheLoadFactor)
      if (context.attemptNumber >= 1) {
        // 如果此次任务失败过,那么删除以前的 kafka 消费者
        CachedKafkaConsumer.remove(groupId, part.topic, part.partition)
      }
      // 返回对应groupId,topic和partition的kafka消费者,如果有缓存则返回缓存的。
      CachedKafkaConsumer.get[K, V](groupId, part.topic, part.partition, kafkaParams)
    } else {
      // 新建对应groupId,topic和partition的kafka消费者
      CachedKafkaConsumer.getUncached[K, V](groupId, part.topic, part.partition, kafkaParams)
    }

    // 关闭kafka消费者
    def closeIfNeeded(): Unit = {
      if (!useConsumerCache && consumer != null) {
        consumer.close
      }
    }
    
    // 只返回offset大于fromOffset,小于untilOffset的数据
    var requestOffset = part.fromOffset
    override def hasNext(): Boolean = requestOffset < part.untilOffset

    override def next(): ConsumerRecord[K, V] = {
      assert(hasNext(), "Can't call getNext() once untilOffset has been reached")
      // 调用CachedKafkaConsumer的get方法返回一条数据
      val r = consumer.get(requestOffset, pollTimeout)
      requestOffset += 1
      r
    }
  }  
}

CachedKafkaConsumer原理

CachedKafkaConsumer包含了KafkaConsumer实例,它是Kafka的消费者。CachedKafkaConsumer使用KafkaConsumer的assign模式,这种模式需要客户端自己管理offset。CachedKafkaConsumer是运行在 executor 节点上的,它只负责从kafka中读取指定的一段数据,所以不需要管理offset。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
class CachedKafkaConsumer[K, V] private(
  val groupId: String,
  val topic: String,
  val partition: Int,
  val kafkaParams: ju.Map[String, Object]) extends Logging {

  val topicPartition = new TopicPartition(topic, partition)
  // 实例化 KafkaConsumer,使用assign模式。这种模式需要自己维护offset
  protected val consumer = {
    val c = new KafkaConsumer[K, V](kafkaParams)
    val tps = new ju.ArrayList[TopicPartition]()
    tps.add(topicPartition)
    c.assign(tps)
    c
  }

  // 从kafka一次读取的数据是多条的,这里用buffer缓存读取的数据
  protected var buffer = ju.Collections.emptyList[ConsumerRecord[K, V]]().iterator
  // 下一条数据的offset
  protected var nextOffset = -2L

  def close(): Unit = consumer.close()

  // 获取offset对应的数据
  def get(offset: Long, timeout: Long): ConsumerRecord[K, V] = {
    logDebug(s"Get $groupId $topic $partition nextOffset $nextOffset requested $offset")
    // 如果要获取数据的offset不等于下一条数据的offset,则调用seek移动KafKaConsumer的位置
    if (offset != nextOffset) {
      logInfo(s"Initial fetch for $groupId $topic $partition $offset")
      // 移动读取位置
      seek(offset)
      // 从kafka获取数据
      poll(timeout)
    }
    // 如果缓存的数据,已经读完,则调用poll从kafka中读取数据
    if (!buffer.hasNext()) { poll(timeout) }
    assert(buffer.hasNext(),
      s"Failed to get records for $groupId $topic $partition $offset after polling for $timeout")
    // 获取buffer的数据
    var record = buffer.next()

    if (record.offset != offset) {
      // 如果从buffer中获取的数据有问题,则需要重新从Kafka中读取数据
      logInfo(s"Buffer miss for $groupId $topic $partition $offset")
      seek(offset)
      poll(timeout)
      assert(buffer.hasNext(),
        s"Failed to get records for $groupId $topic $partition $offset after polling for $timeout")
      // 从buffer中读取数据
      record = buffer.next()
      // 检测该数据的offset是否等于预期
      assert(record.offset == offset,
        s"Got wrong record for $groupId $topic $partition even after seeking to offset $offset")
    }
    // 更新nextOffset为当前offset+1
    nextOffset = offset + 1
    record
  }

  private def seek(offset: Long): Unit = {
    // 调用KafKaConsumer的seek方法移动读取位置
    logDebug(s"Seeking to $topicPartition $offset")
    consumer.seek(topicPartition, offset)
  }

  private def poll(timeout: Long): Unit = {
    // 调用poll方法读取数据
    val p = consumer.poll(timeout)
    // 获取该topic partition的数据
    val r = p.records(topicPartition)
    logDebug(s"Polled ${p.partitions()}  ${r.size}")
    buffer = r.iterator
  }

}

从上面的代码可以看到,CachedKafkaConsumer会按照offset顺序的读取数据,并且offset还必须是连续的。如果Kafka开启了日志压缩功能,就会将相同key的数据压缩成一条,那么这样消息的offset就不会是连续的。这种情况下,spark streaming就会报错。上面的代码是spark 2.2版本的,后面的版本修复了这个问题,参见 pull request,增加了spark.streaming.kafka.allowNonConsecutiveOffsets配置,允许处理这种情况。

updatedupdated2023-07-022023-07-02