阶段划分是作业调度过程的关键所在,其源码实现相对较为复杂,因此用一小节单独讲解。在进入源码之前先探讨下Spark是如何划分阶段的。
一个阶段划分的例子如图7-2所示,用虚线框表示一个阶段,虚线框内所有的RDD都是为实现该阶段而需要被计算的数据。理所当然的,整个作业最后一个RDD的所有分区数据被计算完毕对应的阶段就是所求的末阶段。沿着RDD的依赖关系往前进行深度优先遍历,若到一个Shuffle依赖,依赖的每一个父RDD所有分区数据都计算完毕可以分别对应一个阶段,且都是当前阶段的父阶段,继续沿着父RDD往前遍历;若遇到一个窄依赖,则直接往前遍历,直到当前RDD的所有的依赖关系都被遍历过才返回上一层,通过这个过程,最后会得到一张DAG。DAG的最终阶段称之为结果阶段(Result Stage),其余阶段则被称为ShuffleMap阶段(ShuffleMap Stage)。
图7-2 阶段划分
根据Shuffle依赖来切断RDD计算链,从而划分出多个阶段,这样做的理由在第6章中也有所解释。源码实现上,阶段划分的过程主要完成了3件事情:
(1)阶段的确定
如图7-2所示,将Shuffle依赖作为两个阶段的分割点,并记录两者之间的阶段依赖关系,这部分功能在上一节所述方法newStage中实现,代码如下。
(www.xing528.com)
可以看到,newStage函数内部先调用getParentStage方法获取得到父亲阶段,实例化一个Stage对象,在实例化的过程中记录两个阶段的依赖关系。getParentStage是一个比较复杂的递归过程,编者在此没有对其进行展开,读者只需要知道在每层递归的最后,对于每一个阶段的父亲阶段,都会调用newStage函数将其封装成一个Stage对象,换句话说,getParentStage得到的实际上就是除了当前阶段在内的DAG图。
(2)阶段的登记与绑定
在newStage函数内部,会为阶段生成一个唯一标识ID,并将该ID与阶段进行绑定,而在实例化Stage对象的时候,也会将阶段与作业ID进行绑定。
(3)Shuffle注册
ShuffleMap阶段是一类特殊的阶段,出现该阶段意味着后面将会发生一次Shuffle过程,因此需要向MapOutputTracker注册该Shuffle过程。MapOutputTracker被用于记录MapStatus,也就是Shuffle过程中map端输出文件的位置和大小,这些信息会被用在Shuffle过程reduce端获取map端的数据。在getParentStage函数中,对于每一个Shuffle依赖,调度器会以深度优先搜索的方式找到其前面所有的Shuffle依赖,而后对于包括自身在内的Shuffle依赖,调用newOrUsedStage函数。newOrUsedStage函数会先调用newStage函数创建一个阶段(程序在此实现递归),而后再调用MapOutputTracker对象的registerShuffle方法注册该Shuffle依赖到MapOutputTracker中。除此之外,对于每个Shuffle依赖,都会将其与对应的ShuffleMap阶段进行绑定,其相关实现比较简单,在此不做过多描述。
免责声明:以上内容源自网络,版权归原作者所有,如有侵犯您的原创版权请告知,我们将尽快删除相关内容。