Kafka ConsumerNetworkClient 原理

Kafka ConsumerNetworkClient 原理

Kafka消费者会使用ConsumerNetworkClient发送和处理请求。ConsumerNetworkClient是在NetworkClient之外封装了一层,提供了异步的请求方法。它每次发送请求,会返回RequestFuture。RequestFuture实现了类似Future的功能,而且 支持添加事件回调函数。

RequestFuture 原理

RequestFuture实现了类似Future的功能,它使用了CountDownLatch来实现等待和通知的功能。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public class RequestFuture<T> implements ConsumerNetworkClient.PollCondition {
    private static final Object INCOMPLETE_SENTINEL = new Object();
    // result保存了结果
    private final AtomicReference<Object> result = new AtomicReference<>(INCOMPLETE_SENTINEL);
    // CountDownLatch变量,用来执行等待和通知
    private final CountDownLatch completedLatch = new CountDownLatch(1);
    
    // 等待完成
    public boolean awaitDone(long timeout, TimeUnit unit) throws InterruptedException {
        // 使用了CountDownLatch的await方法实现等待
        return completedLatch.await(timeout, unit);
    }
    
    // 判断是否完成,INCOMPLETE_SENTINEL表示未完成
    public boolean isDone() {
        return result.get() != INCOMPLETE_SENTINEL;
    }
    
    // 获取值
    public T value() {
        if (!succeeded())
            throw new IllegalStateException("Attempt to retrieve value from future which hasn't successfully completed");
        return (T) result.get();
    }    
    
}

管理监听器

RequestFuture在Future基础之上,还添加了监听器的功能,这使得编程回调更加方便。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
public class RequestFuture<T> implements ConsumerNetworkClient.PollCondition {
    // 保存了监听器的队列
    private final ConcurrentLinkedQueue<RequestFutureListener<T>> listeners = new ConcurrentLinkedQueue<>();
    
    public void addListener(RequestFutureListener<T> listener) {
        // 将监听器保存到队列里
        this.listeners.add(listener);
        
        if (failed())
            // 如果结果已经完成,并且结果失败,则执行回调函数
            fireFailure();
        else if (succeeded())
            // 如果结果已经完成,并且结果成功,则执行回调函数
            fireSuccess();
    }
    
    private void fireSuccess() {
        // 获取值
        T value = value();
        // 从队列里取出监听器,依次执行onSuccess回调
        while (true) {
            RequestFutureListener<T> listener = listeners.poll();
            if (listener == null)
                break;
            listener.onSuccess(value);
        }
    }

    private void fireFailure() {
        // 获取异常,
        RuntimeException exception = exception();
        // 从队列里取出监听器,依次执行onFailure回调
        while (true) {
            RequestFutureListener<T> listener = listeners.poll();
            if (listener == null)
                break;
            listener.onFailure(exception);
        }
    }
}

设置结果

RequestFuture提供了两个方法设置结果,一个是成功完成,调用complete方法设置结果。另一个是执行失败,调用raise方法设置异常。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public void complete(T value) {
    try {
        // value必须为值,不能为异常
        if (value instanceof RuntimeException)
            throw new IllegalArgumentException("The argument to complete can not be an instance of RuntimeException");
        // 检查result的值是否为INCOMPLETE_SENTINEL,并且设置为value
        // 否则认为result的值已经被设置过了
        if (!result.compareAndSet(INCOMPLETE_SENTINEL, value))
            throw new IllegalStateException("Invalid attempt to complete a request future which is already complete");
        // 执行监听器
        fireSuccess();
    } finally {
        // 调用completedLatch的countDown方法,通知等待线程
        completedLatch.countDown();
    }
}

public void raise(RuntimeException e) {
    try {
        // 异常不能为空
        if (e == null)
            throw new IllegalArgumentException("The exception passed to raise must not be null");
        // 检查result的值是否为INCOMPLETE_SENTINEL,并且设置为异常
        if (!result.compareAndSet(INCOMPLETE_SENTINEL, e))
            throw new IllegalStateException("Invalid attempt to complete a request future which is already complete");
        // 执行监听器
        fireFailure();
    } finally {
        // 调用completedLatch的countDown方法,通知等待线程
        completedLatch.countDown();
    }
}

装饰 RequestFuture

