Zookeeper Zab 协议

前言

Zookeeper 作为分布式的一致框架,提供了原子性的写,而且好保证了高可用(集群有过半节点正常运行)。这些特点被广泛应用,比如 Hbase 用来存储配置信息,还用来保证主从切换。本篇文章介绍了 zookeeper 的核心算法 zab 协议,zab 协议是由所有节点必须遵守的,它们协同工作实现了分布式的一致性。

集群状态

zab 协议保证了集群在任何时刻,要么只有一个 leader,要么没有 leader。这两种情况称为广播模式和恢复模式:

  1. 处于广播模式的集群,能够正常对外提供服务,客户端可以正常的读写。
  2. 处于恢复模式的集群,不能对外提供服务,这时节点有可能处于选举过程,同步过程。

广播模式比较简单,就是和二阶段提交类型,但是允许不超过半数的服务挂掉。客户端的写入请求都会转发给 leader ,然后 leader 将消息发送给 follower,如果超过半数的follower响应成功,那么就提交 commit 操作,将数据持久化到磁盘。

恢复模式有一点复杂,它会从集群中选举出一个节点作为 leader,这个节点需要具有最新的数据。而且这个 leader 需要被过半节点认同。我们回想下 zookeeper 的写操作需要过半的节点完成才能认为成功,而选举也需要过半的节点都同意,那么两个集合之间必定有交叉,就可以保证选举出来的是有最新数据的节点。

当选举完成后,follower 会向 leader 同步数据。当多数 follower 同步完成后,就进入到广播模式。

启动过程

任何节点启动都遵循着同样的过程,加载数据,选举,同步,处理请求。

加载过程:zookeeper 的节点会在本地磁盘持久化数据,启动时会加载这些本地数据,这些数据都是根据事务 id 顺序存储的,这样就可以根据最大的事务 id 来判断数据的新旧

选举过程:各个节点相同通信,选举出一个共同的 leader 节点。这个 leader 节点的选票必须过半,并且有着最新的数据。

同步过程:当成为 follower 节点后,需要向 leader 节点同步数据,因为 follower 节点的数据可能落后于leader。

处理请求:当同步过程完成后,集群就可以对外提供读写服务了。

下面是节点运行的程序,可以看到节点在不同的时刻,会有着不同的状态。LOOKING 状态表示处于选举阶段,FOLLOWING 状态表示节点变为 follower 角色,而 LEADING 状态表示节点成为 leader 角色。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
public class QuorumPeer extends ZooKeeperThread implements QuorumStats.Provider {
    
    @Override
    public void run() {
        while (running) {
            switch (getPeerState()) {
            case LOOKING:
                 // 执行选举,根据返回leader地址,设置自身为follower还是leader
                 setCurrentVote(makeLEStrategy().lookForLeader());
                 break;
            case FOLLOWING:
                 try {
                     // 执行follower程序
                     setFollower(makeFollower(logFactory));
                     follower.followLeader();
                 } finally {
                     // 如果follower程序发生异常或退出
                     follower.shutdown();
                     setFollower(null);
                     // 设置状态为LOOKING
                     updateServerState();
                 }
                 break;
            case LEADING:
                 try {
                     // 执行leader程序
                     setLeader(makeLeader(logFactory));
                     leader.lead();
                     setLeader(null);
                 } finally {
                     // 如果leader程序发生异常或退出,设置状态为LOOKING
                     setLeader(null);
                     updateServerState();
                 }
            }
        }
    }
}   

通信组件

zookeeper 集群有着多个节点,节点之间的通信都是通过网络。zookeeper 使用队列存储请求和响应,并且对于每个连接都有单独的读队列和写队列,同时还有单独的读线程负责解析请求,单独的写线程负责发送数据。

选举框架

zookeeper 的选举算法有多种,不过目前主要只使用 FastLeaderElection 算法,其余的已经被废弃了。FastLeaderElection 在通信组件的基础上还做了一层封装,它有两个线程,读线程 WorkerReceiver 和写线程 WorkerSender。读线程负责从通信组件的队列里读取请求,并且解析成选票请求,添加到投票请求队列。写线程负责把要发送的投票添加到通信组件的写队列里。

FastLeaderElection 选举算法

