publicclassTransportRequestHandlerextendsMessageHandler<RequestMessage>{privatefinalRpcHandlerrpcHandler;@Overridepublicvoidhandle(RequestMessagerequest){if(requestinstanceofChunkFetchRequest){processFetchRequest((ChunkFetchRequest)request);}elseif(requestinstanceofRpcRequest){processRpcRequest((RpcRequest)request);}elseif(requestinstanceofOneWayMessage){processOneWayMessage((OneWayMessage)request);}elseif(requestinstanceofStreamRequest){processStreamRequest((StreamRequest)request);}else{thrownewIllegalArgumentException("Unknown request type: "+request);}}privatevoidprocessRpcRequest(finalRpcRequestreq){try{rpcHandler.receive(reverseClient,req.body().nioByteBuffer(),newRpcResponseCallback(){@OverridepublicvoidonSuccess(ByteBufferresponse){respond(newRpcResponse(req.requestId,newNioManagedBuffer(response)));}@OverridepublicvoidonFailure(Throwablee){respond(newRpcFailure(req.requestId,Throwables.getStackTraceAsString(e)));}});}catch(Exceptione){logger.error("Error while invoking RpcHandler#receive() on RPC id "+req.requestId,e);respond(newRpcFailure(req.requestId,Throwables.getStackTraceAsString(e)));}finally{req.body().release();}}}
/**
 * Routes messages to the inbox of the endpoint they are addressed to.
 *
 * `stopped` and `receivers` are used below but defined outside this excerpt.
 */
private[netty] class Dispatcher(nettyEnv: NettyRpcEnv) extends Logging {

  // Registered endpoints, keyed by endpoint name.
  private val endpoints: ConcurrentMap[String, EndpointData] =
    new ConcurrentHashMap[String, EndpointData]

  /**
   * Posts a message that arrived from a remote sender.
   *
   * @param message  the request; `message.receiver.name` names the target endpoint
   * @param callback used to build the call context, and notified via `onFailure`
   *                 when the message cannot be delivered
   */
  def postRemoteMessage(message: RequestMessage, callback: RpcResponseCallback): Unit = {
    val senderAddress = message.senderAddress
    val callContext = new RemoteNettyRpcCallContext(nettyEnv, callback, senderAddress)
    // Wrap the request in an RpcMessage and deliver it through postMessage.
    val inboxMessage = RpcMessage(message.senderAddress, message.content, callContext)
    // The request carries the name of the endpoint it is addressed to.
    postMessage(message.receiver.name, inboxMessage, cause => callback.onFailure(cause))
  }

  /**
   * Delivers `message` to the inbox of the endpoint registered under `endpointName`.
   *
   * @param endpointName      name of the target endpoint
   * @param message           the message to enqueue
   * @param callbackIfStopped invoked with the failure when this dispatcher is stopped
   *                          or no endpoint is registered under `endpointName`
   */
  private def postMessage(
      endpointName: String,
      message: InboxMessage,
      callbackIfStopped: (Exception) => Unit): Unit = {
    val failure: Option[Exception] = synchronized {
      // Look up the EndpointData registered for this name.
      val target = endpoints.get(endpointName)
      if (stopped) {
        Some(new RpcEnvStoppedException())
      } else if (target == null) {
        Some(new SparkException(s"Could not find $endpointName."))
      } else {
        // Enqueue on the endpoint's inbox, then queue the endpoint on `receivers`.
        target.inbox.post(message)
        receivers.offer(target)
        None
      }
    }
    // The callback does not need to run inside the `synchronized` block.
    failure match {
      case Some(cause) => callbackIfStopped(cause)
      case None =>
    }
  }
}
/**
 * Holds the messages queued for one [[RpcEndpoint]] and feeds them to it.
 *
 * NOTE(review): `inbox`, `messages`, `enableConcurrent`, `numActiveThreads` and
 * `safelyCall` are used below but not defined in this excerpt. `inbox` is presumably a
 * self-alias for this instance -- confirm against the full class.
 */