RequestFuture有两个方法比较特殊,

chain方法,负责连接一个RequestFuture。当父RequestFuture完成时,子RequestFuture也同时完成。这里需要注意到两个RequestFuture的泛型必须相同。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
public void chain(final RequestFuture<T> future) {
    // 添加监听器来实现
    addListener(new RequestFutureListener<T>() {
        @Override
        public void onSuccess(T value) {
            // 当父RequestFuture完成时,会调用这个监听器的onSuccess方法
            // 这个监听器执行时,会调用future的complete方法设置结果
            future.complete(value);
        }

        @Override
        public void onFailure(RuntimeException e) {
            // 当父RequestFuture完成时,会调用这个监听器的onFailure方法
            // 这个监听器执行时,会调用future的raise方法设置结果
            future.raise(e);
        }
    });
}

compose方法,负责转换RequestFuture的泛型。比如一个RequestFuture 类型,表示它的结果是String类型的。现在需要将结果转换为Integer类型,返回RequestFuture ,那么就需要compose方法。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
// T 表示源类型,S表示目标类型
public <S> RequestFuture<S> compose(final RequestFutureAdapter<T, S> adapter) {
    // 这里生成S类型的RequestFuture
    final RequestFuture<S> adapted = new RequestFuture<>();
    // 添加监听器
    addListener(new RequestFutureListener<T>() {
        @Override
        public void onSuccess(T value) {
            // 当父RequestFuture完成时,会调用这个监听器的onSuccess方法
            // 这个监听器调用了RequestFutureAdapter的onSucceess方法,
            // RequestFutureAdapter这里需要设置adapted的结果
            adapter.onSuccess(value, adapted);
        }

        @Override
        public void onFailure(RuntimeException e) {
            adapter.onFailure(e, adapted);
        }
    });
    return adapted;
}

这里注意下传递的RequestFutureAdapter参数。RequestFutureAdapter它是一个接口,用户必须实现它的onSuccess和onFailure方法。

onSuccess方法,必须根据value参数,生成结果,并且赋予给S类型的RequestFuture。

onFailure方法,必须根据exception参数,生成结果,并且赋予给S类型的RequestFuture。

构造请求

ConsumerNetworkClient发送请求,本质上是通过NetworkClient发送。注意到以前讲述NetworkClient的原理,它发送请求之前,是需要先构造ClientRequest的。

ConsumerNetworkClient在构建请求时,传递了包含回调函数的类。回调类由RequestFutureCompletionHandler表示,它实现了RequestCompletionHandler接口。

这里着重看下RequestFutureCompletionHandler的onComplete函数,当请求响应完成后,会调用onComplete方法。这里只是将自身保存在一个队列里,等待之后的统一调用。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
private class RequestFutureCompletionHandler implements RequestCompletionHandler {
    private ClientResponse response;
    
    @Override
    public void onComplete(ClientResponse response) {
        // 更新响应数据
        this.response = response;
        // 将自身添加到pendingCompletion队列里
        pendingCompletion.add(this);
    }
}

ConsumerNetworkClient的send方法,负责构建请求,然后将请求保存到UnsentRequests集合里。UnsentRequests为每个节点都保存了一个请求队列。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public class ConsumerNetworkClient implements Closeable {
    // NetworkClient 实例
    private final KafkaClient client;
    // ClientRequest集合,它为每个节点保存了一个请求队列
    private final UnsentRequests unsent = new UnsentRequests();

    public RequestFuture<ClientResponse> send(Node node,
                                              AbstractRequest.Builder<?> requestBuilder,
                                              int requestTimeoutMs) {
        long now = time.milliseconds();
        // 实例化RequestFutureCompletionHandler,里面实例化了RequestFuture
        RequestFutureCompletionHandler completionHandler = new RequestFutureCompletionHandler();
        // 构建ClientRequest,需要注意的是completionHandler,它作为回调函数
        ClientRequest clientRequest = client.newClientRequest(node.idString(), requestBuilder, now, true,
                requestTimeoutMs, completionHandler);
        // 将请求保存到unsent集合里
        unsent.put(node, clientRequest);

        // 如果NetworkClient阻塞了,通知它
        client.wakeup();
        // 返回 RequestFuture
        return completionHandler.future;
    }
}

处理响应

