object CoarseGrainedExecutorBackend extends Logging {

  def main(args: Array[String]) {
    // Parse the command-line arguments
    var argv = args.toList
    while (!argv.isEmpty) {
      // ...
    }
    // Hand off to the run function
    run(driverUrl, executorId, hostname, cores, appId, workerUrl, userClassPath)
    System.exit(0)
  }

  def run(
      driverUrl: String,
      executorId: String,
      hostname: String,
      cores: Int,
      appId: String,
      workerUrl: Option[String],
      userClassPath: Seq[URL]) {
    // Run the program as the user that Hadoop is configured to use
    SparkHadoopUtil.get.runAsSparkUser { () =>
      Utils.checkHost(hostname)
      // Default Spark configuration for the executor
      val executorConf = new SparkConf
      // Port used by the client that connects to the driver
      val port = executorConf.getInt("spark.executor.port", 0)
      // Create an RpcEnv in client mode
      val fetcher = RpcEnv.create(
        "driverPropsFetcher",
        hostname,
        port,
        executorConf,
        new SecurityManager(executorConf),
        clientMode = true)
      // Obtain a client reference to the driver service; "driver" here means
      // the DriverEndpoint hosted by CoarseGrainedSchedulerBackend
      val driver = fetcher.setupEndpointRefByURI(driverUrl)
      // Ask the driver for the Spark application configuration
      val cfg = driver.askSync[SparkAppConfig](RetrieveSparkAppConfig)
      val props = cfg.sparkProperties ++ Seq[(String, String)](("spark.app.id", appId))
      // Shut down the fetcher once the configuration has been retrieved
      fetcher.shutdown()
      // Build the executor's configuration from the properties fetched from the driver
      val driverConf = new SparkConf()
      for ((key, value) <- props) {
        if (SparkConf.isExecutorStartupConf(key)) {
          driverConf.setIfMissing(key, value)
        } else {
          driverConf.set(key, value)
        }
      }
      // Create the executor's SparkEnv
      val env = SparkEnv.createExecutorEnv(
        driverConf, executorId, hostname, port, cores, cfg.ioEncryptionKey, isLocal = false)
      // Register and start the CoarseGrainedExecutorBackend RPC endpoint
      env.rpcEnv.setupEndpoint("Executor", new CoarseGrainedExecutorBackend(
        env.rpcEnv, driverUrl, executorId, hostname, cores, userClassPath, env))
      // Block until the RpcEnv terminates
      env.rpcEnv.awaitTermination()
    }
  }
}
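One step above that is easy to miss is the merge of the driver-supplied properties into driverConf: keys that count as executor startup settings only fill in gaps (setIfMissing), while every other key is overwritten with the driver's value (set). The standalone sketch below replays that logic with the public SparkConf API; the predicate isStartupConf, the sample property values, and the object name ConfMergeSketch are illustrative stand-ins, since SparkConf.isExecutorStartupConf itself is private[spark].

import org.apache.spark.SparkConf

object ConfMergeSketch {
  def main(args: Array[String]): Unit = {
    // A value already set locally, e.g. from the executor's launch environment
    val driverConf = new SparkConf(false).set("spark.executor.memory", "4g")

    // Properties pushed down from the driver
    val props = Seq("spark.executor.memory" -> "2g", "spark.app.id" -> "app-123")

    for ((key, value) <- props) {
      // Stand-in for SparkConf.isExecutorStartupConf (private[spark])
      val isStartupConf = key.startsWith("spark.executor.")
      if (isStartupConf) {
        driverConf.setIfMissing(key, value) // the local startup value wins
      } else {
        driverConf.set(key, value)          // the driver's value wins
      }
    }

    println(driverConf.get("spark.executor.memory")) // prints 4g
    println(driverConf.get("spark.app.id"))          // prints app-123
  }
}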
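The whole bootstrap rests on Spark's internal RPC layer: setupEndpoint registers a server-side endpoint, setupEndpointRefByURI obtains a reference to a remote one, and askSync performs a blocking request/reply (the RetrieveSparkAppConfig exchange above). Below is a minimal sketch of that pattern against Spark 2.x internals; because RpcEnv and its companions are private[spark], the sketch assumes the file is compiled under the org.apache.spark.rpc package, and EchoEndpoint, Ping, and RpcSketch are hypothetical names, not Spark classes.

package org.apache.spark.rpc

import org.apache.spark.{SecurityManager, SparkConf}

// A tiny stand-in for the driver's DriverEndpoint: replies "pong" to Ping
case object Ping

class EchoEndpoint(override val rpcEnv: RpcEnv) extends RpcEndpoint {
  override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
    case Ping => context.reply("pong")
  }
}

object RpcSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf(false)
    // Server-side RpcEnv, analogous to the one hosting DriverEndpoint
    val env = RpcEnv.create("sketch", "localhost", 0, conf, new SecurityManager(conf))
    val ref = env.setupEndpoint("echo", new EchoEndpoint(env))
    // askSync sends a message and blocks for the reply,
    // just as the executor does with RetrieveSparkAppConfig
    println(ref.askSync[String](Ping)) // prints pong
    env.shutdown()
    env.awaitTermination()
  }
}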