架构图
首先给出一张简单的架构图,初步的了解下流程。
客户端创建请求,将它放入到一个队列里。如果客户端采用了同步请求,那么它会等待响应。如果采用了异步请求,它会传递回调函数,立刻返回。
队列采用先进先出的方式,SendThread 线程会从队列中取出请求,通过底层的 socket 发送到服务端。当客户端收到响应后,SendThread 会负责解析响应。如果客户端采用了同步请求,那么它会通知客户端。如果客户端采用异步请求,那么会将响应传递给 EventThread 线程,由它负责执行回调。
Packet
zookeeper 对于请求和响应,它使用 Packet 类封装在一起。zookeeper 还支持异步,所以 Packet 还包含了回调函数。这个类很简单,只是这些成员的集合。
通信线程
SendThread 有两个 Packet 队列,一个是存储等待发送的请求(称作为 outgoingQueue),另一个是已经发送但还没响应的请求(称作为 pendingQueue)。SendThread 从 outgoingQueue 中提取请求,序列化,通过 socket 通信发送出去后,就将这个请求放进 pendingQueue。
SendThread 提供了 readResponse 方法,用于解析响应。根据响应头部的 xid,它有下面三种特殊类型的响应。
xid |
含义 |
-1 |
用于 Watch 通知 |
-2 |
用于 Ping 响应 |
-4 |
用于 认证响应 |
如果是 Watch 通知,会生成 WatchEvent 放入到队列里,由 EventThread 线程单独处理。
在解析完响应后,对于同步请求执行通知操作,对于异步请求,将其放入队列里,由 EventThread 线程单独处理。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
|
private void finishPacket(Packet p) {
......
// cb属性代表着回调函数,如果为空则是同步请求,否则为异步请求
if (p.cb == null) {
synchronized (p) {
// 设置 finished 属性
p.finished = true;
// 执行通知
p.notifyAll();
}
} else {
// 设置 finished 属性
p.finished = true;
// 添加到队列里
eventThread.queuePacket(p);
}
}
|
后台线程
EventThread 负责执行回调函数,还包含处理 Watch 事件。EventThread 的原理很简单,只是维护了一个队列,不停的取出任务执行。对于 Watch 事件需要额外说一下,用户注册的 Watch 回调,只能用一次。我们通过 ZKWatchManager 的源码就可以看出原因,它的 materialize 方法负责获取 Watch 回调。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
|
static class ZKWatchManager implements ClientWatchManager {
// 保存节点数据变化的回调
private final Map<String, Set<Watcher>> dataWatches = new HashMap<String, Set<Watcher>>();
// 保存节点删除或创建的回调
private final Map<String, Set<Watcher>> existWatches = new HashMap<String, Set<Watcher>>();
// 保存子节点变化的回调
private final Map<String, Set<Watcher>> childWatches = new HashMap<String, Set<Watcher>>();
public Set<Watcher> materialize(Watcher.Event.KeeperState state, Watcher.Event.EventType type, String clientPath) {
Set<Watcher> result = new HashSet<Watcher>();
switch (type) {
// 这里以节点变化的情况为例
case NodeDataChanged:
case NodeCreated:
synchronized (dataWatches) {
// 从dataWatches列表删除
addTo(dataWatches.remove(clientPath), result);
}
synchronized (existWatches) {
// 从existWatches列表删除
addTo(existWatches.remove(clientPath), result);
}
break;
.......
// 其余变化情况
}
}
}
|
可以看到 ZKWatchManager 每次获取 Watch 回调时,都是从集合中删除掉。所以用户想持续性的注册 Watch,需要在回调函数中重新注册自身。
Socket 通信
负责与服务端进行 socket 通信的类是 ClientCnxnSocket,它都是异步的方式,由 ClientCnxnSocket 类表示。ClientCnxnSocket 是一个抽象类,有两个子类 ClientCnxnSocketNIO 和 ClientCnxnSocketNetty,分别基于 selector 和 基于 netty 实现。这里主要讲讲 selector 的实现原理。
它的 doTransport 方法负责处理 socket 事件,比如OP_CONNECT,OP_READ,OP_WRITE。
- 对于 OP_CONNECT 事件,表示 socket 连接创建,接下来会发送 ConnectRequest 请求,来获取 sessionId。
- 对于 OP_READ 事件,表示有响应数据,然后解析响应。
- 对于 OP_WRITE 事件,表示 socket 可写,然后从队列中提取请求发送。
创建连接
接下来看看连接创建的过程,zookeeper 支持 SASL 框架(用来认证和安全的)。创建连接分为两个步骤,socket 连接和初始化请求。
socket 的连接由 ClientCnxnSocket 负责,它使用异步连接的方式。当 socket 连接成功后,如果开启 sasl 认证,那么会发送认证请求。最后发送 ConnectRequest 请求,当收到响应后,需要提取出 sessionId。这个 sessionId 由服务端分配,标识此次连接。
客户端请求
ZooKeeper 客户端为每种请求提供了两种阻塞方式,异步和同步。我们以 create 请求为例,分别介绍同步方式和异步方式。
同步请求
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
|
public class ZooKeeper implements AutoCloseable {
protected final ClientCnxn cnxn;
public String create(final String path, byte data[], List<ACL> acl,
CreateMode createMode) {
// 创建请求
RequestHeader h = new RequestHeader();
.....
// 提交请求并且等待响应
ReplyHeader r = cnxn.submitRequest(h, request, response, null);
return response.getPath();
}
}
public class ClientCnxn {
public ReplyHeader submitRequest(...) {
ReplyHeader r = new ReplyHeader();
// 将请求放入队列中
Packet packet = queuePacket(h, r, request, response, null, null, null,
null, watchRegistration, watchDeregistration);
// 使用 packet 实例的监视锁
synchronized (packet) {
// 判断条件是finished属性
while (!packet.finished) {
packet.wait();
}
}
return r;
}
}
|
回顾下 SendThread 在解析完请求,都会调用 finishPacket 方法,这里面会设置 finished 属性为 true,并且执行通知。
异步请求
1
2
3
4
5
6
7
8
9
10
|
public class ZooKeeper implements AutoCloseable {
public void create(final String path, byte data[], List<ACL> acl,
CreateMode createMode, StringCallback cb, Object ctx) {
// 创建请求
RequestHeader h = new RequestHeader();
......
// 将请求放入队列中
cnxn.queuePacket(h, r, request, response, cb, clientPath,
serverPath, ctx, null);
}
|
异步请求需要传递回调函数 StringCallback, 和自定义的回调参数 Object。后台 EventThread 线程会执行回调函数。
异常处理
zookeeper 客户端是通过网络与服务端通信的,而网络也是最不稳定的一环,接下来看看 zookeeper 客户端是如何处理网络异常的。
java 在网络异常时会抛出 IOException,通过 ClientCnxnSocket 抽象类的 doTransport 接口声明就可以看出来。
1
2
3
4
|
abstract class ClientCnxnSocket {
abstract void doTransport(int waitTimeOut, List<Packet> pendingQueue, ClientCnxn cnxn)
throws IOException, InterruptedException;
}
|
doTransport 接口负责处理 socket 的连接,读写事件,在 SendThread 线程中被循环调用。继续看 SendThread 是如何处理这些异常的,下面代码做了部分简化。
这里还要额外说下 zookeeper 集群,从 3.4.0 版本开始,支持只读模式,默认是关闭的。如果开启了只读模式,那么这个节点即使与集群的连接断开,仍能提供读操作。客户端如果连接的节点是只读模式,那么它会试图找到正常模式的节点。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
|
class SendThread extends ZooKeeperThread {
// 正常模式的服务端地址
private InetSocketAddress rwServerAddress = null;
public void run() {
int to;
InetSocketAddress serverAddress = null;
while (state.isAlive()) {
try {
// 检查连接是否正常,否则可能需要重新建立
if (!clientCnxnSocket.isConnected()) {
// 如果指定了closing,那么表示需要退出循环
if (closing) {
break;
}
if (rwServerAddress != null) {
// 如果发现集群是正常模式
serverAddress = rwServerAddress;
rwServerAddress = null;
} else {
// 挑选下一个地址进行连接
serverAddress = hostProvider.next(1000);
}
// 连接服务端
startConnect(serverAddress);
}
// 计算是否超时
if (state.isConnected()) {
to = readTimeout - clientCnxnSocket.getIdleRecv();
} else {
to = connectTimeout - clientCnxnSocket.getIdleRecv();
}
// 如果超时,则抛出异常
if (to <= 0) {
throw new SessionTimeoutException(warnInfo);
}
if (state == States.CONNECTEDREADONLY) {
// 当前是只读模式,那么向其他节点发送ping请求,检查集群是否为正常模式
// 如果是正常模式,那么抛出异常 RWServerFoundException
pingRwServer();
}
// 处理socket事件
clientCnxnSocket.doTransport(to, pendingQueue, ClientCnxn.this);
} catch (Throwable e) {
if (closing) {
break;
} else {
// 这里的cleanup方法,首先重置了底层的socket,然后对于队列里的请求都立即响应ConnectionLost 异常
cleanup();
}
}
}
}
}
|
这里简单说下上面代码的流程,首先它最外层是一个循坏,循环里面处理着创建连接,读写通信。首先是从提供的地址列表中,选出一个地址创建连接,然后根据 ConnectRequest 请求的响应,可以判断出该节点是否处于只读模式。如果处于只读模式,那么 state 属性值为 CONNECTEDREADONLY。它会试图调用 pingRwServer 方法向别的节点发送请求,查看是否处于正常模式,如果发现是正常模式,那么会抛出异常,然后调用 cleanup 方法重置 socket 。
如果底层的 socket 通信断开抛出异常,那么同样会被捕获到,触发 cleanup 方法重置 socket,然后从地址列表中挑选出另外一个进行连接。