DAGScheduler是面向Stage的任务调度器,负责接收Spark应用提交的Job,并根据RDD的依赖关系划分Stage,并提交Stage给TaskScheduler调度器。下面我们继续看DAG-Scheduler的源码实现。
(1)在DAGScheduler类内部会进行一系列的方法调用,首先是在runJob方法里,调用submitJob方法来继续提交作业,这里会发生阻塞,直到返回作业完成或失败的结果。
(2)在submitJob方法里,最重要的是创建了一个JobWaiter对象,并借助AKKA通信把这个对象发送给DAGScheduler的内嵌类eventProcessActor进行处理。
(3)由eventProcessActor的初始化,我们知道这个类实际上指的是DAGSchedulerEvent-ProcessActor。
(4)在DAGSchedulerEventProcessActor的消息接受方法receive方法中,对接受到JobSub-mitted样例类完成模式匹配后,继续调用DAGScheduler的handleJobSubmitted方法来提交作业。
(5)handleJobSubmitted方法是个很关键的方法,在其内部会根据finalRDD构建一个Stage,这也意味着开始了Stage的划分,最后调用submitStage方法来提交这个Stage。(www.xing528.com)
(6)submitStage方法中会根据依赖关系划分Stage,通过递归调用从finalStage一直往前找它的父Stage,直到Stage没有父Stage时就调用submitMissingTasks方法提交给Stage。这样就完成了将job划分为一个或者多个Stage。如果被当作参数传进的Stage有父Stage,那么这个Stage会放入waitingStage中,等待以后执行。
(7)getMissingParentStages方法中会根据finalstage对应finalRDD的dependence类型来创建它的父Stage。如果是ShuffleDependency(也即宽依赖),则调用getShuffleMapStage方法创建父Stage;如果是窄依赖,会把窄依赖对应的RDD放入waitingForVisit栈顶,等下次继续调用visit方法继续向前回溯。
(8)最后会在submitMissingTasks方法中将Stage封装成TaskSet通过taskSchedul-er.submitTasks函数提交给TaskScheduler处理。
此时TaskScheduler通过调用自己的submitTasks方法接受来自DAGScheduler发送过来的TaskSet,TaskScheduler收到TaskSet后负责把任务集以Task的形式一个个分发到集群Worker结点的Executor中去运行。下面我们进入TaskScheduler类的Spark源码,分析它如何提交Task到集群中去运行。
免责声明:以上内容源自网络,版权归原作者所有,如有侵犯您的原创版权请告知,我们将尽快删除相关内容。