send方法只是将请求保存到了队列里,并没有发送。ConsumerNetworkClient还提供了poll方法,可以接收RequestFuture参数,等待请求完成。ConsumerNetworkClient放入poll方法,不仅仅负责发送请求,还包括处理响应。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
public class ConsumerNetworkClient implements Closeable {
    
     public void poll(RequestFuture<?> future) {
         // 循环执行poll方法,直到future完成
        while (!future.isDone())
            poll(Long.MAX_VALUE, time.milliseconds(), future);
    }
    
    public void poll(long timeout, long now, PollCondition pollCondition) {
        poll(timeout, now, pollCondition, false);
    }
    
    public void poll(long timeout, long now, PollCondition pollCondition, boolean disableWakeup) {
        // 执行完成的回调函数
        firePendingCompletedRequests();
        lock.lock();
        try {
            handlePendingDisconnects();
            // 发送请求
            long pollDelayMs = trySend(now);
            timeout = Math.min(timeout, pollDelayMs);
            if (pendingCompletion.isEmpty() && (pollCondition == null || pollCondition.shouldBlock())) {
                if (client.inFlightRequestCount() == 0)
                    timeout = Math.min(timeout, retryBackoffMs);
                client.poll(Math.min(maxPollTimeoutMs, timeout), now);
                now = time.milliseconds();
            } else {
                client.poll(0, now);
            }
            checkDisconnects(now);
            if (!disableWakeup) {
                maybeTriggerWakeup();
            }
            maybeThrowInterruptException();
            // 发送请求
            trySend(now);

            // 处理过期的请求
            failExpiredRequests(now);

            // 清空unset集合,因为请求都已经发送出去了
            unsent.clean();
        } finally {
            lock.unlock();0.
        }
        // 执行完成的回调函数
        firePendingCompletedRequests();
    }
}

trySend方法负责发送请求。它会遍历请求集合unsent,依次发送。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
private long trySend(long now) {
    long pollDelayMs = Long.MAX_VALUE;
    // 遍历unsent请求
    for (Node node : unsent.nodes()) {
        // 提取该节点的请求
        Iterator<ClientRequest> iterator = unsent.requestIterator(node);
        if (iterator.hasNext())
            pollDelayMs = Math.min(pollDelayMs, client.pollDelayMs(node, now));
        
        while (iterator.hasNext()) {
            // 遍历该节点的请求
            ClientRequest request = iterator.next();
            // 调用ready创建连接
            if (client.ready(node, now)) {
                // 调用NetworkClient的send方法发送请求
                client.send(request, now);
                // 移除掉已发送的请求
                iterator.remove();
            }
        }
    }
    return pollDelayMs;
}

firePendingCompletedRequests方法负责处理响应。我们回想下,在构建请求时,指定了回调类。回调函数执行时,会将自身添加到pendingCompletion队列里。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
public class ConsumerNetworkClient implements Closeable {
    // 请求完成的回调队列
    private final ConcurrentLinkedQueue<RequestFutureCompletionHandler> pendingCompletion = new ConcurrentLinkedQueue<>();
    
    private void firePendingCompletedRequests() {
        boolean completedRequestsFired = false;
        for (;;) {
            // 循环从队列中,提取completionHandler
            RequestFutureCompletionHandler completionHandler = pendingCompletion.poll();
            if (completionHandler == null)
                break;
            // 执行回调函数
            completionHandler.fireCompletion();
        }
}

最后看看RequestFutureCompletionHandler的fireCompletion方法,它会根据响应结果,来设置RequestFuture的结果。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
private class RequestFutureCompletionHandler implements RequestCompletionHandler {
    
    private final RequestFuture<ClientResponse> future;
    // 响应数据
    private ClientResponse response;
    // 如果请求失败,这里会设置异常值
    private RuntimeException e;
    
    public void fireCompletion() {
        if (e != null) {
            // 如果请求出现异常,则调用raise方法设置异常,完成future
            future.raise(e);
        } else if (response.versionMismatch() != null) {
            // 响应出现版本不支持错误
            future.raise(response.versionMismatch());
        } else if (....) {
            // 检查其他错误
            .....
        } else {
            // 调用complete方法,设置future的结果
            future.complete(response);
        }
    }
}
updatedupdated2023-07-022023-07-02