Shuffle框架的设计可以从两方面去理解:一方面为了Shuffle模块更加内聚,并与其他模块解耦;另一方面为了更方便替换、测试和扩展Shuffle的不同实现方式。从Spark 1.1版本开始,引进了可插拔式的Shuffle框架(通过将Shuffle相关的实现封装到一个统一的对外接口,提供一种具体实现可插拔的框架)。Spark框架中,通过ShuffleManager来管理各种不同实现机制的Shuffle过程,由ShuffleManager统一构建、管理具体实现子类来实现Shuffle框架的可插拔的Shuffle机制。
在详细描述Shuffle框架实现细节之前,先给出可插拔式Shuffle的整体架构的类图,如图7-2所示。
图7-2 可插拔式Shuffle的整体架构的类图
在DAG的调度过程中,Stage阶段的划分是根据是否有Shuffle过程,也就是当存在ShuffleDependency的宽依赖时,需要进行Shuffle,这时会将作业(Job)划分成多个Stage。相应的,在源代码实现中,通过在划分Stage的关键点——构建ShuffleDependency时——进行Shuffle注册,获取后续数据读写所需的ShuffleHandle。
Stage阶段划分的详细过程可以参考DAG调度章节的内容,最终每个作业(Job)提交后都会对应生成一个ResultStage与若干个ShuffleMapStage,其中ResultStage表示生成作业的最终结果所在的Stage。ResultStage与ShuffleMapStage中的Task分别对应了ResultTask与ShuffleMapTask。一个作业,除了最终的ResultStage外,其他若干ShuffleMapStage中的各个ShuffleMapTask都需要将最终的数据根据相应的分区器(Partitioner)对数据进行分组(即将数据重组到新的各个分区中),然后持久化分组后的数据。相应的,每个RDD本身记录了它的数据来源,在计算(compute)时会读取所需的数据,对于带有宽依赖的RDD,读取时会获取在ShuffleMapTask中持久化的数据。
从图7-2中可以看到,外部宽依赖相关的RDD与ShuffleManager之间的注册交互,通过该注册,每个RDD自带的宽依赖(ShuffleDependency)内部会维护Shuffle的唯一标识信息ShuffleId,以及与Shuffle过程具体读写相关的句柄ShuffleHandle,后续在ShuffleMapTask中启动任务(Task)时,可以通过该句柄获取相关的Shuffle写入器实例,实现具体的数据磁盘写操作。
而在宽依赖(ShuffleDependency)的RDD中,执行compute时会去读取上一Stage为其输出的Shuffle数据,此时同样会通过该句柄获取相关的Shuffle读取器实例,实现具体数据的读取操作。需要注意的是,当前Shuffle的读写过程中,与BlockManager的交互,是通过MapOutputTracker来跟踪Shuffle过程中各个任务的输出数据的。在任务完成等场景中,会将对应的MapStatus信息注册到MapOutputTracker中,而在compute时的数据读取过程中,也会通过该跟踪器来获取上一Stage的输出数据在BlockManager中的位置,然后通过getReader得到的数据读取器,从这些位置中读取数据。(www.xing528.com)
目前对Shuffle的输出进行跟踪的MapOutputTracker并没有和Shuffle数据读写类一样,也封装到Shuffle的框架中。如果从代码聚合与解耦等角度出发,也可以将MapOutputTracker合并到整个Shuffle框架中,然后在Shuffle写入器输出数据之后立即进行注册,在数据读取器读取数据之前获取位置等(但对应的DAG等调度部分也需要进行修改)。
ShuffleManager封装了各种Shuffle机制的具体实现细节,其包含的接口与属性如下所示。
1)registerShuffle:每个RDD在构建它的父依赖(这里特指ShuffleDependency)时,都会先注册到ShuffleManager,获取ShuffleHandler,用于后续数据块的读写等。
2)getWriter:可以通过ShuffleHandler获取数据块写入器,写数据时通过Shuffle的块解析器shuffleBlockResolver获取写入位置(通常将写入位置抽象为Bucket,位置的选择则由洗牌的规则即Shuffle的分区器决定),然后将数据写入到相应位置(理论上,该位置可以位于任何能存储数据的地方,包括磁盘、内存或其他存储框架等,目前在可插拔框架的几种实现中,Spark与Hadoop一样都采用了磁盘的方式进行存储,主要目的是为了节约内存,同时提高容错性)。
3)getReader:可以通过ShuffleHandler获取数据块读取器,然后通过Shuffle的块解析器shuffleBlockResolver来获取指定数据块。
4)unregisterShuffle:与注册相对应,用于删除元数据等后续清理操作。
5)shuffleBlockResolver:Shuffle的块解析器,通过该解析器,为数据块的读写提供支撑层,便于抽象具体的实现细节。
免责声明:以上内容源自网络,版权归原作者所有,如有侵犯您的原创版权请告知,我们将尽快删除相关内容。