Kafka Rebalance 客户端原理

前言

Kafka消费者提供了组的概念,它允许多个 consumer 共同消费一个 topic,而不会造成冲突。Kafka提供了Coordinator服务,负责管理消费组。当有新增的consumer加入组,或者有consumer离开组,都会触发Coordinator的重新平衡操作,Coordinator会将topic的分区重新分配给各个consumer。

Rebalance 流程

新的consumer加入到组的过程如下:

  1. consumer 首先会寻找负责该 consumer grouo 是由哪个节点的 Coordinator 负责
  2. 在获取到节点后,consumer 向 Coordinator 发送加入请求
  3. Coordinator 会为每个 consumer 分配 id 号,并从中选择出 leader 角色
  4. consumer 收到响应后,发现自己被选择为 leader 角色,会执行分区算法,将该topic的分区怎么分配给这个 group 的成员。然后将分配结果发送给Coordinator
  5. 如果是follower角色,那么向Coordinator发送请求获取该自己的分区分配结果。

下面我们会按照这个流程,一步步的详细讲解。

寻找 Coordinator 地址

consumer第一步是需要找到 Coordinator 的地址,才能进行后续的请求。它从Kafka集群中选择出一个负载最轻的节点,并且发出寻找Coordinator地址的请求。

协议格式

请求格式的主要字段:

字段名 字段类型 字段含义
coordinator_key 字符串 group id

响应格式的主要字段:

字段名 字段类型 字段含义
node_id 字符串 coordinator服务所在主机的 id 号
host 字符串 主机地址
port 整数 服务端口号

请求加入组

consumer在连接 Coordinator 之后,会与它进行请求交互。它首先会发送加入组的请求,coordinator会分配 id,并且会从组中选出 leader 角色。leader 角色的选取采用先到先得的方式,因为 leader 还会负责分区分配的算法,还需要将结果发送给 Coordinator ,这个过程会比较耗时,所以为了减少整个 rebalance 的时间,所以选用了第一个加入的 consumer。

协议格式

请求格式的主要字段:

字段名 字段类型 字段含义
group_id 字符串 consumer 所在的 group id
session_timeout 整数 心跳超时时间
rebalance_timeout 整数 rebalance超时时间
group_protocols group_protocol 类型列表 group_protocol 类型列表

group_protocol 数据格式

字段名 字段类型 字段含义
protocol_name 字符串 consumer支持的分配算法的名称
protocol_metadata 字节数组 consumer对于此算法的自定义数据

响应格式的主要字段:

字段名 字段类型 字段含义
error_code 整数 错误码
generation_id 字符串 表示coordinator的数据版本
leader_id 整数 leader角色的 id 号
member_id 整数 该consumer 的 id 号
members member 类型列表 所有consumer的订阅信息

member 数据格式

字段名 字段类型 字段含义
member_id 整数 consumer 的 id 号
member_metadata 字节数组 consumer自定义的数据

leader 执行分配

Coordinator 返回给leader角色的响应中,包含了与分配有关的所有信息,比如分区算法和该 group 的所有成员信息。leader角色收到响应后,会执行分区的分配算法,然后将结果保存到 group_assignment 字段里,发送给Coordinator。

协议格式

请求格式的主要字段:

字段名 字段类型 字段含义
group_id 字符串 consumer 所在的 group id
generation_id 整数 coordinator的数据版本号
member_id 整数 consumer的 id
group_assignment assignment 类型列表 所有consumer的分配结果

assignment 类型格式

字段名 字段类型 字段含义
member_id 整数 consumer的 id
member_assignment 字节数组 consumer的分配结果

响应格式的主要字段:

字段名 字段类型 字段含义
error_code 整数 错误码
member_assignment 字节数组 consumer的分配结果

follower请求分配结果

follower角色同样发送了SyncGroupRequest请求,不过它的groupAssignments字段是空的。Coordinator 会将该consumer的分配结果,返回给它。

心跳线程

consumer 会启动一个心跳线程,定时的向Coordinator发送心跳请求,来通知Coordinator自己还活着。

协议格式

