因为这两种情况对于 Producer 而言,都是没有收到响应,Producer 无法确定是哪种情况,所以它必须重新发送消息,来确保服务端不会漏掉任何一条消息。但这样服务端有可能会收到重复的消息,所以服务端收到消息后,还要做一次去重操作。只有 Producer 和服务端相互配合,才能保证消息既不丢失也不重复,达到 Exactly Once 的语义。
publicclassKafkaProducer<K,V>implementsProducer<K,V>{privatestaticintconfigureInflightRequests(ProducerConfigconfig,booleanidempotenceEnabled){if(idempotenceEnabled&&5<config.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION)){thrownewConfigException("Must set "+ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION+" to at most 5"+" to use the idempotent producer.");}returnconfig.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION);}privatestaticshortconfigureAcks(ProducerConfigconfig,booleanidempotenceEnabled,Loggerlog){booleanuserConfiguredAcks=false;shortacks=(short)parseAcks(config.getString(ProducerConfig.ACKS_CONFIG));if(config.originals().containsKey(ProducerConfig.ACKS_CONFIG)){userConfiguredAcks=true;}// 如果开启了幂等性,但是用户没有指定ack,则返回 -1。-1表示包括leader和follower分区都要确认
if(idempotenceEnabled&&!userConfiguredAcks){return-1;}// 如果开启了幂等性,但是用户指定的ack不为 -1,则会抛出异常
if(idempotenceEnabled&&acks!=-1){thrownewConfigException(".....");}returnacks;}}
publicfinalclassRecordAccumulator{publicMap<Integer,List<ProducerBatch>>drain(Clustercluster,Set<Node>nodes,intmaxSize,longnow){if(nodes.isEmpty())returnCollections.emptyMap();Map<Integer,List<ProducerBatch>>batches=newHashMap<>();for(Nodenode:nodes){intsize=0;List<PartitionInfo>parts=cluster.partitionsForNode(node.id());List<ProducerBatch>ready=newArrayList<>();intstart=drainIndex=drainIndex%parts.size();do{PartitionInfopart=parts.get(drainIndex);TopicPartitiontp=newTopicPartition(part.topic(),part.partition());// 当max.in.flight.requests.per.connection配置项为 1 时,Sender发送消息的时候,会暂时关闭此分区的请求发送。当完成响应时,才会开放请求发送。
// 这里的isMute方法,是用来判断次分区的请求是否被关闭
if(!isMuted(tp,now)){Deque<ProducerBatch>deque=getDeque(tp);if(deque!=null){synchronized(deque){ProducerBatchfirst=deque.peekFirst();if(first!=null){booleanbackoff=first.attempts()>0&&first.waitedTimeMs(now)<retryBackoffMs;// Only drain the batch if it is not during backoff period.
if(!backoff){if(size+first.estimatedSizeInBytes()>maxSize&&!ready.isEmpty()){break;}else{ProducerIdAndEpochproducerIdAndEpoch=null;booleanisTransactional=false;if(transactionManager!=null){// 判断是否可以向这个分区发送请求
if(!transactionManager.isSendToPartitionAllowed(tp))break;// 获取producer_id 和 epoch
producerIdAndEpoch=transactionManager.producerIdAndEpoch();if(!producerIdAndEpoch.isValid())// we cannot send the batch until we have refreshed the producer id
break;// 这里判断是否开启了事务
isTransactional=transactionManager.isTransactional();// 如果这个消息batch,已经设置了序列号,并且此分区连接有问题, 那么需要跳过这个消息batch
if(!first.hasSequence()&&transactionManager.hasUnresolvedSequence(first.topicPartition))break;// 查看消息batch是否重试,如果是,则跳过
intfirstInFlightSequence=transactionManager.firstInFlightSequence(first.topicPartition);if(firstInFlightSequence!=RecordBatch.NO_SEQUENCE&&first.hasSequence()&&first.baseSequence()!=firstInFlightSequence)break;}ProducerBatchbatch=deque.pollFirst();// 为新的消息batch,设置对应的字段值
if(producerIdAndEpoch!=null&&!batch.hasSequence()){// 这里调用了transactionManager生成序列号
batch.setProducerState(producerIdAndEpoch,transactionManager.sequenceNumber(batch.topicPartition),isTransactional);// 更新序列号
transactionManager.incrementSequenceNumber(batch.topicPartition,batch.recordCount);transactionManager.addInFlightBatch(batch);}batch.close();size+=batch.records().sizeInBytes();ready.add(batch);batch.drained(now);}}}}}}this.drainIndex=(this.drainIndex+1)%parts.size();}while(start!=drainIndex);batches.put(node.id(),ready);}returnbatches;}}
publicclassTransactionManager{// 为每个分区,维护一个消息序列号
privatefinalMap<TopicPartition,Integer>nextSequence;synchronizedIntegersequenceNumber(TopicPartitiontopicPartition){IntegercurrentSequenceNumber=nextSequence.get(topicPartition);if(currentSequenceNumber==null){// 初始序列号为 0
currentSequenceNumber=0;nextSequence.put(topicPartition,currentSequenceNumber);}returncurrentSequenceNumber;}synchronizedvoidincrementSequenceNumber(TopicPartitiontopicPartition,intincrement){IntegercurrentSequenceNumber=nextSequence.get(topicPartition);if(currentSequenceNumber==null)thrownewIllegalStateException("Attempt to increment sequence number for a partition with no current sequence.");// 更新分区对应的序列号
currentSequenceNumber+=increment;nextSequence.put(topicPartition,currentSequenceNumber);}}
/**
 * Excerpt of the broker-side log showing how incoming batches are checked against
 * the recorded producer state before being appended.
 */