FastLeaderElection 选举会根据对方的状态来处理选票的,如果对方是 leader 或者 follower,那么说明集群中已经存在一个 leader了,这时只需要检测这个 leader 是否被过半节点认同。如果对方是looking 状态,那么需要比较是否处于同一轮选举,还有选票的大小。如果对方的选票比自身大,需要更改自身的选票,并且向其他节点发出通知。

下面先来看看选票包含了哪些重要信息

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
public class Vote {
    // 选择哪个节点作为leader
    final private long id;
    // 选定leader节点的最大事务id,越大说明数据越新
    final private long zxid;
    // 该投票属于第几轮选举
    final private long electionEpoch;
    // leader节点的epoch
    final private long peerEpoch;
}

接下来看看选举原理,

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
public class FastLeaderElection implements Election {
    AtomicLong logicalclock = new AtomicLong();  // 选举轮数
    
    // 自身节点的投票信息
    long proposedLeader;  // 哪个节点为leader
    long proposedZxid;    // 选举的leader节点的最大事务id
    long proposedEpoch;   // 选举的leader节点的最大epoch
    
    public Vote lookForLeader() throws InterruptedException {
        // 存储着来自同选举轮数,每个节点对应的投票,投票里包含了选举哪个节点作为leader
        Map<Long, Vote> recvset = new HashMap<Long, Vote>();
        // 存储着来自leader或者follower的投票
        Map<Long, Vote> outofelection = new HashMap<Long, Vote>();
        // 每次选举都会增加选举轮数
        logicalclock.incrementAndGet();
        // 设置自身的投票信息
        updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
        // 发送投票给其余的节点
        sendNotifications();
        
        // 循环,一直到选出leader为止
        while ((self.getPeerState() == ServerState.LOOKING) && (!stop)) {
            // 从队列里获取其余节点的投票信息
            Notification n = recvqueue.poll(notTimeout, TimeUnit.MILLISECONDS);
            // 根据发送投票的节点的状态,做不同的处理
            switch (n.state) {
            case LOOKING:
                 // 发送节点的状态为LOOKING
                 // 先比较选举轮数,再比较epoch和事务id
                 if (n.electionEpoch > logicalclock.get()) {
                     // 如果自己的选举轮数落后,更新自己的选举轮数
                     logicalclock.set(n.electionEpoch);
                     // 还需要清除旧的投票信息,因为这些都已经过时了
                     recvset.clear();
                     // 比较谁更适合当选leader,如果是别的节点,那么就更新自己的投票信息
                     if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, getInitId(), getInitLastLoggedZxid(), getPeerEpoch())) {
                         updateProposal(n.leader, n.zxid, n.peerEpoch);
                     } else {
                         updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
                     }
                     // 需要重新发送投票,因为之前的已经过时了
                     sendNotifications();
                 } else if (n.electionEpoch < logicalclock.get()) {
                     // 如果发送投票的节点,选举轮数落后,那么就不用理睬
                     break;
                 } else if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, proposedLeader, proposedZxid, proposedEpoch)) {
                     // 如果别的节点更适合当选leader,那么就更新自己的投票信息,并且重新发送
                     updateProposal(n.leader, n.zxid, n.peerEpoch);
                     sendNotifications();
                 }
                 
                 // 记录投票信息
                 recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));
                
                 // 如果有超过半数的投票一致,那么有可能新的leader被选举出来了
                if (voteSet.hasAllQuorums()) {
                    // 检测队列里的剩余投票请求,并且直到等待finalizeWait时间,仍然没有新的请求
                    // 如果有别的节点更适合当选leader,那么就跳到循环开始,来处理投票信息
                    while ((n = recvqueue.poll(finalizeWait, TimeUnit.MILLISECONDS)) != null) {
                        if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, proposedLeader, proposedZxid, proposedEpoch)) {
                            recvqueue.put(n);
                            break;
                        }
                    }
                    // 如果仍然没有新请求,那么就认为leader选定出来了
                    // 根据比较节点id,判断是follower角色还是leader角色
                    if (n == null) {
                        // 这里会更新该节点的状态为follower或者leader
                        setPeerState(proposedLeader, voteSet);
                        // 返回最终的选票
                        Vote endVote = new Vote(proposedLeader, proposedZxid, logicalclock.get(), proposedEpoch);
                        return endVote;
                    }
                }
                break;
           
            case FOLLOWING:
            case LEADING:
                if(n.electionEpoch == logicalclock.get()){
                    // 在同一选举轮数中,已经有leader产生并且开始运行了
                    recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));
                    if (voteSet.hasAllQuorums() && checkLeader(outofelection, n.leader, n.electionEpoch)) {
                        // 这里会更新该节点的状态为follower或者leader
                        setPeerState(n.leader, voteSet);
                        Vote endVote = new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch);
                        return endVote;
                    }
                }
                // 这里不用区分选举轮数,因为follower和leader角色已经稳定运行了
                outofelection.put(n.sid, new Vote(n.version, n.leader,
                                n.zxid, n.electionEpoch, n.peerEpoch, n.state));
                // 如果多数节点的投票相同,那么检查该leader节点是否处于leader状态
                if (voteSet.hasAllQuorums() && checkLeader(outofelection, n.leader, n.electionEpoch)) {
                    // 设置选举轮数
                    logicalclock.set(n.electionEpoch);
                    // 这里会更新该节点的状态为follower或者leader
                    setPeerState(n.leader, voteSet);
                    // 返回最终的选票
                	Vote endVote = new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch);
                	return endVote;
                }
           		break;
        }
    }
}
                    
            

