接下来将跟踪Spark中一个常用动作操作count的执行流程,从而理清DAG调度的整个过程。count操作的对应实现在RDD抽象类中。count方法的实现代码如下。
def count():Long=sc.runJob(this,Utils.getIteratorSize_).sum
count方法调用了SparkContext类的runJob方法,runJob方法会返回一个包含每个分区内部数据记录个数的整型数组对象,因此之后只需要调用该数组的sum方法做一次求和运算,即可得到整个RDD内数据记录的个数。
SparkContext内有多个runJob方法的实现,这些方法最终都会调用SparkContext类内的同一个runJob方法,将当前作业提交给DAGScheduler类,这个runJob方法的实现代码如下。
SparkContext.runJob方法首先获取函数的调用位置用于后期日志输出和调试,而后清除func函数的函数闭包以方便函数的序列化处理,调用DAGScheduler.runJob方法,交付作业给DAG调度器,显示作业运行进度,并在作业完成之后,执行建立检查点操作,顺着依赖链,寻找之前被标价成MarkForCheckpoint的RDD,将该检查点标记成CheckpointingInProgress,并创建一个作业把RDD数据写入到指定的目录中,注意这个过程在之前的作业已经完成之后才会执行。
函数闭包可被理解成一个函数,使其能够读取外部函数的内部变量。
传递给DAGScheduler.runJob方法的参数中,有两个需要额外注意的参数:第一个是allowLocal参数,当allowLocal参数赋值为true时,DAG调度器会将一些比较简单的任务直接放在本地运行;第二个是resultHandler,resultHandler是一个回调函数,其定义为“(Index, res)=>results[index]=res”,其中result是在SparkContext.runJob中定义的一个数组。可以看到,DAGScheduler.runJob方法本身并没有返回值的,当一个分区数据计算任务执行完毕之后,只需要调用resultHandler(partitionId,result)方法即可把计算结果返回给SparkContext.runJob,而无须经过复杂的函数返回操作。
接下来便从SparkContext类转移到DAGScheduler类中,DAGScheduler.runJob方法的实现代码如下。
SparkContext.runJob方法首先获取函数的调用位置用于后期日志输出和调试,而后清除func函数的函数闭包以方便函数的序列化处理,调用DAGScheduler.runJob方法,交付作业给DAG调度器,显示作业运行进度,并在作业完成之后,执行建立检查点操作,顺着依赖链,寻找之前被标价成MarkForCheckpoint的RDD,将该检查点标记成CheckpointingInProgress,并创建一个作业把RDD数据写入到指定的目录中,注意这个过程在之前的作业已经完成之后才会执行。
函数闭包可被理解成一个函数,使其能够读取外部函数的内部变量。
传递给DAGScheduler.runJob方法的参数中,有两个需要额外注意的参数:第一个是allowLocal参数,当allowLocal参数赋值为true时,DAG调度器会将一些比较简单的任务直接放在本地运行;第二个是resultHandler,resultHandler是一个回调函数,其定义为“(Index, res)=>results[index]=res”,其中result是在SparkContext.runJob中定义的一个数组。可以看到,DAGScheduler.runJob方法本身并没有返回值的,当一个分区数据计算任务执行完毕之后,只需要调用resultHandler(partitionId,result)方法即可把计算结果返回给SparkContext.runJob,而无须经过复杂的函数返回操作。
接下来便从SparkContext类转移到DAGScheduler类中,DAGScheduler.runJob方法的实现代码如下。(www.xing528.com)
runJob函数继续调用了submitJob方法提交一个作业,submitJob函数会返回一个JobWaiter类的示例waiter,JobWaiter类主要被用于两个用途:一是通过eventProcessActor对象发送一个JobCancelled请求消息来取消一个作业的执行;二是阻塞DAGScheduler.runJob所在进程,并等待提交作业执行完毕,实现代码如下。JobWaiter类内部会监听每一个任务的完成事件,统计任务完成的个数,在每个任务完成之后调用回调函数resultHandler来执行任务得到结果,当作业执行完毕或者执行失败后,阻塞停止,返回作业执行结果给runJob方法。
runJob函数继续调用了submitJob方法提交一个作业,submitJob函数会返回一个JobWaiter类的示例waiter,JobWaiter类主要被用于两个用途:一是通过eventProcessActor对象发送一个JobCancelled请求消息来取消一个作业的执行;二是阻塞DAGScheduler.runJob所在进程,并等待提交作业执行完毕,实现代码如下。JobWaiter类内部会监听每一个任务的完成事件,统计任务完成的个数,在每个任务完成之后调用回调函数resultHandler来执行任务得到结果,当作业执行完毕或者执行失败后,阻塞停止,返回作业执行结果给runJob方法。
下面继续研究submitJob方法,其实现代码如下。程序首先确保RDD分区的编号在合法范围内,并给当前的作业分配一个编号。对于分区数目为0的RDD,直接返回一个JobWaiter对象,这个JobWaiter对象在阻塞一开始时就会立即返回任务执行成功。对于分区数目不为0的RDD,则新建一个JobWaiter对象,通过eventProcessActor对象发送一个作业提交请求信号JobSubmitted,随后也返回一个JobWaiter对象给runJob函数。
下面继续研究submitJob方法,其实现代码如下。程序首先确保RDD分区的编号在合法范围内,并给当前的作业分配一个编号。对于分区数目为0的RDD,直接返回一个JobWaiter对象,这个JobWaiter对象在阻塞一开始时就会立即返回任务执行成功。对于分区数目不为0的RDD,则新建一个JobWaiter对象,通过eventProcessActor对象发送一个作业提交请求信号JobSubmitted,随后也返回一个JobWaiter对象给runJob函数。
正如7.3.1小节所讲的,JobSubmitted信号会被DAGSchedulerEventProcessActor类的receive方法接收,并调用DAGScheduler.haddleJobSubmitted方法来处理该信息,该方法的实现代码如下。其中,参数列表中的listener参数就是在submitJob函数中创建的JobWaiter对象。handleJobSubmitted首先做的第一件事即调用newStage函数对作业进行阶段划分,得到一个表示末阶段(Final Stage)的变量finalStage,finalStage内不仅存储末阶段内部信息,还可能保存了父阶段的信息,而父阶段又会保存祖父阶段的信息,因此finalStage实际上已经保存了希望得到的DAG的信息。阶段划分的过程相对比较复杂,7.3.3小节会单独讲解newStage内部如何实现阶段划分。
正如7.3.1小节所讲的,JobSubmitted信号会被DAGSchedulerEventProcessActor类的receive方法接收,并调用DAGScheduler.haddleJobSubmitted方法来处理该信息,该方法的实现代码如下。其中,参数列表中的listener参数就是在submitJob函数中创建的JobWaiter对象。handleJobSubmitted首先做的第一件事即调用newStage函数对作业进行阶段划分,得到一个表示末阶段(Final Stage)的变量finalStage,finalStage内不仅存储末阶段内部信息,还可能保存了父阶段的信息,而父阶段又会保存祖父阶段的信息,因此finalStage实际上已经保存了希望得到的DAG的信息。阶段划分的过程相对比较复杂,7.3.3小节会单独讲解newStage内部如何实现阶段划分。
阶段划分完毕之后,程序将当前作业转变成活动作业,活动作业与普通作业最大的不同在于前者存储了阶段划分信息(即存储了finalStage)。而后对于一些比较小型的作业,例如仅仅只有一个阶段的并且最后执行action或者first动作的作业,直接调用runLocally方法,并最终调用runLocallyWithinThread方法,在Driver程序所在的节点上的一个线程内直接执行内部的任务,从而节省集群资源调度所消耗的时间。如果不适宜在本地运行,就会调用DAGScheduler.submitStage方法将末阶段提交给集群运行。
在submitStage方法中,程序会检查传入阶段是否有父阶段尚未执行,如果有,则会通过调用submitStage(parent)优先执行父阶段,并将自己放入到waitingStages队列当中,等待后期被取出并执行,如果前面所有阶段都已经执行完毕,则直接调用submitMissingTasks方法,执行当前阶段内部的任务。submitMissingTasks方法负责将一个阶段划分成多个任务并交付集群执行,具体的实现会在第7.4.2小节中详细描述。
阶段划分完毕之后,程序将当前作业转变成活动作业,活动作业与普通作业最大的不同在于前者存储了阶段划分信息(即存储了finalStage)。而后对于一些比较小型的作业,例如仅仅只有一个阶段的并且最后执行action或者first动作的作业,直接调用runLocally方法,并最终调用runLocallyWithinThread方法,在Driver程序所在的节点上的一个线程内直接执行内部的任务,从而节省集群资源调度所消耗的时间。如果不适宜在本地运行,就会调用DAGScheduler.submitStage方法将末阶段提交给集群运行。
在submitStage方法中,程序会检查传入阶段是否有父阶段尚未执行,如果有,则会通过调用submitStage(parent)优先执行父阶段,并将自己放入到waitingStages队列当中,等待后期被取出并执行,如果前面所有阶段都已经执行完毕,则直接调用submitMissingTasks方法,执行当前阶段内部的任务。submitMissingTasks方法负责将一个阶段划分成多个任务并交付集群执行,具体的实现会在第7.4.2小节中详细描述。
免责声明:以上内容源自网络,版权归原作者所有,如有侵犯您的原创版权请告知,我们将尽快删除相关内容。