请求格式的主要字段:

字段名 字段类型 字段含义
group_id 字符串 consumer 所在的 group id
generation_id 整数 coordinator的版本号
member_id 整数 consumer的 id

响应格式的主要字段:

字段名 字段类型 字段含义
error_code 整数 错误码

心跳时间

心跳的间隔时间由heartbeat.interval.ms配置项指定,默认为3秒。也就是 consumer 会每隔 3秒,就会发送一次心跳。

当长时间的没有收到心跳响应,consumer 就会认为超时了,它会认为 Coordinator 已经挂掉了,会将连接断开。这个超时由session.timeout.ms配置项指定,默认为10秒。

这里额外提下 poll 超时的问题,kafka 规定两次 poll 的间隔时间必须要小于一定时间,不然会自动的离开 group。这个阈值由max.poll.interval.ms配置项指定,默认为5分钟。后面会讲到如何处理这个问题。

离开消费组

当 consumer 关闭或者超时等原因,会触发它发起离开消费组的请求。

协议格式

请求格式的主要字段:

字段名 字段类型 字段含义
group_id 字符串 consumer 所在的 group id
member_id 整数 consumer的 id

响应格式的主要字段:

字段名 字段类型 字段含义
error_code 整数 错误码

回调函数

上述介绍完整个 Rebalance 的流程,接下来还需要留意下,kafka 给了一些回调接口,供我们更好的处理 Rebalance 过程。我们只需要实现ConsumerRebalanceListener接口,然后调用KafkaConsumer.subscribe函数时传递进去即可。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
public interface ConsumerRebalanceListener {

    void onPartitionsRevoked(Collection<TopicPartition> partitions);

    void onPartitionsAssigned(Collection<TopicPartition> partitions);

    default void onPartitionsLost(Collection<TopicPartition> partitions) {
        onPartitionsRevoked(partitions);
    }
}

kafka 在获取到分区结果后,会调用onPartitionsAssigned方法,参数partitions表示它所分配的分区结果。

当 consumer 调用 close 方法或者 unsubscribe 方法,会调用onPartitionsLost方法,参数partitions表示它不在订阅的分区。

当 consumer 触发 balance操作时,会触发onPartitionsRevoked方法,参数partitions表示那些仅仅需要回收的分区,而不是分配的所有分区。

数据版本

我们观察到所有的请求都会携带generation_id参数,这是用来表示逻辑时间的。客户端可能因为没来及和服务端沟通,它的信息会落后。当服务端更新消费组的元数据后,generation_id就会加一。这样客户端和服务端请求时,服务端就能及时的提醒客户端的数据已经过时了,需要重新获取。

线程关系

KafkaConsumer 有两个最重要的成员,

1
2
3
4
5
6
7
8
9
public class KafkaConsumer<K, V> implements Consumer<K, V> {
    
    // 负责rebalance的逻辑处理,还有提交offset
    private final ConsumerCoordinator coordinator;
    
    // 负责拉取消息
    private final Fetcher<K, V> fetcher;

}

我们在使用 KafkaConsumer 时,会循坏的调用 poll 方法。这个方法的原理,其实调用了 coordinator.poll 处理 rebalance 流程,保证了 该 consumer 成功的加入到组。然后调用了 fetcher 来拉去消息,最后处理完消息后,通过 coordinator 来提交 offset。

Kafka Consumer 一共有两个线程,

  1. 主线程,负责创建了KafkaConsumer,处理请求,并且处理消息。也就是我们自己编写的程序。
  2. 心跳线程,负责定期向服务端发送心跳

主线程在调用 KafkaConsumer.poll方法时 ,会检查是否与服务端 GroupCoordinator 的连接是否是好的,是否需要 rebalance 操作。它会保证 consumer 能够成功加入到组里。

心跳线程一个功能是通知服务端,自己还活着。另一方面是即使的获取服务端的状态,如果服务端处于 rebalance 状态,心跳线程会设置比较位。这样主线程在下次调用 poll 方法时,就会负责执行 rebalance 操作。

updatedupdated2023-07-022023-07-02