我们再来看看 leader 角色或者 follower 角色是如何处理投票请求的。在 WorkerReceiver 线程里,会检查选票请求。如果自身是 leader 或者 follower,会直接返回自身的选票 。如果自身是 LOOKING 状态,那么就会将选票添加到队列里,等待选举算法处理。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
class WorkerReceiver extends ZooKeeperThread {
    
    QuorumCnxManager manager;
    
    public void run() {
    	Message response;
        while (!stop) {
            response = manager.pollRecvQueue(3000, TimeUnit.MILLISECONDS);
            Notification n = new Notification();
            
            // 解析响应,生成Notification实例
            ......
            
            if (self.getPeerState() == QuorumPeer.ServerState.LOOKING) {
                // 如果自身节点是LOOKING状态,那么将请求添加到队列里,等待算法处理
                recvqueue.offer(n);
                // ackstate表示发送该响应的节点的状态,如果对方状态也是LOOKING,并且选举轮数落后,
                // 那么将自身的选票返回给它
                if ((ackstate == QuorumPeer.ServerState.LOOKING) && (n.electionEpoch < logicalclock.get())) {
                    // 获取自身节点的选票
                    Vote v = getVote();
                    // 构建响应
                    ToSend notmsg = new ToSend(
                        ToSend.mType.notification,
                        v.getId(),
                        v.getZxid(),
                        logicalclock.get(),
                        self.getPeerState(),
                        response.sid,
                        v.getPeerEpoch(),
                        qv.toString().getBytes());
                    // 添加到队列,由后台线程发送
                    sendqueue.offer(notmsg);
                }
            } else {
                // 如果自身节点是leader或者follower,那么表明集群中已经有了leader节点了
                Vote current = self.getCurrentVote();
                if (ackstate == QuorumPeer.ServerState.LOOKING) {
                    // 直接返回自身节点的选票
                    ToSend notmsg = new ToSend(
                        ToSend.mType.notification,
                        current.getId(),
                        current.getZxid(),
                        current.getElectionEpoch(),
                        self.getPeerState(),
                        response.sid,
                        current.getPeerEpoch(),
                        qv.toString().getBytes());
                    // 添加到队列,由后台线程发送
                    sendqueue.offer(notmsg);
                }
            }
        }
    }
}          

选举完成后,各个节点的角色都已经确定好了,接下来继续看 leader 和 follower 角色的运行原理。

同步

同步的原理很简单,目的是为了让 follower 和 leader 的数据必须保持一致。通信过程如下:

如果 follower 节点落后太多,则需要全量同步来提高效率,返回 SNAP 响应。

如果落后不是太多,则选择增量同步,返回 DIFF 响应。

处理写请求

leader 节点接收到客户端的写请求,会先向follower角色(至少要保证过半的节点存活)发送 PROPOSAL 请求。

follower节点收到 PROPOSAL 请求后,会将此次请求存在内存中,然后返回 ACK 响应。

leader 如果有超过半数的节点成功响应 ACK,那么就会向 follower 节点发送COMMIT请求。

follower 节点收到 COMMIT请求后,会将内存的请求,持久化到磁盘中,表示数据成功写入。

updatedupdated2023-07-022023-07-02