无论是哈希Shuffle还是排序Shuffle,两者使用的ShuffleReader实际上都是HashShuffle Reader。HashShuffleReader的read方法实现的代码如下。
Shuffle读过程可被分成3步,分别是:拉取数据、聚合-计算和排序。
1.拉取数据
reducer首先需要调用BlockStoreShuffleFetcher.fetch方法。从所有的map端拉取数据,拉取的数据的位置实际上是被记录在MapOutputTracker中。当一个ShuffleMap阶段创建时,该阶段对应的Shuffle会注册到MapOutputTracker中。而当该阶段执行完毕后(也就是所有的mapper执行完毕),Spark会将mapper执行的主机地址以及mapper端的数据大小登记到MapOutputTracker对应的Shuffle当中,因此这里会根据ShuffleID从MapOutputTracker中获取数据的位置,再调用底层存储相关接口来拉取数据。
2.聚合-计算
当数据到达之后,接下来需要对数据进行聚合-计算操作。由于map端可能已经执行过聚合-计算操作,因此到reduce端的数据可能已经不是原先的<K,V>数据类型,而是<K,C>键值对。如果mapSideCombine是true,则调用combineCombinersByKey方法;否则调用combineValuesByKey方法。两者最大的区别在于update函数,实现代码如下。
Shuffle读过程可被分成3步,分别是:拉取数据、聚合-计算和排序。
1.拉取数据(www.xing528.com)
reducer首先需要调用BlockStoreShuffleFetcher.fetch方法。从所有的map端拉取数据,拉取的数据的位置实际上是被记录在MapOutputTracker中。当一个ShuffleMap阶段创建时,该阶段对应的Shuffle会注册到MapOutputTracker中。而当该阶段执行完毕后(也就是所有的mapper执行完毕),Spark会将mapper执行的主机地址以及mapper端的数据大小登记到MapOutputTracker对应的Shuffle当中,因此这里会根据ShuffleID从MapOutputTracker中获取数据的位置,再调用底层存储相关接口来拉取数据。
2.聚合-计算
当数据到达之后,接下来需要对数据进行聚合-计算操作。由于map端可能已经执行过聚合-计算操作,因此到reduce端的数据可能已经不是原先的<K,V>数据类型,而是<K,C>键值对。如果mapSideCombine是true,则调用combineCombinersByKey方法;否则调用combineValuesByKey方法。两者最大的区别在于update函数,实现代码如下。
3.排序
当Shuffle读端聚合-计算过程完成后,得到的数据是无序的,这时候如果对数据进行排序,则需要设置ShuffleDependency.keyOrdering成员不为空。例如sortByKey操作中,得到的ShuffledRDD的聚合器为空。keyOrdering设置为顺序或者逆序,在此处会对数据进行一次排序。
3.排序
当Shuffle读端聚合-计算过程完成后,得到的数据是无序的,这时候如果对数据进行排序,则需要设置ShuffleDependency.keyOrdering成员不为空。例如sortByKey操作中,得到的ShuffledRDD的聚合器为空。keyOrdering设置为顺序或者逆序,在此处会对数据进行一次排序。
免责声明:以上内容源自网络,版权归原作者所有,如有侵犯您的原创版权请告知,我们将尽快删除相关内容。