下面继续从源码的角度,了解Spark是如何触发Shuffle写和Shuffle读过程的。
mapper本质上是一个任务。7.3调度章节曾提及过DAG调度器会在一个阶段内部划分任务,根据阶段的不同,得到ResultTask和ShuffleMapTask两类任务。ResultTask会将计算结果返回给Driver,ShuffleMapTask则将结果传递给Shuffle依赖中的子RDD。因此,可以从ShuffleMapTask入手,观察Mapper的大致工作流程,实现代码如下。
由于一个任务对应当前阶段末RDD内的一个分区,因此通过rdd.iterator(partition,context)可以计算得到该分区的数据。接下来便是执行Shuffle写操作,该操作由一个ShuffleWriter实例通过调用write接口完成,ApacheSpark从ShuffleManager实例中获取该ShuffleWriter对象。
上文提及过,Spark提供了两类Shuffle机制。对应的,ShuffleManager也有两类子类,分别是HashShuffleManager和SortShuffleManager。ShuffleManager的主要作用是提供ShuffleWriter和ShuffleReader用于Shuffle写和Shuffle读过程。HashShuffleManager提供HashShuffleWriter和HashShuffleReader,而SortShffleManager提供的是SortShuffleWriter和HashShuffleReader。可以看到,哈希Shuffle和排序Shuffle的唯一区别在于Shuffle写过程,读过程完全一致。
继续来观察Shuffle读的触发。Spark中,聚合器中的3个函数是在PairRDDFunctions. combineByKey方法中指定。可以看到,若新RDD与旧RDD的分区器不同时,会生成一个ShuffledRDD,实现代码如下。
由于一个任务对应当前阶段末RDD内的一个分区,因此通过rdd.iterator(partition,context)可以计算得到该分区的数据。接下来便是执行Shuffle写操作,该操作由一个ShuffleWriter实例通过调用write接口完成,ApacheSpark从ShuffleManager实例中获取该ShuffleWriter对象。
上文提及过,Spark提供了两类Shuffle机制。对应的,ShuffleManager也有两类子类,分别是HashShuffleManager和SortShuffleManager。ShuffleManager的主要作用是提供ShuffleWriter和ShuffleReader用于Shuffle写和Shuffle读过程。HashShuffleManager提供HashShuffleWriter和HashShuffleReader,而SortShffleManager提供的是SortShuffleWriter和HashShuffleReader。可以看到,哈希Shuffle和排序Shuffle的唯一区别在于Shuffle写过程,读过程完全一致。(www.xing528.com)
继续来观察Shuffle读的触发。Spark中,聚合器中的3个函数是在PairRDDFunctions. combineByKey方法中指定。可以看到,若新RDD与旧RDD的分区器不同时,会生成一个ShuffledRDD,实现代码如下。
观察ShuffledRDD是如何获取分区数据的。与Shuffle写过程类似,先从ShuffleManager中获取ShuffleReader,通过ShuffleReader的read接口拉取和计算特定分区中的数据,代码如下。
观察ShuffledRDD是如何获取分区数据的。与Shuffle写过程类似,先从ShuffleManager中获取ShuffleReader,通过ShuffleReader的read接口拉取和计算特定分区中的数据,代码如下。
在后面8.2以及8.3小节会进一步分析ShuffleWriter.write和ShuffleReader.read的具体实现。
在后面8.2以及8.3小节会进一步分析ShuffleWriter.write和ShuffleReader.read的具体实现。
免责声明:以上内容源自网络,版权归原作者所有,如有侵犯您的原创版权请告知,我们将尽快删除相关内容。