Spark应用程序由driver program和在Executor内执行的代码两部分组成,Spark的作业调度主要说的就是这些基于RDD的一系列操作算子构成一个Job,然后在Executor中执行。这些操作算子主要分为transformation算子和action算子,对于transform算子的计算是lazy级别的,也就是延迟执行,只有出现了action算子才触发Job的提交。这里我们以最经典的wordcount(单词计数)为例并结合Spark 1.2的源代码来分析一下作业的提交(下面两行代码可以看作是wordcount的伪代码,首先是从HDFS文件系统中加载README.md文件生成一个MappedRDD,然后使用flatMap、map、reduceByKey一系列Transformation操作,最后调用RDD的count方法完成README.md文件的单词计数)。
(1)这个Job的真正执行是从RDD的count方法这个action算子出现开始的。我们打开RDD这个抽象类的源码,由RDD的count方法触发了SparkContext的runJob方法来提交作业。我们可以看到对于Spark作业的提交是在其内部隐性调用runJob方法进行的,对于用户来说,不用显性地去提交作业。
(2)对于RDD来说,它们会根据彼此之间的依赖关系形成一个有向无环图(DAG),然后就是把这个DAG图交给DAGScheduler来处理,从源代码的层面来看,SparkContext的runJob方法经过几次回调后会调用DAGScheduler的runJob方法,这时候作业提交进入了DAGScheduler的处理阶段。(www.xing528.com)
(3)在Spark程序中,可以存在多个Job,而且这些Job之间可以没有任何依赖关系,对于多个Job之间的调度,Spark目前提供了两种调度策略:一种是FIFO(先进先出)模式,这也是目前默认的模式;另一种是FAIR模式,FAIR模式的调度可以通过两个参数的配置来决定Job执行的优先模式,两个参数分别是minShare(最小任务数)、weight(任务的权重)。如果你的Spark应用是通过服务的形式,为多个用户提交作业的话,那么可以通过配置Fair模式相关参数来调整不同用户作业的调度和资源分配优先级。在TaskSchedulerImpl的initialize方法中先实现了对rootPool根调度池的初始化,随后跟进SchedulableMode的匹配方式建立了SchedulableBuilder对象,具体的调度是由SchedulableBuilder的buildPools方法来实现的。
在以上的SparkContext的runJob方法里,调用了DAGScheduler的runJob方法,下面我们进入DAGScheduler类来查看它的runJob方法的具体实现。
免责声明:以上内容源自网络,版权归原作者所有,如有侵犯您的原创版权请告知,我们将尽快删除相关内容。