下面通过源代码解析来看一下TaskScheduler是如何调度和管理TaskSet的任务。
1.TaskScheduler实例化源代码
TaskScheduler和DAGScheduler都在SparkContext实例化的时候一同实例化。
SparkContext源代码中与TaskScheduler实例化相关的代码如下。
SparkContext的createTaskScheduler方法中与Standalone部署模式相关的代码如下。
2.TaskScheduler初始化源代码
TaskScheduler(实际上在实现类TaskSchedulerImpl的initialize方法中)在初始化的过程中设置对SchedulerBackend对象的引用,实例化SchedulerBuilder具体实现类的对象用来创建和管理TaskSetManager池。
TaskSchedulerImpl源代码中的相关代码如下。
3.TaskScheduler启动源代码
TaskScheduler实例对象在DAGScheduler实例化之后启动,并且TaskScheduler启动的过程由TaskSchedulerImpl具体实现。在启动过程中,主要是调用SchedulerBackend的启动方法,然后对不是本地部署模式并且开启任务的推测执行(设置spark.speculation为true)情况,根据配置判断是否周期性地调用TaskSetManager的checkSpeculatableTasks方法检查任务的推测执行。SparkDeploySchedulerBackend的start方法中会最终注册应用程序AppClient。关于SchedulerBackend的更多源代码解析请参考4.6节。(www.xing528.com)
TaskSchedulerImpl源代码中的start方法的相关代码如下。
4.TaskScheduler提交任务源代码
TaskSchedulerImpl启动后,就可以接收DAGScheduler的submitMissingTasks方法提交过来的TaskSet进行进一步处理了。对于ShuffleMapStage类型的Stage,DAGScheduler初始化一组ShuffleMapTask实例对象;对于ResultStage类型的Stage,DAGScheduler初始化一组Re-sultTask实例对象。最后,DAGScheduler将这组ResultTask实例对象封装成TaskSet实例对象提交给TaskSchedulerImpl。
注意,ShuffleMapTask是根据Stage所依赖的RDD的partition分布产生跟partition数量相等的Task,这些Task根据partition的本地性分布在不同的集群结点;ResultTask负责输出整个Job的结果。
DAGScheduler的submitMissingTasks方法的部分关键代码如下。
TaskSchedulerImpl在submitTasks中初始化一个TaskSetManager,并通过SchedulerBuilder对其生命周期进行管理,最后调用SchedulerBackend的reviveOffers方法进行TaskSet所需资源的分配。在TaskSet得到足够的资源后,在SchedulerBackend的launchTasks方法中将Task-Set中的Task一个一个地发送到Executor去执行。
TaskSchedulerImpl的submitTasks方法的部分关键代码如下。
接下来继续讲解SchedulerBackend的原理剖析和源代码解析。
免责声明:以上内容源自网络,版权归原作者所有,如有侵犯您的原创版权请告知,我们将尽快删除相关内容。