从TaskSchedulerImpl的submitTasks的方法中可以知道,Spark Standalone部署模式调用SparkDeploySchedulerBackend的reviveOffers方法进行TaskSet所需资源的分配,在得到足够的资源后,将TaskSet中的Task一个一个地发送到Executor去执行。下面来看一下这里的资源即Executor是如何得到和分配的。
SparkDeploySchedulerBackend的reviveOffers方法很简单,就是发送一个ReviveOffers消息给内部类DriverEndpoint,代码如下。
DriverEndpoint的receive方法处理ReviveOffers消息也很简单,就是调用makeOffers方法,receive方法的部分关键代码如下。
DriverEndpoint的makeOffers方法首先过滤出Alive状态的Executor放到activeExecutors HahMap变量中,然后使用id、ExecutorData.ExecutorHost和ExecutorData.freeCores构建代表Executor可用资源的WorkerOffer。最后是两个最重要的方法调用。先是调用TaskScheduler-Impl的resourceOffers得到TaskDescription的二维数组,包含Task ID、Executor ID和Task In-dex等Task执行需要的信息。然后回调DriverEndpoint的launchTask给每个Task对应的Ex-ector发送执行Task的LaunchTask消息(其实是由CourseGrainedExecutorBackend转发LauchTask消息)。DriverEndpoint的makeOffers方法的代码如下。
TaskSchedulerImpl的resourceOffers方法的代码如下。
TaskSchedulerImpl的resourceOffers方法返回二维数组TaskDescription后作为Driver-Endpoint的launchTasks方法的参数。DriverEndpoint的launchTasks方法中首先对传入的tasks进行扁平化操作(例如,将多维数组降维成一维数组),得到所有的Task,然后遍历所有的Task。在遍历过程中,调用serialize()方法对Task进行序列化,得到serialized-Task。判断如果serialiedTask大于等于Akka帧减去Akka预留空间大小,则调用TaskSet-Manager的abort方法终止该任务的执行,否则,将LaunchTask(new SerializableBuffer(se-rializedTask))消息发送到CoarseGrainedExecutorBackend。(www.xing528.com)
DrvierEndpoint的akkaFrameSize定义和launchTasks方法的部分关键代码如下。
Akka通过SparkConfig中配置的spark.akka.frameSize取出Akka帧大小的配置,默认为128 MB,Akka预留空间大小为200 KB。
Akka的maxFrameSizeBytes方法和预留空间代码如下。
此后,CoarseGrainedExecutorBackend匹配到LaunchTask(data)消息后,首先调用dese-rialized方法,反序列化出task,然后调用Executor的lauchTask()方法执行Task的处理。具体详情请参阅第5章执行器(Executor)。
免责声明:以上内容源自网络,版权归原作者所有,如有侵犯您的原创版权请告知,我们将尽快删除相关内容。