Kafka Schema Registry 还支持修改数据格式。这样,同一个 topic 的消息可能存在多个版本,早期的消息和最新的消息的格式可能完全不一样,那么客户端怎么区分呢?Registry 会为每种数据格式分配一个 id 号,发送的每条消息都会附带对应的数据格式 id。
KafkaProducer 在第一次序列化的时候,会自动向 Registry 服务端注册数据格式。服务端保存数据格式后,会返回一个 id 号。KafkaProducer 发送消息的时候,需要附带这个 id 号。这样 KafkaConsumer 在读取消息的时候,通过这个 id 号,就可以从 Registry 服务端获取对应的数据格式。
publicclassCachedSchemaRegistryClientimplementsSchemaRegistryClient{// Key 为数据格式的名称, 里面的 Value 为 Map类型,它对于的 Key 为数据格式,Value 为对应的 id 号
privatefinalMap<String,Map<Schema,Integer>>schemaCache;// Key 为数据格式的名称,里面的 Value 为 Map类型,它对于的 Key 为 id 号,Value 为对应的数据格式
// 这个集合比较特殊,当 Key 为 null 时,表示 id 到 数据格式的缓存
privatefinalMap<String,Map<Integer,Schema>>idCache;@Overridepublicsynchronizedintregister(Stringsubject,Schemaschema,intversion,intid)throwsIOException,RestClientException{// 从schemaCache查找缓存,如果不存在则初始化空的哈希表
finalMap<Schema,Integer>schemaIdMap=schemaCache.computeIfAbsent(subject,k->newHashMap<>());// 获取对应的 id 号
finalIntegercachedId=schemaIdMap.get(schema);if(cachedId!=null){// 检查 id 号是否有冲突
if(id>=0&&id!=cachedId){thrownewIllegalStateException("Schema already registered with id "+cachedId+" instead of input id "+id);}// 返回缓存的 id 号
returncachedId;}if(schemaIdMap.size()>=identityMapCapacity){thrownewIllegalStateException("Too many schema objects created for "+subject+"!");}// 如果缓存没有,则向服务端发送 http 请求
finalintretrievedId=id>=0?registerAndGetId(subject,schema,version,id):registerAndGetId(subject,schema);// 缓存结果
schemaIdMap.put(schema,retrievedId);idCache.get(null).put(retrievedId,schema);returnretrievedId;}}
publicclassKafkaGroupMasterElectorimplementsMasterElector,SchemaRegistryRebalanceListener{publicvoidinit()throwsSchemaRegistryTimeoutException,SchemaRegistryStoreException{// 心跳线程
executor=Executors.newSingleThreadExecutor();executor.submit(newRunnable(){@Overridepublicvoidrun(){try{while(!stopped.get()){// 循环调用poll方法,处理心跳
coordinator.poll(Integer.MAX_VALUE);}}catch(Throwablet){log.error("Unexpected exception in schema registry group processing thread",t);}}});publicvoidonRevoked(){log.info("Rebalance started");try{// 因为要重新选举,所以将之前的leader清空
schemaRegistry.setMaster(null);}catch(SchemaRegistryExceptione){// This shouldn't be possible with this implementation. The exceptions from setMaster come
// from it calling nextRange in this class, but this implementation doesn't require doing
// any IO, so the errors that can occur in the ZK implementation should not be possible here.
log.error("Error when updating master, we will not be able to forward requests to the master",e);}}// assignment为选举结果
publicvoidonAssigned(SchemaRegistryProtocol.Assignmentassignment,intgeneration){log.info("Finished rebalance with master election result: {}",assignment);try{switch(assignment.error()){caseSchemaRegistryProtocol.Assignment.NO_ERROR:if(assignment.masterIdentity()==null){log.error(...);}// 记录分配结果
schemaRegistry.setMaster(assignment.masterIdentity());joinedLatch.countDown();break;caseSchemaRegistryProtocol.Assignment.DUPLICATE_URLS:thrownewIllegalStateException(...);default:thrownewIllegalStateException(...);}}catch(SchemaRegistryExceptione){......}}}