publicclassRequestHeaderextendsAbstractRequestResponse{privatestaticfinalStringAPI_KEY_FIELD_NAME="api_key";privatestaticfinalStringAPI_VERSION_FIELD_NAME="api_version";privatestaticfinalStringCLIENT_ID_FIELD_NAME="client_id";privatestaticfinalStringCORRELATION_ID_FIELD_NAME="correlation_id";publicstaticfinalSchemaSCHEMA=newSchema(newField(API_KEY_FIELD_NAME,INT16,"The id of the request type."),newField(API_VERSION_FIELD_NAME,INT16,"The version of the API."),newField(CORRELATION_ID_FIELD_NAME,INT32,"A user-supplied integer value that will be passed back with the response"),newField(CLIENT_ID_FIELD_NAME,NULLABLE_STRING,"A user specified identifier for the client making the request.",""));}
publicclassProduceRequestextendsAbstractRequest{privatestaticfinalStringACKS_KEY_NAME="acks";privatestaticfinalStringTIMEOUT_KEY_NAME="timeout";privatestaticfinalStringTOPIC_DATA_KEY_NAME="topic_data";// topic level field names
privatestaticfinalStringPARTITION_DATA_KEY_NAME="data";// partition level field names
privatestaticfinalStringRECORD_SET_KEY_NAME="record_set";// 每个topic的数据格式
privatestaticfinalSchemaTOPIC_PRODUCE_DATA_V0=newSchema(TOPIC_NAME,newField(PARTITION_DATA_KEY_NAME,newArrayOf(newSchema(PARTITION_ID,newField(RECORD_SET_KEY_NAME,RECORDS)))));// 最新版的请求格式
privatestaticfinalSchemaPRODUCE_REQUEST_V3=newSchema(CommonFields.NULLABLE_TRANSACTIONAL_ID,newField(ACKS_KEY_NAME,INT16,"..."),newField(TIMEOUT_KEY_NAME,INT32,"The time to await a response in ms."),// TOPIC_PRODUCE_DATA_V0数组类型
newField(TOPIC_DATA_KEY_NAME,newArrayOf(TOPIC_PRODUCE_DATA_V0)));}
@OverridepublicStructtoStruct(){// Store it in a local variable to protect against concurrent updates
Map<TopicPartition,MemoryRecords>partitionRecords=partitionRecordsOrFail();shortversion=version();// requestSchema方法会根据版本,返回对应的Schema
Structstruct=newStruct(ApiKeys.PRODUCE.requestSchema(version));Map<String,Map<Integer,MemoryRecords>>recordsByTopic=CollectionUtils.groupDataByTopic(partitionRecords);// 设置对应的属性
struct.set(ACKS_KEY_NAME,acks);struct.set(TIMEOUT_KEY_NAME,timeout);struct.setIfExists(NULLABLE_TRANSACTIONAL_ID,transactionalId);List<Struct>topicDatas=newArrayList<>(recordsByTopic.size());for(Map.Entry<String,Map<Integer,MemoryRecords>>topicEntry:recordsByTopic.entrySet()){// 因为TOPIC_DATA_KEY_NAME字段是Schema类型的,这里调用了instance方法实例化Struct
StructtopicData=struct.instance(TOPIC_DATA_KEY_NAME);topicData.set(TOPIC_NAME,topicEntry.getKey());List<Struct>partitionArray=newArrayList<>();for(Map.Entry<Integer,MemoryRecords>partitionEntry:topicEntry.getValue().entrySet()){MemoryRecordsrecords=partitionEntry.getValue();// 因为PARTITION_DATA_KEY_NAME字段是Schema类型,这里调用了instance实例化Struct
Structpart=topicData.instance(PARTITION_DATA_KEY_NAME).set(PARTITION_ID,partitionEntry.getKey()).set(RECORD_SET_KEY_NAME,records);// 添加partition数据
partitionArray.add(part);}// 设置PARTITION_DATA_KEY_NAME字段
topicData.set(PARTITION_DATA_KEY_NAME,partitionArray.toArray());topicDatas.add(topicData);}// 设置topic字段的数据
struct.set(TOPIC_DATA_KEY_NAME,topicDatas.toArray());returnstruct;}
publicclassResponseHeaderextendsAbstractRequestResponse{publicstaticfinalSchemaSCHEMA=newSchema(newField("correlation_id",INT32,"The user-supplied value passed in with the request"));}