class Log(...) {

  /**
   * Walks the batches that carry a producer id, detects a duplicate client batch, and
   * accumulates the per-producer append info.
   *
   * @param records      the records about to be appended
   * @param isFromClient whether the request comes from a client
   * @return the per-producer append info, the transactions completed by these batches, and
   *         the metadata of an already-appended duplicate batch if one was found
   */
  private def analyzeAndValidateProducerState(records: MemoryRecords, isFromClient: Boolean):
  (mutable.Map[Long, ProducerAppendInfo], List[CompletedTxn], Option[BatchMetadata]) = {
    // Append info keyed by producer id; each value collects the batches newly appended for that producer.
    val updatedProducers = mutable.Map.empty[Long, ProducerAppendInfo]
    val completedTxns = ListBuffer.empty[CompletedTxn]

    // Iterate over the batches that carry a producer id.
    for (batch <- records.batches.asScala if batch.hasProducerId) {
      // Look up the most recent entry recorded for this producer id.
      val maybeLastEntry = producerStateManager.lastEntry(batch.producerId)

      // The request may come from a client, or (per the article) be replicated from the leader to a follower.
      // if this is a client produce request, there will be up to 5 batches which could have been duplicated.
      // If we find a duplicate, we return the metadata of the appended batch to the client.
      if (isFromClient) {
        // A recently appended duplicate short-circuits the whole method.
        maybeLastEntry.flatMap(_.findDuplicateBatch(batch)).foreach { duplicate =>
          return (updatedProducers, completedTxns.toList, Some(duplicate))
        }
      }

      // Record this batch in the updatedProducers table.
      val maybeCompletedTxn = updateProducers(batch, updatedProducers, isFromClient = isFromClient)
      maybeCompletedTxn.foreach(completedTxns += _)
    }
    (updatedProducers, completedTxns.toList, None)
  }

  /**
   * Appends a batch to the append info of its producer, creating that append info on first use.
   *
   * @param batch        batch to record
   * @param producers    append info keyed by producer id; updated in place
   * @param isFromClient whether the request comes from a client
   * @return whatever `appendInfo.append` yields for the batch (a completed transaction, if any)
   */
  private def updateProducers(batch: RecordBatch,
                              producers: mutable.Map[Long, ProducerAppendInfo],
                              isFromClient: Boolean): Option[CompletedTxn] = {
    val producerId = batch.producerId
    // Reuse the append info for this producer, or prepare a new one.
    val appendInfo = producers.getOrElseUpdate(producerId, producerStateManager.prepareUpdate(producerId, isFromClient))
    // Per the article, appending the batch is where the sequence number gets validated.
    appendInfo.append(batch)
  }
}
/**
 * Excerpt of the broker-side per-producer append state showing the epoch and
 * sequence number checks applied to an incoming batch.
 */
