How a Spark Executor Node Runs

Executor execution flow diagram

(Figure: spark-executor-communicate)

Executor node startup

The scenarios discussed here all assume Spark running on YARN. As described in {% post_link spark-on-yarn %}, the entry point of an Executor node is the main function of CoarseGrainedExecutorBackend.

object CoarseGrainedExecutorBackend extends Logging {

  def main(args: Array[String]) {
    // Parse the command line arguments
    var argv = args.toList
    while (!argv.isEmpty) {
      .....
    }

    // Call the run function
    run(driverUrl, executorId, hostname, cores, appId, workerUrl, userClassPath)
    System.exit(0)
  }

  def run(
      driverUrl: String,
      executorId: String,
      hostname: String,
      cores: Int,
      appId: String,
      workerUrl: Option[String],
      userClassPath: Seq[URL]) {
      // Run the program as the user that Hadoop uses
      SparkHadoopUtil.get.runAsSparkUser { () =>
        Utils.checkHost(hostname)

        // Instantiate the executor's default Spark configuration
        val executorConf = new SparkConf
        // Port used by the client that connects to the driver
        val port = executorConf.getInt("spark.executor.port", 0)
        // Instantiate a client-mode RpcEnv
        val fetcher = RpcEnv.create(
          "driverPropsFetcher",
          hostname,
          port,
          executorConf,
          new SecurityManager(executorConf),
          clientMode = true)
        // Create a client connected to the driver service,
        // i.e. the DriverEndpoint inside CoarseGrainedSchedulerBackend
        val driver = fetcher.setupEndpointRefByURI(driverUrl)
        // Ask the driver for the Spark configuration
        val cfg = driver.askSync[SparkAppConfig](RetrieveSparkAppConfig)
        val props = cfg.sparkProperties ++ Seq[(String, String)](("spark.app.id", appId))
        // Once the configuration has been fetched, shut the client down
        fetcher.shutdown()

        // Build the executor's configuration from what the driver returned
        val driverConf = new SparkConf()
        for ((key, value) <- props) {
          if (SparkConf.isExecutorStartupConf(key)) {
            driverConf.setIfMissing(key, value)
          } else {
            driverConf.set(key, value)
          }
        }

        // Create the executor's SparkEnv
        val env = SparkEnv.createExecutorEnv(
          driverConf, executorId, hostname, port, cores, cfg.ioEncryptionKey, isLocal = false)

        // Register and start the CoarseGrainedExecutorBackend RPC endpoint
        env.rpcEnv.setupEndpoint("Executor", new CoarseGrainedExecutorBackend(
          env.rpcEnv, driverUrl, executorId, hostname, cores, userClassPath, env))
        // Block until the RpcEnv terminates
        env.rpcEnv.awaitTermination()
      }
  }
}
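
Between setupEndpoint and the RegisteredExecutor message handled in the next section there is one more step: when the endpoint starts, its onStart method registers the executor with the driver. A minimal sketch of that method, based on the Spark 2.x sources (details vary across versions):

// Simplified sketch of CoarseGrainedExecutorBackend.onStart (Spark 2.x); details differ by version.
override def onStart() {
  logInfo("Connecting to driver: " + driverUrl)
  rpcEnv.asyncSetupEndpointRefByURI(driverUrl).flatMap { ref =>
    // Remember the driver endpoint and ask it to register this executor
    driver = Some(ref)
    ref.ask[Boolean](RegisterExecutor(executorId, self, hostname, cores, extractLogUrls))
  }(ThreadUtils.sameThread).onComplete {
    // On success the driver later sends back a RegisteredExecutor message, handled in receive below
    case Success(msg) =>
    case Failure(e) => exitExecutor(1, s"Cannot register with driver: $driverUrl", e)
  }(ThreadUtils.sameThread)
}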

The CoarseGrainedExecutorBackend service

CoarseGrainedExecutorBackend extends ThreadSafeRpcEndpoint and wraps the Executor class, exposing it to the driver as an RPC service. It handles the following messages:

  • RegisteredExecutor: the driver confirmed registration; create the Executor
  • RegisterExecutorFailed: registration failed; exit the process
  • LaunchTask: run a task
  • KillTask: kill a running task
  • StopExecutor / Shutdown: shut the executor down
private[spark] class CoarseGrainedExecutorBackend(
    override val rpcEnv: RpcEnv,
    driverUrl: String,
    executorId: String,
    hostname: String,
    cores: Int,
    userClassPath: Seq[URL],
    env: SparkEnv)
  extends ThreadSafeRpcEndpoint with ExecutorBackend with Logging {

  override def receive: PartialFunction[Any, Unit] = {
    // The driver confirms registration; instantiate the Executor
    case RegisteredExecutor =>
      logInfo("Successfully registered with driver")
      try {
        executor = new Executor(executorId, hostname, env, userClassPath, isLocal = false)
      } catch {
        case NonFatal(e) =>
          exitExecutor(1, "Unable to create executor due to " + e.getMessage, e)
      }

    // The driver reports that registration failed
    case RegisterExecutorFailed(message) =>
      // Exit the process
      exitExecutor(1, "Slave registration failed: " + message)

    // The driver sends a task to run
    case LaunchTask(data) =>
      if (executor == null) {
        exitExecutor(1, "Received LaunchTask command but executor was null")
      } else {
        // Deserialize the task description
        val taskDesc = TaskDescription.decode(data.value)
        logInfo("Got assigned task " + taskDesc.taskId)
        // Hand the task to the executor for execution
        executor.launchTask(this, taskDesc)
      }

    case KillTask(taskId, _, interruptThread, reason) =>
      if (executor == null) {
        exitExecutor(1, "Received KillTask command but executor was null")
      } else {
        // Ask the executor to kill the task
        executor.killTask(taskId, interruptThread, reason)
      }

    case StopExecutor =>
      stopping.set(true)
      logInfo("Driver commanded a shutdown")
      // Cannot shutdown here because an ack may need to be sent back to the caller. So send
      // a message to self to actually do the shutdown.
      self.send(Shutdown)

    case Shutdown =>
      stopping.set(true)
      new Thread("CoarseGrainedExecutorBackend-stop-executor") {
        override def run(): Unit = {
          // executor.stop() will call `SparkEnv.stop()` which waits until RpcEnv stops totally.
          // However, if `executor.stop()` runs in some thread of RpcEnv, RpcEnv won't be able to
          // stop until `executor.stop()` returns, which becomes a dead-lock (See SPARK-14180).
          // Therefore, we put this line in a new thread.
          executor.stop()
        }
      }.start()
  }
}

Executing tasks

The driver sends tasks to the CoarseGrainedExecutorBackend service, which hands them over to the Executor class for execution. As the code above shows, it does so by calling Executor's launchTask method.
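
On the driver side, the DriverEndpoint of CoarseGrainedSchedulerBackend is what serializes and sends these tasks. A simplified sketch of its launchTasks method, based on the Spark 2.x sources (the size check against spark.rpc.message.maxSize is omitted here):

// Simplified sketch of CoarseGrainedSchedulerBackend.DriverEndpoint#launchTasks (Spark 2.x).
private def launchTasks(tasks: Seq[Seq[TaskDescription]]) {
  for (task <- tasks.flatten) {
    // Serialize the task description
    val serializedTask = TaskDescription.encode(task)
    // Account for the cores this task will occupy on the chosen executor
    val executorData = executorDataMap(task.executorId)
    executorData.freeCores -= scheduler.CPUS_PER_TASK
    // Send the task to that executor's CoarseGrainedExecutorBackend
    executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask)))
  }
}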

The Executor class owns a thread pool, threadPool, that is responsible for running tasks. The pool is created with newCachedThreadPool, so there is no upper bound on the number of threads; every task the executor receives is submitted to this pool.

class Executor() {

  // Tasks currently being executed, keyed by taskId
  private val runningTasks = new ConcurrentHashMap[Long, TaskRunner]

  private val threadPool = {
    val threadFactory = new ThreadFactoryBuilder()
      .setDaemon(true)
      .setNameFormat("Executor task launch worker-%d")
      .setThreadFactory(new ThreadFactory {
        override def newThread(r: Runnable): Thread =
          new UninterruptibleThread(r, "unused")
      })
      .build()
    Executors.newCachedThreadPool(threadFactory).asInstanceOf[ThreadPoolExecutor]
  }

  // Launch a task
  def launchTask(context: ExecutorBackend, taskDescription: TaskDescription): Unit = {
    // Instantiate a TaskRunner
    val tr = new TaskRunner(context, taskDescription)
    runningTasks.put(taskDescription.taskId, tr)
    // Submit the TaskRunner to the thread pool
    threadPool.execute(tr)
  }
}

Next, let's look at the TaskRunner implementation, simplified as follows:

class TaskRunner(
    execBackend: ExecutorBackend,
    private val taskDescription: TaskDescription)
  extends Runnable {

  val taskId = taskDescription.taskId

  override def run(): Unit = {
    // Tell the driver, through the ExecutorBackend, that the task is now running
    execBackend.statusUpdate(taskId, TaskState.RUNNING, EMPTY_BYTE_BUFFER)
    // Deserialize the task
    val ser = env.closureSerializer.newInstance()
    task = ser.deserialize[Task[Any]](
      taskDescription.serializedTask, Thread.currentThread.getContextClassLoader)

    // Invoke the Task's run method
    val value = task.run(
      taskAttemptId = taskId,
      attemptNumber = taskDescription.attemptNumber,
      metricsSystem = env.metricsSystem)

    // Serialize the result (in the full code it is wrapped in a DirectTaskResult and,
    // when too large, stored in the BlockManager; simplified here to serializedResult)
    val resultSer = env.serializer.newInstance()
    val serializedResult = resultSer.serialize(value)

    // Send the result back to the driver through the ExecutorBackend
    execBackend.statusUpdate(taskId, TaskState.FINISHED, serializedResult)
  }
}
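
The statusUpdate calls above go through the CoarseGrainedExecutorBackend, which forwards them to the driver as StatusUpdate messages. A minimal sketch, based on the Spark 2.x sources:

// Simplified sketch of CoarseGrainedExecutorBackend.statusUpdate (Spark 2.x).
override def statusUpdate(taskId: Long, state: TaskState, data: ByteBuffer) {
  // Wrap the task state and the serialized result into a StatusUpdate message
  val msg = StatusUpdate(executorId, taskId, state, data)
  driver match {
    // Forward it to the DriverEndpoint, which passes it on to the TaskScheduler
    case Some(driverRef) => driverRef.send(msg)
    case None => logWarning(s"Drop $msg because has not yet connected to driver")
  }
}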
          

Heartbeat service

An Executor node also has to maintain a heartbeat with the driver; otherwise the driver will consider the executor dead. The Executor class runs a dedicated thread that periodically sends heartbeat messages to the driver, and every heartbeat carries the metrics of the currently running tasks.
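
Two settings govern this behaviour; the values below are the Spark 2.x defaults and are shown only for illustration. spark.executor.heartbeatInterval controls how often a heartbeat is sent, and spark.network.timeout is the window after which the driver's HeartbeatReceiver treats a silent executor as lost, so the interval must stay well below the timeout.

// Illustrative only: heartbeat-related settings with their Spark 2.x default values.
val conf = new SparkConf()
  // How often each executor sends a Heartbeat message to the driver
  .set("spark.executor.heartbeatInterval", "10s")
  // If the driver sees no heartbeat within this window, it marks the executor as lost
  .set("spark.network.timeout", "120s")

The heartbeat machinery inside Executor looks like this: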

class Executor() {

  // Single background thread for the heartbeat
  private val heartbeater = ThreadUtils.newDaemonSingleThreadScheduledExecutor("driver-heartbeater")

  // RPC reference to the driver's HeartbeatReceiver endpoint
  private val heartbeatReceiverRef = RpcUtils.makeDriverRef(HeartbeatReceiver.ENDPOINT_NAME, conf, env.rpcEnv)

  // Start the heartbeat timer thread
  private def startDriverHeartbeater(): Unit = {
    // Heartbeat interval, spark.executor.heartbeatInterval (10s by default)
    val intervalMs = conf.getTimeAsMs("spark.executor.heartbeatInterval", "10s")
    // Wait a random initial delay so heartbeats from different executors don't end up in sync
    val initialDelay = intervalMs + (math.random * intervalMs).asInstanceOf[Int]
    val heartbeatTask = new Runnable() {
      override def run(): Unit = Utils.logUncaughtExceptions(reportHeartBeat())
    }
    heartbeater.scheduleAtFixedRate(heartbeatTask, initialDelay, intervalMs, TimeUnit.MILLISECONDS)
  }

  // Report a heartbeat to the driver
  private def reportHeartBeat(): Unit = {
    // Collect metrics for all running tasks
    val accumUpdates = new ArrayBuffer[(Long, Seq[AccumulatorV2[_, _]])]()
    for (taskRunner <- runningTasks.values().asScala) {
      if (taskRunner.task != null) {
        accumUpdates += ((taskRunner.taskId, taskRunner.task.metrics.accumulators()))
      }
    }
    // Build the heartbeat message
    val message = Heartbeat(executorId, accumUpdates.toArray, env.blockManager.blockManagerId)
    // Send it to the driver
    val response = heartbeatReceiverRef.askSync[HeartbeatResponse](
      message, RpcTimeout(conf, "spark.executor.heartbeatInterval", "10s"))
  }
}
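
The full reportHeartBeat also reacts to the driver's reply: it re-registers the BlockManager when asked to, and exits the process after too many consecutive heartbeat failures. A sketch of that handling, based on the Spark 2.x sources (HEARTBEAT_MAX_FAILURES comes from spark.executor.heartbeat.maxFailures, 60 by default):

// Simplified sketch of the response handling inside reportHeartBeat (Spark 2.x).
try {
  val response = heartbeatReceiverRef.askSync[HeartbeatResponse](
    message, RpcTimeout(conf, "spark.executor.heartbeatInterval", "10s"))
  if (response.reregisterBlockManager) {
    // The driver has lost track of this executor's BlockManager; register it again
    logInfo("Told to re-register on heartbeat")
    env.blockManager.reregister()
  }
  heartbeatFailures = 0
} catch {
  case NonFatal(e) =>
    logWarning("Issue communicating with driver in heartbeater", e)
    heartbeatFailures += 1
    if (heartbeatFailures >= HEARTBEAT_MAX_FAILURES) {
      logError(s"Exit as unable to send heartbeats to driver more than $HEARTBEAT_MAX_FAILURES times")
      System.exit(ExecutorExitCode.HEARTBEAT_FAILURE)
    }
}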