基于Sort的Shuffle实现机制中相关的ShuffleHandle包含BypassMergeSortShuffleHandle与BaseShuffleHandle。对应这两种ShuffleHandle及其相关的Shuffle数据写入器类型的相关代码可以参考SortShuffleManager类的getWriter方法,关键代码如下。
在对应构建的两种数据写入器类BypassMergeSortShuffleWriter与SortShuffleWriter中,都是通过变量shuffleBlockResolver对逻辑数据块和物理数据块的映射进行解析,而该变量使用的是与基于Hash的Shuffle实现机制不同的解析类,即当前使用IndexShuffleBlockResolver。
下面开始解析这两种写数据块方式的源代码实现。
1.BypassMergeSortShuffleWriter写数据的源代码解析
BypassMergeSortShuffleWriter类实现了带Hash风格的基于Sort的Shuffle机制,为每个Reduce端的任务构建一个输出文件,将输入的每条记录分别写入各自对应的文件中,并在最后将这些基于各个分区的文件合并成一个输出文件。
从前面提到的实例中可以知道,在Rueducer端任务数比较少的情况下,基于Hash的Shuffle实现机制明显比基于Sort的Shuffle实现机制要快,因此基于Sort的Shuffle实现机制提供了一个fallback方案,当Rueducer端任务数少于配置属性“spark.shuffle.sort.bypass-MergeThreshold”设置的个数时,使用带Hash风格的fallback计划,由BypassMergeSortShuf-fleWriter具体实现。
使用该写入器的条件如下。
1)不能指定Ordering,从前面数据读取器的解析可以知道,当指定Ordering时,会对分区内部的数据进行排序。因此,对应的BypassMergeSortShuffleWriter写入器避免了排序开销。
2)不能指定聚合器(Aggregator)。
3)分区个数小于“spark.shuffle.sort.bypassMergeThreshold”配置属性指定的个数。
和其他ShuffleWriter的具体子类一样,BypassMergeSortShuffleWriter写数据的具体实现位于实现的write方法中,关键代码如下。
其中第26行代码,描述了各个分区所生成的中间临时文件的格式与对应的BlockId,具体代码如下。
从上面的分析可以知道,每个Map端的任务最终会生成两个文件:数据(Data)文件和索引(Index)文件。(www.xing528.com)
另外在使用DiskBlockObjectWriter写记录时,是以32条记录批次写入的,不会占用太大的内存。但由于不能指定聚合器(Aggregator),写数据时也是直接写入记录,因此对应后续的网络I/O的开销也会很大。
2.SortShuffleWriter写数据的源代码解析
前面BypassMergeSortShuffleWriter的写数据是在Reduce端的分区个数较少的情况下提供的一种优化方式,但当数据集规模非常大,使用该写数据方式不合适时,就需要使用SortShuffleWriter来写数据块。
和其他ShuffleWriter的具体子类一样,SortShuffleWriter写数据的具体实现位于实现的write方法中,关键代码如下。
这种基于Sort的Shuffle实现机制中引入了外部排序器(ExternalSorter),ExternalSorter继承了Spillable,因此内存使用在达到一定阈值时,会Spill到磁盘,可以减少内存带来的开销。
通过查看外部排序器(ExternalSorter)的insertAll方法,对应调用在第20行,该方法内部在处理完(包含聚合和非聚合两种方式)每一条记录时,都会检查是否需要Spill。如果内部各种细节比较多,这里以Spill条件判断为主线,简单描述与条件相关的代码。具体判断是否需要Spill的相关代码,可以参考Spillable类中的maybeSpill方法(该方法简单的调用流程为:ExternalSorter#insterAll→ExternalSorter#maybeSpillCollection→Spillable#maybeSpill),关键代码如下。
对于外部排序器(ExternalSorter),除了insertAll方法外,它的writePartitionedFile方法也非常重要。该方法的代码如下。
其中,BlockId是数据块的逻辑位置,File参数则是对应逻辑位置的物理存储位置。这两个参数值的获取方法和使用BypassMergeSortShuffleHandle及其对应ShuffleWriter是一样的。
在该方法中,有一个容易混淆的地方,与Shuffle的度量(Metric)信息有关,相应的代码如下。
其中,第1行对应修改了Spilled的数据在内存中的字节大小,第2行则对应修改了Spilled的数据在磁盘中的字节大小。在内存中时,数据是反序列化形式存放的,而存储到磁盘(默认会序列化)时,会对数据进行序列化。反序列化后的数据会远远大于序列化后的数据(也可以通过UI界面查看这两个度量信息的大小差异来确认,具体差异的大小和数据与选择的序列化器有关,有兴趣的读者可以参考各序列器间的性能等比较文档)。
从这一点也可以看出,如果在内存中使用反序列化的数据,会大大增加内存的开销(也意味着增加GC负载),并且反序列化也会增加CPU的开销,因此引入了利用Tungsten项目的基于Tungsten Sort的Shuffle实现机制,Tungsten项目的优化主要有3个方面,这里从避免反序列化的数据量会极大地消耗内存这点考虑,主要是借助Tungsten项目的内存管理模型,可以直接处理序列化的数据;同时在CPU开销方面,直接处理序列化数据,可以避免数据反序列化的这部分处理开销。
免责声明:以上内容源自网络,版权归原作者所有,如有侵犯您的原创版权请告知,我们将尽快删除相关内容。