private[log] class ProducerAppendInfo(val producerId: Long,
                                      val currentEntry: ProducerStateEntry, // metadata of the previously appended batches
                                      val validationType: ValidationType) { // validation strategy

  // Start from an empty entry for this producer and carry over the epochs and the
  // current transaction's first offset from currentEntry.
  private val updatedEntry = ProducerStateEntry.empty(producerId)
  updatedEntry.producerEpoch = currentEntry.producerEpoch
  updatedEntry.coordinatorEpoch = currentEntry.coordinatorEpoch
  updatedEntry.currentTxnFirstOffset = currentEntry.currentTxnFirstOffset

  /**
   * Rejects a batch whose producer epoch is lower than the one already recorded.
   *
   * @param producerEpoch epoch carried by the incoming batch
   * @throws ProducerFencedException if the incoming epoch is lower than the recorded epoch
   */
  private def checkProducerEpoch(producerEpoch: Short): Unit = {
    if (producerEpoch < updatedEntry.producerEpoch) {
      throw new ProducerFencedException(s"Producer's epoch is no longer valid. There is probably another producer " +
        s"with a newer epoch. $producerEpoch (request epoch), ${updatedEntry.producerEpoch} (server epoch)")
    }
  }

  /**
   * Validates the first sequence number of an incoming batch against the recorded state.
   *
   * @param producerEpoch  epoch carried by the incoming batch
   * @param appendFirstSeq first sequence number of the incoming batch
   * @throws OutOfOrderSequenceException if the sequence does not start at 0 where required,
   *                                     or does not directly follow the last recorded sequence
   * @throws UnknownProducerIdException  if the epoch differs, the sequence is not 0, and no
   *                                     epoch is recorded for this producer
   */
  private def checkSequence(producerEpoch: Short, appendFirstSeq: Int): Unit = {
    // Presumably the epoch was already checked by checkProducerEpoch, so a mismatch here
    // means the incoming epoch is newer than the recorded one.
    if (producerEpoch != updatedEntry.producerEpoch) {
      // A batch with a different epoch must start its sequence at 0.
      if (appendFirstSeq != 0) {
        // An epoch is on record for this producer: report the sequence as out of order.
        // No epoch on record (NO_PRODUCER_EPOCH): report the producer id as unknown.
        if (updatedEntry.producerEpoch != RecordBatch.NO_PRODUCER_EPOCH) {
          throw new OutOfOrderSequenceException(s"Invalid sequence number for new epoch: $producerEpoch " +
            s"(request epoch), $appendFirstSeq (seq. number)")
        } else {
          throw new UnknownProducerIdException(s"Found no record of producerId=$producerId on the broker. It is possible " +
            s"that the last message with the producerId=$producerId has been removed due to hitting the retention limit.")
        }
      }
    } else {
      // Determine the last sequence number appended so far.
      val currentLastSeq =
        if (!updatedEntry.isEmpty)
          // updatedEntry already holds batches, so the previous batch is there.
          updatedEntry.lastSeq
        else if (producerEpoch == currentEntry.producerEpoch)
          // updatedEntry is empty, so the previous batch is in currentEntry.
          currentEntry.lastSeq
        else
          // Nothing recorded for this epoch: there is no previous sequence.
          RecordBatch.NO_SEQUENCE

      if (currentLastSeq == RecordBatch.NO_SEQUENCE && appendFirstSeq != 0) {
        // With no previous sequence, the first batch must start at 0.
        throw new OutOfOrderSequenceException(s"Out of order sequence number for producerId $producerId: found $appendFirstSeq " +
          s"(incoming seq. number), but expected 0")
      } else if (!inSequence(currentLastSeq, appendFirstSeq)) {
        // Otherwise the incoming sequence must directly follow the last one.
        throw new OutOfOrderSequenceException(s"Out of order sequence number for producerId $producerId: $appendFirstSeq " +
          s"(incoming seq. number), $currentLastSeq (current end sequence number)")
      }
    }
  }

  /**
   * Tells whether `nextSeq` directly follows `lastSeq`.
   *
   * @param lastSeq last sequence number appended
   * @param nextSeq first sequence number of the incoming batch
   * @return true if `nextSeq` is `lastSeq + 1`, or if the sequence wraps from Int.MaxValue to 0
   */
  private def inSequence(lastSeq: Int, nextSeq: Int): Boolean = {
    // Adding 1L widens to Long so lastSeq + 1 cannot overflow; the wrap from
    // Int.MaxValue back to 0 is handled as an explicit second case.
    nextSeq == lastSeq + 1L || (nextSeq == 0 && lastSeq == Int.MaxValue)
  }
}
publicclassTransactionManager{// 存储着分区的下一个消息的序列号
privatefinalMap<TopicPartition,Integer>nextSequence;// 存储着正在发送的消息batch
privatefinalMap<TopicPartition,PriorityQueue<ProducerBatch>>inflightBatchesBySequence;synchronizedvoidadjustSequencesDueToFailedBatch(ProducerBatchbatch){if(!this.nextSequence.containsKey(batch.topicPartition))return;intcurrentSequence=sequenceNumber(batch.topicPartition);currentSequence-=batch.recordCount;if(currentSequence<0)thrownewIllegalStateException("Sequence number for partition "+batch.topicPartition+" is going to become negative : "+currentSequence);// 更新该分区的下个序列号,这样之后的消息就会从这个序列号开始,填补了这个序列号的缺失
setNextSequence(batch.topicPartition,currentSequence);for(ProducerBatchinFlightBatch:inflightBatchesBySequence.get(batch.topicPartition)){if(inFlightBatch.baseSequence()<batch.baseSequence())continue;// 如果有在这条消息batch之后,发送的消息,需要更新它的序列号
// 序列号变为减去batch的消息数目
intnewSequence=inFlightBatch.baseSequence()-batch.recordCount;if(newSequence<0)thrownewIllegalStateException("....");// 更新新的序列号
inFlightBatch.resetProducerState(newProducerIdAndEpoch(inFlightBatch.producerId(),inFlightBatch.producerEpoch()),newSequence,inFlightBatch.isTransactional());}}}