private[netty] class Inbox(
    val endpointRef: NettyRpcEndpointRef,
    val endpoint: RpcEndpoint)
  extends Logging {

  /**
   * Processes queued messages on the calling thread until the queue is empty or this
   * thread has to yield to another active thread.
   *
   * @param dispatcher the dispatcher the endpoint is unregistered from when `OnStop`
   *                   is processed
   */
  def process(dispatcher: Dispatcher): Unit = {
    var message: InboxMessage = null
    inbox.synchronized {
      // When concurrent processing is disabled, only one thread may process messages.
      if (!enableConcurrent && numActiveThreads != 0) {
        return
      }
      // Take the first message off the queue; with nothing queued there is no work.
      message = messages.poll()
      if (message != null) {
        numActiveThreads += 1
      } else {
        return
      }
    }
    while (true) {
      safelyCall(endpoint) {
        message match {
          // RPC message: the sender expects a reply.
          case RpcMessage(_sender, content, context) =>
            try {
              // Let the endpoint's receiveAndReply handle the message.
              endpoint.receiveAndReply(context).applyOrElse[Any, Unit](content, { msg =>
                throw new SparkException(s"Unsupported message $message from ${_sender}")
              })
            } catch {
              case NonFatal(e) =>
                context.sendFailure(e)
                // Throw the exception -- this exception will be caught by the safelyCall
                // function. The endpoint's onError function will be called.
                throw e
            }

          // One-way message: no reply is sent.
          case OneWayMessage(_sender, content) =>
            // Let the endpoint's receive handle the message.
            endpoint.receive.applyOrElse[Any, Unit](content, { msg =>
              throw new SparkException(s"Unsupported message $message from ${_sender}")
            })

          // Start message: invoke the endpoint's onStart callback.
          case OnStart =>
            endpoint.onStart()

          // Stop message.
          case OnStop =>
            val activeThreads = inbox.synchronized { inbox.numActiveThreads }
            assert(activeThreads == 1,
              s"There should be only a single active thread but found $activeThreads threads.")
            // Unregister the endpoint from the dispatcher, then invoke its onStop callback.
            dispatcher.removeRpcEndpointRef(endpoint)
            endpoint.onStop()
        }
      }

      // Advance to the next message or leave the loop. Without this step the loop
      // would reprocess the same message forever and never release its thread slot.
      inbox.synchronized {
        // Mirror of the entry guard: when concurrent processing is disabled and this
        // is not the only active thread, give up the slot and exit.
        if (!enableConcurrent && numActiveThreads != 1) {
          numActiveThreads -= 1
          return
        }
        message = messages.poll()
        if (message == null) {
          // Queue drained: release the slot taken on entry.
          numActiveThreads -= 1
          return
        }
      }
    }
  }
}
/**
 * Worker loop that repeatedly takes an entry from `receivers` and processes its inbox,
 * until it sees `PoisonPill` or the thread is interrupted.
 */
private class MessageLoop extends Runnable {

  override def run(): Unit = {
    try {
      var keepRunning = true
      while (keepRunning) {
        keepRunning = processNext()
      }
    } catch {
      case _: InterruptedException => // exit
    }
  }

  /**
   * Takes one entry from `receivers` and processes its inbox.
   *
   * @return `false` once `PoisonPill` has been seen, `true` otherwise (including when
   *         a non-fatal error was logged)
   */
  private def processNext(): Boolean = {
    try {
      // Take the next EndpointData from the receivers queue.
      val next = receivers.take()
      if (next == PoisonPill) {
        // Put PoisonPill back so that other MessageLoops can see it.
        receivers.offer(PoisonPill)
        false
      } else {
        next.inbox.process(Dispatcher.this)
        true
      }
    } catch {
      case NonFatal(e) =>
        logError(e.getMessage, e)
        true
    }
  }
}
privatevoidrespond(Encodableresult){SocketAddressremoteAddress=channel.remoteAddress();channel.writeAndFlush(result).addListener(future->{if(future.isSuccess()){logger.trace("Sent result {} to client {}",result,remoteAddress);}else{logger.error(String.format("Error sending result %s to %s; closing connection",result,remoteAddress),future.cause());channel.close();}});}