Spark Rpc 原理介绍

Spark Rpc 原理介绍

Spark的Rpc服务,是整个Spark框架的基石。Spark的很多服务都是基于Rpc框架之上的,它承担了各个服务之间的信息交流。下面是Rpc的各个组件运行的流程图 :

rpc客户端发送消息

RpcEndpointRef : Rpc客户端,通过它可以发送消息

NettyRpcEnv : 整个Rpc的运行环境,RpcEndpointRef是通过NettyRpcEnv才能把消息发送出去

OutBox : 消息发件箱,存储消息队列

Thread Pool : 负责TrasnportClient初始化的线程池。TrasnportClient初始化的时候,会和服务端新建连接

TrasnportClient : Netty客户端,负责与服务端交互

rpc客户端接收消息

TrasnportClient : Netty客户端,负责与服务端交互

EventLoopGroup : Netty客户端的工作线程池,当收到server的消息时,会触发回调函数

RpcMessage : rpc客户端发送的消息,里面包含了回调函数

rpc服务端处理请求

TrasnportServer: Netty服务端,负责接收消息

EventLoopGroup : Netty客户端的工作线程池,当收到client的请求时,会处理请求

Dispatcher : 分发器,将请求分发给对应的Inbox

Inbox : 收件箱,每个RpcEndpoint都有独立的收件箱,存储着请求

RpcEndpoint : Rpc服务端,它定义了处理请求的逻辑

rpc服务端发送响应

RpcEndpoint : Rpc服务端,它定义了处理请求的逻辑

Channel : SocketChannel类,负责传输数据

Netty : 通过Netty发送响应

Client : 请求端

测试demo

rpc服务端

这里定义了HelloEndpoint服务。对于请求SayHi,响应 hello。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
package org.apache.spark.rpc.HelloEndpoint

import org.apache.spark.SparkConf
import org.apache.spark.rpc.{RpcCallContext, RpcEndpoint, RpcEnv}
import org.apache.spark.SecurityManager


class HelloEndpoint(override val rpcEnv: RpcEnv) extends RpcEndpoint {
  override def onStart(): Unit = {
    println("start hello endpoint")
  }

  override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
    case SayHello(msg) => {
      println(s"receive $msg")
      context.reply(s"hello, $msg")
    }
  }

  override def onStop(): Unit = {
    println("stop hello endpoint")
  }
}

case class SayHello(msg: String)

object HelloEndpoint {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    val manager = new SecurityManager(conf)
    // 创建server模式的RpcEnv
    val rpcEnv: RpcEnv = RpcEnv.create("hello-server", "localhost", 5432, conf, manager)
    // 实例化HelloEndpoint
    val helloEndpoint: RpcEndpoint = new HelloEndpoint(rpcEnv)
    // 在RpcEnv注册helloEndpoint
    rpcEnv.setupEndpoint("hello-service", helloEndpoint)
    // 等待线程rpcEnv运行完
    rpcEnv.awaitTermination()
  }
}

rpc客户端

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
package org.apache.spark.rpc.netty.HelloEndpointRed

import org.apache.spark.{SecurityManager, SparkConf}
import org.apache.spark.rpc.HelloEndpoint.SayHello
import org.apache.spark.rpc._

import scala.concurrent.duration.Duration
import scala.concurrent.{Await, Future}


object HelloEndpointRef {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    val manager = new SecurityManager(conf)
    // 创建client模式的RpcEnv
    val rpcEnv: RpcEnv = RpcEnv.create("hello-server", "localhost", 5432, conf, manager, true)
     // 创建EndpointRef
    val endpointRef: RpcEndpointRef = rpcEnv.setupEndpointRef(RpcAddress("localhost", 5432), "hello-service")
    val future: Future[String] = endpointRef.ask[String](SayHello("spark-rpc"))
    val s = Await.result(future, Duration.apply("30s"))
    print(s)
  }
}

源码解析

rpc的客户端的具体原理,可以参见此篇博客 {% post_link spark-rpc-client Spark Rpc 客户端原理 %}

rpc的服务端的具体原理,可以参见此篇博客 {% post_link spark-rpc-server Spark Rpc 服务端原理 %}

updatedupdated2023-07-022023-07-02