public class KafkaConsumer<K, V> implements Consumer<K, V> {
    @Override
    public void assign(Collection<TopicPartition> partitions) {
        acquireAndEnsureOpen();
        try {
            // The given partition collection must not be null
            if (partitions == null) {
                throw new IllegalArgumentException("Topic partition collection to assign to cannot be null");
            } else if (partitions.isEmpty()) {
                // An empty collection means the caller no longer wants to consume anything
                this.unsubscribe();
            } else {
                // Collect the topics covered by the given partitions
                Set<String> topics = new HashSet<>();
                for (TopicPartition tp : partitions) {
                    String topic = (tp != null) ? tp.topic() : null;
                    if (topic == null || topic.trim().isEmpty())
                        throw new IllegalArgumentException("Topic partitions to assign to cannot have null or empty topic");
                    topics.add(topic);
                }

                // make sure the offsets of topic partitions the consumer is unsubscribing from
                // are committed since there will be no following rebalance
                this.coordinator.maybeAutoCommitOffsetsAsync(time.milliseconds());

                // Store the assigned partitions in SubscriptionState
                this.subscriptions.assignFromUser(new HashSet<>(partitions));
                // Update Metadata so it knows which topics to fetch metadata for
                metadata.setTopics(topics);
            }
        } finally {
            release();
        }
    }
}
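assign() is the manual-assignment entry point: it skips group management entirely, so no rebalance will ever move these partitions. A minimal usage sketch follows; the broker address and the topic "my-topic" are placeholders, not anything taken from the Kafka source.

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

public class AssignExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");   // placeholder broker
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");           // no consumer group here
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            // Manually assign partitions 0 and 1 of a placeholder topic; because this
            // bypasses group management, no rebalance will ever move these partitions.
            consumer.assign(Arrays.asList(
                    new TopicPartition("my-topic", 0),
                    new TopicPartition("my-topic", 1)));

            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
            for (ConsumerRecord<String, String> record : records)
                System.out.printf("%s-%d@%d: %s%n",
                        record.topic(), record.partition(), record.offset(), record.value());
        }
    }
}

Note that assign() and subscribe() are mutually exclusive on the same consumer instance; mixing them raises an IllegalStateException.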
private static class TopicPartitionState {
    private Long position;                     // the next offset to fetch (the consume position)
    private Long highWatermark;                // the high watermark from the last fetch
    private Long logStartOffset;               // the log start offset
    private Long lastStableOffset;
    private boolean paused;                    // whether this partition has been paused by the user
    private OffsetResetStrategy resetStrategy; // the strategy used to initialize the consume position
    private Long nextAllowedRetryTimeMs;
}
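position and highWatermark are internal to SubscriptionState, but they are what the public position() and endOffsets() calls ultimately reflect. A rough lag-estimation sketch, assuming the consumer is already assigned and has valid positions (LagProbe is a made-up helper name, not part of the client):

import java.util.Map;
import java.util.Set;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public final class LagProbe {
    private LagProbe() {}

    // Rough per-partition lag: the distance between the next offset the consumer will
    // fetch (TopicPartitionState.position) and the end of the log. Under the default
    // read_uncommitted isolation level, endOffsets() reports the high watermark.
    public static void printLag(KafkaConsumer<?, ?> consumer) {
        Set<TopicPartition> assignment = consumer.assignment();
        Map<TopicPartition, Long> endOffsets = consumer.endOffsets(assignment);
        for (TopicPartition tp : assignment) {
            long position = consumer.position(tp);     // backed by TopicPartitionState.position
            long lag = endOffsets.get(tp) - position;  // high watermark minus current position
            System.out.printf("%s lag=%d%n", tp, lag);
        }
    }
}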
public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
    private final AtomicReference<RuntimeException> cachedListOffsetsException = new AtomicReference<>();
    private final SubscriptionState subscriptions;

    public void resetOffsetsIfNeeded() {
        // Raise exception from previous offset fetch if there is one
        RuntimeException exception = cachedListOffsetsException.getAndSet(null);
        if (exception != null)
            throw exception;

        // Find the partitions whose consume positions need to be initialized
        Set<TopicPartition> partitions = subscriptions.partitionsNeedingReset(time.milliseconds());
        if (partitions.isEmpty())
            return;

        final Map<TopicPartition, Long> offsetResetTimestamps = new HashMap<>();
        for (final TopicPartition partition : partitions) {
            // Map the partition's reset strategy to a timestamp value
            Long timestamp = offsetResetStrategyTimestamp(partition);
            if (timestamp != null)
                offsetResetTimestamps.put(partition, timestamp);
        }

        // Send the requests and handle the responses
        resetOffsetsAsync(offsetResetTimestamps);
    }

    private Long offsetResetStrategyTimestamp(final TopicPartition partition) {
        // Look up the partition's reset strategy
        OffsetResetStrategy strategy = subscriptions.resetStrategy(partition);
        if (strategy == OffsetResetStrategy.EARLIEST)
            // For the EARLIEST strategy, return EARLIEST_TIMESTAMP (-2)
            return ListOffsetRequest.EARLIEST_TIMESTAMP;
        else if (strategy == OffsetResetStrategy.LATEST)
            // For the LATEST strategy, return LATEST_TIMESTAMP (-1)
            return ListOffsetRequest.LATEST_TIMESTAMP;
        else
            return null;
    }

    private void resetOffsetsAsync(Map<TopicPartition, Long> partitionResetTimestamps) {
        // Add the topics to the metadata to do a single metadata fetch.
        for (TopicPartition tp : partitionResetTimestamps.keySet())
            metadata.add(tp.topic());

        // Group the partitions by the node that will serve each ListOffset request
        Map<Node, Map<TopicPartition, Long>> timestampsToSearchByNode = groupListOffsetRequests(partitionResetTimestamps);
        for (Map.Entry<Node, Map<TopicPartition, Long>> entry : timestampsToSearchByNode.entrySet()) {
            Node node = entry.getKey();
            final Map<TopicPartition, Long> resetTimestamps = entry.getValue();
            subscriptions.setResetPending(resetTimestamps.keySet(), time.milliseconds() + requestTimeoutMs);

            // Send the request
            RequestFuture<ListOffsetResult> future = sendListOffsetRequest(node, resetTimestamps, false);
            // Register the callback
            future.addListener(new RequestFutureListener<ListOffsetResult>() {
                @Override
                public void onSuccess(ListOffsetResult result) {
                    if (!result.partitionsToRetry.isEmpty()) {
                        subscriptions.resetFailed(result.partitionsToRetry, time.milliseconds() + retryBackoffMs);
                        metadata.requestUpdate();
                    }

                    // Walk through the per-partition results
                    for (Map.Entry<TopicPartition, OffsetData> fetchedOffset : result.fetchedOffsets.entrySet()) {
                        TopicPartition partition = fetchedOffset.getKey();
                        // The offset returned by the broker
                        OffsetData offsetData = fetchedOffset.getValue();
                        Long requestedResetTimestamp = resetTimestamps.get(partition);
                        // Set the partition's consume position
                        resetOffsetIfNeeded(partition, requestedResetTimestamp, offsetData);
                    }
                }
            });
        }
    }

    private void resetOffsetIfNeeded(TopicPartition partition, Long requestedResetTimestamp, OffsetData offsetData) {
        if (!subscriptions.isAssigned(partition)) {
            log.debug("Skipping reset of partition {} since it is no longer assigned", partition);
        } else if (!subscriptions.isOffsetResetNeeded(partition)) {
            log.debug("Skipping reset of partition {} since reset is no longer needed", partition);
        } else if (!requestedResetTimestamp.equals(offsetResetStrategyTimestamp(partition))) {
            log.debug("Skipping reset of partition {} since an alternative reset has been requested", partition);
        } else {
            log.info("Resetting offset for partition {} to offset {}.", partition, offsetData.offset);
            // Call seek to set the partition's consume position
            subscriptions.seek(partition, offsetData.offset);
        }
    }
}
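The reset strategy that offsetResetStrategyTimestamp() reads from SubscriptionState normally comes from the auto.offset.reset configuration; seekToBeginning()/seekToEnd() request the same kind of reset explicitly. A configuration sketch, with the broker address, group id and topic as placeholders:

import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class OffsetResetConfigExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "demo-group");              // placeholder
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // "earliest": when no committed offset exists, the Fetcher sends a ListOffset
        // request with timestamp -2 and starts from the log start offset; "latest" (-1)
        // starts from the end of the log; "none" makes the consumer throw instead of resetting.
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("my-topic"));        // placeholder topic
            // Alternatively, seekToBeginning()/seekToEnd() request the same reset
            // explicitly for the partitions passed in.
        }
    }
}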
public class KafkaConsumer<K, V> implements Consumer<K, V> {
    private final SubscriptionState subscriptions;
    private final ConsumerCoordinator coordinator;

    @Override
    public void commitAsync() {
        commitAsync(null);
    }

    @Override
    public void commitAsync(OffsetCommitCallback callback) {
        acquireAndEnsureOpen();
        try {
            // Read the consumed positions from SubscriptionState and commit them
            commitAsync(subscriptions.allConsumed(), callback);
        } finally {
            release();
        }
    }

    @Override
    public void commitAsync(final Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback callback) {
        acquireAndEnsureOpen();
        try {
            // Delegate to ConsumerCoordinator.commitOffsetsAsync, which sends the commit request to the broker
            coordinator.commitOffsetsAsync(new HashMap<>(offsets), callback);
        } finally {
            release();
        }
    }

    @Override
    public void commitSync(Duration timeout) {
        acquireAndEnsureOpen();
        try {
            // Delegate to ConsumerCoordinator.commitOffsetsSync, which blocks until the commit succeeds or the timeout expires
            if (!coordinator.commitOffsetsSync(subscriptions.allConsumed(), timeout.toMillis())) {
                throw new TimeoutException("Timeout of " + timeout.toMillis() + "ms expired before successfully " +
                        "committing the current consumed offsets");
            }
        } finally {
            release();
        }
    }
}
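With enable.auto.commit set to false, the application drives both of these commit paths itself: commitAsync() inside the poll loop and a blocking commitSync() before shutdown. A usage sketch, with the broker address, group id and topic as placeholders:

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class ManualCommitExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "demo-group");              // placeholder
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");         // commit manually
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("my-topic"));        // placeholder topic
            try {
                while (true) {
                    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                    // ... process records ...
                    // Non-blocking commit of all consumed positions; the callback runs
                    // on a later poll() once the broker has responded.
                    consumer.commitAsync((offsets, exception) -> {
                        if (exception != null)
                            System.err.println("Async commit failed: " + exception);
                    });
                }
            } finally {
                // Blocking commit before shutdown so the last positions are not lost.
                consumer.commitSync(Duration.ofSeconds(5));
            }
        }
    }
}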