YuMo's Blog

Spark Study Notes

Basic Concepts

RDD - Resilient Distributed Dataset
Operation - the operations applied to an RDD, divided into transformations and actions
Job - a job consists of multiple RDDs and the operations applied to them
Stage - a job is divided into multiple stages

Partition - a data partition; the data in one RDD can be split into multiple partitions
DAG - Directed Acyclic Graph, which captures the dependencies between RDDs
Narrow dependency - each partition of the child RDD depends on a fixed set of partitions of the parent RDD
Wide dependency - each partition of the child RDD depends on all partitions of the parent RDD
Caching management - caching the intermediate results of RDD computations to speed up overall processing

RDD

RDDs let developers perform in-memory computations on large clusters. An RDD provides a highly restricted form of shared memory: it is read-only.

Dependencies between RDDs

Narrow dependencies: each partition of the child RDD depends on a constant number of parent partitions; that is, the child RDD depends on a fixed number of partitions of its parent RDD(s), independent of the data size.
Wide dependencies: each partition of the child RDD depends on all partitions of the parent RDD(s). For example, map produces a narrow dependency, while groupByKey produces a wide one.

Differences:
Narrow dependency keyword: pipeline; wide dependency keyword: shuffle.
Narrow: operations such as map followed by filter can be executed record by record in a pipeline.
Wide: all parent partitions must be computed first, then shuffled between nodes.
Recovery behavior:
Narrow: recovering from a failed node is fast; only the parent partitions of the lost RDD partitions need to be recomputed.
Wide: a single failed node may lose a partition from every ancestor of the RDD, forcing a complete recomputation.
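To make the shuffle concrete, here is a small plain-Scala sketch (an analogy, not Spark API) of why groupByKey is a wide dependency: records for one key may live in any parent partition, so each child partition must pull data from every parent partition.

```scala
// Plain-Scala sketch of a hash-partitioned "shuffle".
// Every child partition may read records from every parent partition,
// which is exactly what makes groupByKey a wide dependency.
val parentPartitions = Seq(
  Seq("a" -> 1, "b" -> 2),   // parent partition 0
  Seq("a" -> 3, "c" -> 4))   // parent partition 1
val numChildPartitions = 2

// The target partition of a record is a pure function of its key.
def childPartition(key: String): Int =
  math.abs(key.hashCode) % numChildPartitions

// Shuffle: regroup all parent records by target partition,
// so all records for one key land in one child partition.
val shuffled: Map[Int, Seq[(String, Int)]] =
  parentPartitions.flatten.groupBy { case (k, _) => childPartition(k) }
```

Because `childPartition` depends only on the key, both occurrences of "a" end up in the same child partition, regardless of which parent partition they started in.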

RDD transforms

All transformations in Spark are lazy: they do not compute their results right away. Instead, they just remember the transformations applied to some base dataset (e.g. a file). The transformations are only computed when an action requires a result to be returned to the driver. This design lets Spark run more efficiently: for example, a dataset created through map and used in a reduce returns only the result of the reduce to the driver, rather than the entire mapped dataset.
By default, each transformed RDD is recomputed every time you run an action on it. However, you can persist an RDD in memory using the persist (or cache) method, in which case Spark keeps the elements around on the cluster for much faster access the next time you query it. Persisting datasets on disk, or replicating them across the cluster, is also supported; those options are described in the next section.
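This laziness can be mimicked with a plain Scala Iterator (an analogy, not the Spark API): map merely records the work, and only a terminal operation, playing the role of the action, forces the computation.

```scala
var evaluated = 0  // counts how many elements were actually computed

// Like an RDD transformation: nothing runs when map is declared.
val doubled = (1 to 5).iterator.map { x => evaluated += 1; x * 2 }
val before = evaluated          // still 0: the map is only remembered

// Like an action: sum forces the whole pipeline to run at once.
val total = doubled.sum
```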

RDD Persistence

Caching is key to building iterative algorithms with Spark. When you persist an RDD, each node stores the partitions it computes in memory and reuses them in other actions on that dataset (or datasets derived from it). This makes subsequent actions much faster (often by 10x or more).
You mark an RDD for persistence with the persist() or cache() method; the first time it is computed by an action, it is kept in memory on the worker nodes and reused.

def persist(newLevel: StorageLevel): this.type = {
  sc.persistRDD(this)
  // Register the RDD with the ContextCleaner for automatic GC-based cleanup
  sc.cleaner.foreach(_.registerRDDForCleanup(this))
  storageLevel = newLevel
  this
}

/** Persist this RDD with the default storage level (`MEMORY_ONLY`). */
def persist(): this.type = persist(StorageLevel.MEMORY_ONLY)

/** Persist this RDD with the default storage level (`MEMORY_ONLY`). */
def cache(): this.type = persist()

Notes on the RDD source code:

  1. An RDD is a resilient distributed dataset: an immutable, partitioned collection of elements that can be operated on in parallel.
  2. [org.apache.spark.rdd.RDD] provides the basic operations, including map, filter, and persist;
    [org.apache.spark.rdd.PairRDDFunctions] provides key-value operations such as groupByKey and join;
    [org.apache.spark.rdd.DoubleRDDFunctions] provides operations on RDDs of Doubles;
    [org.apache.spark.rdd.SequenceFileRDDFunctions] provides operations on RDDs that can be saved as SequenceFiles.
    All of these operations are automatically available on any RDD of the right type, e.g. RDD[(Int, Int)].
  3. An RDD is characterized by five main properties: a list of partitions; a function for computing each split; a list of dependencies on other RDDs; optionally, a partitioner for key-value RDDs; and optionally, a list of preferred locations for computing each split (e.g. block locations for an HDFS file).
  4. All scheduling and execution in Spark is based on these RDD methods. Each RDD can be used directly; users can also write their own RDDs and override the default behavior to suit their needs.
  5. Transformations and actions:
    Transformations return a new RDD,
    such as: map, flatMap, filter, distinct, union, sortBy, ++, intersection, groupBy, pipe, mapPartitions, zip
    Actions launch a job to return a value to the user program,
    such as: foreach, subtract, reduce, fold, aggregate, count, take, top, max, min, saveAsTextFile, saveAsObjectFile, keyBy[K]
    Other internal methods and fields: toDebugString, toJavaRDD
  6. An RDD subclass must implement the following methods: compute, getPartitions, getDependencies, getPreferredLocations
  7. Methods available on every RDD: sc, setName, persist, cache, dependencies, partitions, preferredLocations, getNarrowAncestors
  8. Only when an action is applied to an RDD are the operations on it (and its ancestor RDDs) submitted to the cluster for execution.

clean

def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U] = {
  val cleanF = sc.clean(f)
  new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.flatMap(cleanF))
}

Q: why sc.clean(f)?
A: sc.clean(f) calls ClosureCleaner.clean(f, checkSerializable).
ClosureCleaner exists because of a Scala bug: [https://issues.scala-lang.org/browse/SI-1419]
Without it, SI-1419 would cause Spark to waste network bandwidth at runtime; for the details, see this answer on Quora:
“When Scala constructs a closure, it determines which outer variables the closure will use and stores references to them in the closure object.
This allows the closure to work properly even when it’s called from a different scope than it was created in. Scala sometimes errs on the side
of capturing too many outer variables (see SI-1419). That’s harmless in most cases, because the extra captured variables simply don’t get used
(though this prevents them from getting GC’d). But it poses a problem for Spark, which has to send closures across the network so they can be
run on slaves. When a closure contains unnecessary references, it wastes network bandwidth. More importantly, some of the references may point
to non-serializable objects, and Spark will fail to serialize the closure. To work around this bug in Scala, the ClosureCleaner traverses the
object at runtime and prunes the unnecessary references. Since it does this at runtime, it can be more accurate than the Scala compiler can.
Spark can then safely serialize the cleaned closure.”
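The capture problem is easy to reproduce in plain Scala. In the sketch below (the names are illustrative, not Spark code), referencing a field inside a lambda captures the whole enclosing object, so the closure is serializable only if that object is; copying the field into a local val avoids the capture, which is the same kind of pruning ClosureCleaner automates.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Deliberately NOT Serializable: capturing it poisons any closure.
class Holder(val factor: Int) {
  // Captures `this` (the whole Holder), because `factor` means `this.factor`.
  def badFn: Int => Int = x => x * factor
  // Captures only an Int: the field is first copied into a local val.
  def goodFn: Int => Int = { val f = factor; x => x * f }
}

// Returns true iff the object survives Java serialization.
def serializable(obj: AnyRef): Boolean =
  try {
    new ObjectOutputStream(new ByteArrayOutputStream()).writeObject(obj)
    true
  } catch { case _: NotSerializableException => false }
```

Both closures compute the same values locally; the difference only shows up when Spark tries to ship them over the network.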

How RDDs are divided into stages

Spark uses shuffle operations as stage boundaries. A stage that ends in a shuffle is a ShuffleMapStage, and its tasks are ShuffleMapTasks; the tasks of the final stage are ResultTasks.
Ultimately, every job is decomposed into multiple stages, and these stages form a DAG of dependencies.
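A rough model of this decomposition in plain Scala (illustrative only, not Spark's DAGScheduler logic): walk the operator chain and cut a stage boundary after every shuffle operator.

```scala
// Toy model: a stage is a maximal run of operators ending at a shuffle boundary.
val ops = Seq("map", "filter", "groupByKey", "map", "reduceByKey", "saveAsTextFile")
val isShuffle = Set("groupByKey", "reduceByKey")

val stages: Vector[Vector[String]] = ops
  .foldLeft(Vector(Vector.empty[String])) { (acc, op) =>
    val extended = acc.init :+ (acc.last :+ op)          // add op to current stage
    if (isShuffle(op)) extended :+ Vector.empty[String]  // cut boundary after a shuffle
    else extended
  }
  .filter(_.nonEmpty)
// Three stages: the first two end in a shuffle (ShuffleMapStage-like),
// the last one produces the result (ResultTask-like).
```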

Programming components

runtime view

The static view is called the dataset view, while the dynamic view is called the partition view; their relationship is shown in the figure.
In Spark, a task corresponds to a thread; workers are processes, and workers are managed by the driver.

deployment view

When an action is applied to an RDD, the action is submitted as a job.
During submission, the DAGScheduler module computes the dependencies between RDDs; these dependencies form a DAG.
Each job is divided into multiple stages. A major criterion for stage boundaries is whether the input of the current operator is deterministic: if so, the operators are placed in the same stage, avoiding the message-passing overhead between multiple stages.
Once a stage is submitted, the TaskScheduler computes the tasks the stage requires and submits them to the appropriate workers.
Spark supports the following deployment modes: 1) standalone, 2) Mesos, 3) YARN. The deployment mode is passed as an initialization parameter to the TaskScheduler.

caching

Intermediate RDD results can be cached. Memory is preferred; if memory is insufficient, data is written to disk.
An LRU (least recently used) policy decides which content stays in memory and which is moved to disk.
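The eviction policy can be sketched with java.util.LinkedHashMap in access order (an illustration of LRU in general, not Spark's actual BlockManager code):

```scala
import java.util.{LinkedHashMap => JLinkedHashMap, Map => JMap}

// Minimal LRU cache: an access-order LinkedHashMap that drops the
// least-recently-used entry once capacity is exceeded.
class LruCache[K, V](capacity: Int)
    extends JLinkedHashMap[K, V](16, 0.75f, /* accessOrder = */ true) {
  override def removeEldestEntry(eldest: JMap.Entry[K, V]): Boolean =
    size() > capacity
}

val cache = new LruCache[String, Int](2)
cache.put("a", 1)
cache.put("b", 2)
cache.get("a")      // touch "a": now "b" is least recently used
cache.put("c", 3)   // over capacity: evicts "b"
```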

fault tolerance

A computation goes through a series of steps from the initial RDD to the last derived RDD. So how are failures in the middle of that chain handled?
Spark's solution is to replay the computation only for the failed data partitions, rather than for the entire dataset, which greatly reduces the cost of recovery.
How does an RDD know how many data partitions it should have? For an HDFS file, the file's blocks are an important input to that calculation.
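Lineage-based recovery for a narrow dependency can be sketched in plain Scala (illustrative, not Spark code): a lost child partition is rebuilt by re-applying the lineage's transformation to the corresponding parent partition only.

```scala
// Parent RDD's partitions and the transformation recorded in the lineage.
val parentPartitions = Vector(Vector(1, 2), Vector(3, 4), Vector(5, 6))
val f = (p: Vector[Int]) => p.map(_ * 10)

// Child RDD, with partition 1 "lost" after a node failure.
val child: Vector[Option[Vector[Int]]] =
  parentPartitions.map(p => Some(f(p))).updated(1, None)

// Recovery: replay f only where a partition is missing.
val recovered: Vector[Vector[Int]] = child.zipWithIndex.map {
  case (Some(p), _) => p                       // still available, reuse it
  case (None, i)    => f(parentPartitions(i))  // recompute from parent partition i only
}
```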

cluster management

Tasks run on a cluster. Besides Spark's own standalone deployment mode, Spark has built-in support for YARN and Mesos.

YARN is responsible for scheduling and monitoring compute resources; based on the monitoring results, it restarts failed tasks, or redistributes tasks when new nodes join the cluster.

Two main perspectives

Static view: RDDs, transformations, and actions.
Dynamic view: the life of a job. Each job is divided into multiple stages, each stage can contain multiple RDDs and their transformations, and these stages are mapped into tasks that are distributed across the cluster.

References

Introduction to Spark Internals http://files.meetup.com/3138542/dev-meetup-dec-2012.pptx
Resilient Distributed Datasets: A Fault-tolerant Abstraction for In-Memory Cluster Computing https://www.usenix.org/system/files/.../nsdi12-final138.pdf
Lightning-Fast Cluster Computing with Spark and Shark http://www.meetup.com/TriHUG/events/112474102/

Job submission and execution

Deployment process

The basic components are as follows:

Notes: in cluster mode, the Cluster Manager runs in one JVM process and each worker runs in another. In a local cluster these JVM processes are all on the same machine; in a real standalone, Mesos, or YARN cluster, the workers and the master are distributed across different hosts.

Job generation and execution

Introduction to Akka

There are many mature implementations of inter-module communication; modern frameworks freed us from raw socket programming long ago. Broadly, they fall into message passing and synchronization over shared resources.
Among message-passing mechanisms, the message queue is widely used. A message queue is a method of application-to-application communication: applications communicate by reading and writing messages (application-specific data) to queues, without requiring a dedicated connection to link them. Message passing means that programs communicate by sending data in messages rather than by calling each other directly, as in techniques such as remote procedure calls. Queueing means that applications communicate through queues, which removes the requirement that the sending and receiving applications execute at the same time. Mature MQ products include IBM WebSphere MQ and RabbitMQ (an open-source implementation of AMQP, now maintained by Pivotal).
Also worth mentioning is ZeroMQ, a socket-based programming framework that aims to become part of the Linux kernel. In its own words: "ZeroMQ is an easy-to-use transport layer, a socket library that acts like a framework. It makes socket programming simpler, more concise, and faster. It is a message-processing queue library that scales elastically across threads, cores, and hosts." ZeroMQ's stated goal is "to become part of the standard networking stack and later part of the Linux kernel."
For communication between many of its modules, Spark chose Akka, a library written in Scala for simplifying the construction of fault-tolerant, highly scalable Actor-model applications in Java and Scala. Akka has the following five characteristics:
Simple Concurrency & Distribution: Akka was designed around asynchronous communication and a distributed architecture, with high-level abstractions such as Actors, Futures, and STM.
Resilient by Design: systems can heal themselves, with supervision both locally and remotely.
High Performance: up to 50,000,000 messages per second on a single machine, with a small memory footprint (about 2,500,000 actors per GB of heap).
Elastic & Decentralized: adaptive load balancing, routing, partitioning, and configuration.
Extensible: Akka can be extended with Akka extension packages.

In Spark, the Client, Master, and Worker are each in fact an actor.

Startup and communication between Client, Master, and Worker

The main classes involved are Client.scala, Master.scala, and Worker.scala. The communication framework between these three modules is shown in the figure below.
Roles in standalone mode:
Client: submits jobs to the Master.
Master: accepts jobs submitted by the Client, manages the Workers, and commands Workers to launch Drivers and Executors.
Worker: manages the resources of its own node, periodically sends heartbeats to the Master, and carries out the Master's commands, such as launching Drivers and Executors.

In reality, the Master and Worker handle many more message types than this; the figure only reflects the main messages processed during cluster startup and job submission.

Job submission flow

The simplified flow of job generation:
First, the application creates a SparkContext instance, say sc.
The SparkContext instance is used to create the initial RDDs.
Through a chain of transformations, the original RDD is converted into RDDs of other types.
When an action is applied to a transformed RDD, SparkContext's runJob method is invoked.
The call to sc.runJob is the starting point of a chain of reactions; the critical transition happens there.

The call path is roughly as follows:

sc.runJob -> dagScheduler.runJob -> submitJob

DAGScheduler::submitJob creates a JobSubmitted event and sends it to the embedded eventProcessActor.
On receiving JobSubmitted, eventProcessActor calls the processEvent handler.
The job is converted into stages, producing a finalStage that is then submitted for execution; the key call is submitStage.
submitStage computes the dependencies between stages; dependencies are either wide or narrow.
If the current stage has no dependencies, or all of its dependencies are already prepared, its tasks are submitted.
Tasks are submitted via the submitMissingTasks function.
Which worker a task actually runs on is managed by the TaskScheduler; that is, submitMissingTasks calls TaskScheduler::submitTasks.
TaskSchedulerImpl creates the appropriate backend for Spark's current run mode; when running on a single machine, it creates a LocalBackend.
LocalBackend receives the ReviveOffers event passed in by TaskSchedulerImpl.
reviveOffers -> executor.launchTask -> TaskRunner.run

Related code:

master.scala

private[spark] class Master(
  override def preStart()
  override def preRestart(reason: Throwable, message: Option[Any])
  override def postStop()

  override def receiveWithLogging = {
    case ElectedLeader => {}
    case RegisterWorker(id, workerHost, workerPort, cores, memory, workerUiPort, publicAddress) => {}
    case RequestKillDriver(driverId) => {}
    case RequestDriverStatus(driverId) => {}
    case ExecutorStateChanged(appId, execId, state, message, exitStatus) => {}
    case Heartbeat(workerId) => {}
    case WorkerSchedulerStateResponse(workerId, executors, driverIds) => {}
    case RequestMasterState => {}

    // [xialeizhou@gmail.com]
    // 1. A client registers: Client.scala sends the master a message with
    //    masterActor ! RequestSubmitDriver(driverDescription)
    // 2. The master creates the driver, persists it, adds it to the waiting queue,
    //    and schedules the waiting apps; schedule() is invoked once each time a new
    //    app joins or resource availability changes
    case RequestSubmitDriver(description) => {
      if (state != RecoveryState.ALIVE) {
        val msg = s"Can only accept driver submissions in ALIVE state. Current state: $state."
        sender ! SubmitDriverResponse(false, None, msg)
      } else {
        logInfo("Driver submitted " + description.command.mainClass)
        val driver = createDriver(description)
        // persist the driver
        persistenceEngine.addDriver(driver)
        waitingDrivers += driver
        drivers.add(driver)
        // schedule once for each newly joined app
        schedule()
        // TODO: It might be good to instead have the submission client poll the master to determine
        // the current status of the driver. For now it's simply "fire and forget".
        sender ! SubmitDriverResponse(true, Some(driver.id),
          s"Driver successfully submitted as ${driver.id}")
      }
    }

  /**
   * Schedule the currently available resources among waiting apps. This method will be called
   * every time a new app joins or resource availability changes.
   */
  private def schedule() {
    if (state != RecoveryState.ALIVE) { return }
    // First schedule drivers, they take strict precedence over applications
    // Randomization helps balance drivers
    val shuffledAliveWorkers = Random.shuffle(workers.toSeq.filter(_.state == WorkerState.ALIVE))
    val numWorkersAlive = shuffledAliveWorkers.size
    var curPos = 0
    for (driver <- waitingDrivers.toList) { // iterate over a copy of waitingDrivers
      // We assign workers to each waiting driver in a round-robin fashion. For each driver, we
      // start from the last worker that was assigned a driver, and continue onwards until we have
      // explored all alive workers.
      var launched = false
      var numWorkersVisited = 0
      while (numWorkersVisited < numWorkersAlive && !launched) {
        val worker = shuffledAliveWorkers(curPos)
        numWorkersVisited += 1
        if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) {
          launchDriver(worker, driver)
          waitingDrivers -= driver
          launched = true
        }
        curPos = (curPos + 1) % numWorkersAlive
      }
    }
    // Right now this is a very simple FIFO scheduler. We keep trying to fit in the first app
    // in the queue, then the second app, etc.
    if (spreadOutApps) {
      // Try to spread out each app among all the nodes, until it has all its cores
      for (app <- waitingApps if app.coresLeft > 0) {
        val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE)
          .filter(canUse(app, _)).sortBy(_.coresFree).reverse
        val numUsable = usableWorkers.length
        val assigned = new Array[Int](numUsable) // Number of cores to give on each node
        var toAssign = math.min(app.coresLeft, usableWorkers.map(_.coresFree).sum)
        var pos = 0
        while (toAssign > 0) {
          if (usableWorkers(pos).coresFree - assigned(pos) > 0) {
            toAssign -= 1
            assigned(pos) += 1
          }
          pos = (pos + 1) % numUsable
        }
        // Now that we've decided how many cores to give on each node, let's actually give them
        for (pos <- 0 until numUsable) {
          if (assigned(pos) > 0) {
            val exec = app.addExecutor(usableWorkers(pos), assigned(pos))
            launchExecutor(usableWorkers(pos), exec)
            app.state = ApplicationState.RUNNING
          }
        }
      }
    } else {
      // Pack each app into as few nodes as possible until we've assigned all its cores
      for (worker <- workers if worker.coresFree > 0 && worker.state == WorkerState.ALIVE) {
        for (app <- waitingApps if app.coresLeft > 0) {
          if (canUse(app, worker)) {
            val coresToUse = math.min(worker.coresFree, app.coresLeft)
            if (coresToUse > 0) {
              val exec = app.addExecutor(worker, coresToUse)
              launchExecutor(worker, exec)
              app.state = ApplicationState.RUNNING
            }
          }
        }
      }
    }
  }

  def launchExecutor(worker: WorkerInfo, exec: ExecutorDesc) {
    logInfo("Launching executor " + exec.fullId + " on worker " + worker.id)
    worker.addExecutor(exec)
    worker.actor ! LaunchExecutor(masterUrl,
      exec.application.id, exec.id, exec.application.desc, exec.cores, exec.memory)
    exec.application.driver ! ExecutorAdded(
      exec.id, worker.id, worker.hostPort, exec.cores, exec.memory)
  }

  def registerWorker(worker: WorkerInfo): Boolean = {
    // There may be one or more refs to dead workers on this same node (w/ different ID's),
    // remove them.
    workers.filter { w =>
      (w.host == worker.host && w.port == worker.port) && (w.state == WorkerState.DEAD)
    }.foreach { w =>
      workers -= w
    }
    val workerAddress = worker.actor.path.address
    if (addressToWorker.contains(workerAddress)) {
      val oldWorker = addressToWorker(workerAddress)
      if (oldWorker.state == WorkerState.UNKNOWN) {
        // A worker registering from UNKNOWN implies that the worker was restarted during recovery.
        // The old worker must thus be dead, so we will remove it and accept the new worker.
        removeWorker(oldWorker)
      } else {
        logInfo("Attempted to re-register worker at same address: " + workerAddress)
        return false
      }
    }
    workers += worker
    idToWorker(worker.id) = worker
    addressToWorker(workerAddress) = worker
    true
  }
client.scala

/**
 * Executable utility for starting and terminating drivers inside of a standalone cluster.
 */
object Client {
  def main(args: Array[String]) {
    if (!sys.props.contains("SPARK_SUBMIT")) {
      println("WARNING: This client is deprecated and will be removed in a future version of Spark")
      println("Use ./bin/spark-submit with \"--master spark://host:port\"")
    }
    val conf = new SparkConf()
    val driverArgs = new ClientArguments(args)
    if (!driverArgs.logLevel.isGreaterOrEqual(Level.WARN)) {
      conf.set("spark.akka.logLifecycleEvents", "true")
    }
    conf.set("spark.akka.askTimeout", "10")
    conf.set("akka.loglevel", driverArgs.logLevel.toString.replace("WARN", "WARNING"))
    Logger.getRootLogger.setLevel(driverArgs.logLevel)
    val (actorSystem, _) = AkkaUtils.createActorSystem(
      "driverClient", Utils.localHostName(), 0, conf, new SecurityManager(conf))
    // Verify driverArgs.master is a valid url so that we can use it in ClientActor safely
    // [xialeizhou@gmail.com]
    // 1. Entry point for SparkSubmit's main class: deploy.Client
    // 2. Note: Akka is a Scala-based fault-tolerant, highly scalable Actor model for
    //    concurrent programming; an Actor is a high-level abstraction over parallelism,
    //    lightweight, asynchronous/non-blocking, and event-driven
    // 3. The line below builds an akka.tcp://... URL, e.g. akka.tcp://sparkDriver@spark-12:45021
    Master.toAkkaUrl(driverArgs.master, AkkaUtils.protocol(actorSystem))
    // [xialeizhou@gmail.com]
    // 1. Runs ClientActor's preStart and receive methods
    actorSystem.actorOf(Props(classOf[ClientActor], driverArgs, conf))
    actorSystem.awaitTermination()
  }
}

/**
 * Proxy that relays messages to the driver.
 */
private class ClientActor(driverArgs: ClientArguments, conf: SparkConf)
  extends Actor with ActorLogReceive with Logging {

  var masterActor: ActorSelection = _
  val timeout = AkkaUtils.askTimeout(conf)

  override def preStart() = {
    masterActor = context.actorSelection(
      Master.toAkkaUrl(driverArgs.master, AkkaUtils.protocol(context.system)))
    context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
    println(s"Sending ${driverArgs.cmd} command to ${driverArgs.master}")
    driverArgs.cmd match {
      case "launch" =>
        // TODO: We could add an env variable here and intercept it in `sc.addJar` that would
        // truncate filesystem paths similar to what YARN does. For now, we just require
        // people call `addJar` assuming the jar is in the same directory.
        val mainClass = "org.apache.spark.deploy.worker.DriverWrapper"
        val classPathConf = "spark.driver.extraClassPath"
        val classPathEntries = sys.props.get(classPathConf).toSeq.flatMap { cp =>
          cp.split(java.io.File.pathSeparator)
        }
        val libraryPathConf = "spark.driver.extraLibraryPath"
        val libraryPathEntries = sys.props.get(libraryPathConf).toSeq.flatMap { cp =>
          cp.split(java.io.File.pathSeparator)
        }
        val extraJavaOptsConf = "spark.driver.extraJavaOptions"
        val extraJavaOpts = sys.props.get(extraJavaOptsConf)
          .map(Utils.splitCommandString).getOrElse(Seq.empty)
        val sparkJavaOpts = Utils.sparkJavaOpts(conf)
        val javaOpts = sparkJavaOpts ++ extraJavaOpts
        val command = new Command(mainClass,
          Seq("{{WORKER_URL}}", "{{USER_JAR}}", driverArgs.mainClass) ++ driverArgs.driverOptions,
          sys.env, classPathEntries, libraryPathEntries, javaOpts)
        val driverDescription = new DriverDescription(
          driverArgs.jarUrl,
          driverArgs.memory,
          driverArgs.cores,
          driverArgs.supervise,
          command)
        masterActor ! RequestSubmitDriver(driverDescription)
      case "kill" =>
        val driverId = driverArgs.driverId
        masterActor ! RequestKillDriver(driverId)
    }
  }

  /* Find out driver status then exit the JVM */
  def pollAndReportStatus(driverId: String) {
    println(s"... waiting before polling master for driver state")
    Thread.sleep(5000)
    println("... polling master for driver state")
    val statusFuture = (masterActor ? RequestDriverStatus(driverId))(timeout)
      .mapTo[DriverStatusResponse]
    val statusResponse = Await.result(statusFuture, timeout)
    statusResponse.found match {
      case false =>
        println(s"ERROR: Cluster master did not recognize $driverId")
        System.exit(-1)
      case true =>
        println(s"State of $driverId is ${statusResponse.state.get}")
        // Worker node, if present
        (statusResponse.workerId, statusResponse.workerHostPort, statusResponse.state) match {
          case (Some(id), Some(hostPort), Some(DriverState.RUNNING)) =>
            println(s"Driver running on $hostPort ($id)")
          case _ =>
        }
        // Exception, if present
        statusResponse.exception.map { e =>
          println(s"Exception from cluster was: $e")
          e.printStackTrace()
          System.exit(-1)
        }
        System.exit(0)
    }
  }

  override def receiveWithLogging = {
    case SubmitDriverResponse(success, driverId, message) =>
      println(message)
      if (success) pollAndReportStatus(driverId.get) else System.exit(-1)
    case KillDriverResponse(driverId, success, message) =>
      println(message)
      if (success) pollAndReportStatus(driverId) else System.exit(-1)
    case DisassociatedEvent(_, remoteAddress, _) =>
      println(s"Error connecting to master ${driverArgs.master} ($remoteAddress), exiting.")
      System.exit(-1)
    case AssociationErrorEvent(cause, _, remoteAddress, _, _) =>
      println(s"Error connecting to master ${driverArgs.master} ($remoteAddress), exiting.")
      println(s"Cause was: $cause")
      System.exit(-1)
  }
}

As you can see, Akka makes handling inter-module communication simple and efficient; this is one of the distinctive features of Spark's IPC.