对应这种SerializedShuffleHandle及其相关的Shuffle数据写入器类型的相关代码可以参考SortShuffleManager类的getWriter方法,关键代码如下。
在数据写入器类UnsafeShuffleWriter中,使用的仍然是SortShuffleManager实例中的变量shuffleBlockResolver来对逻辑数据块与物理数据块的映射进行解析,而该变量使用的是与基于Hash的Shuffle实现机制不同的解析类,即当前使用IndexShuffleBlockResolver。
在第18行代码中,可以看到UnsafeShuffleWriter构建时传入了一个与其他两种基于Sor-ted的Shuffle实现机制不同的参数:context.taskMemoryManager(),在此构建了一个Task-MemoryManager实例并传入UnsafeShuffleWriter,TaskMemoryManager与Task是一对一的关系,负责管理分配给Task的内存。
下面开始解析写数据块的UnsafeShuffleWriter类的源代码实现。首先查看其write的方法,代码如下。
写过程中的关键步骤只有3步。
1)第5~7行,将每条记录插入外部排序器。
2)第11行,写数据文件与所有文件,在写的过程中,会先合并外部排序器在插入过程中生成的Spill中间文件。
3)第17行,最后释放外部排序器的资源。
首先查看将每条记录插入外部排序器(ShuffleExternalSorter)时所使用的insertRecordIn-toSorter方法,其关键代码如下。
下面继续查看第二步写数据文件与索引文件的closeAndWriteOutput方法,其关键代码如下。(www.xing528.com)
closeAndWriteOutput方法主要有以下几步。
1)触发外部排序器,获取Spill信息。
2)合并中间的Spill文件,生成数据文件,并返回各个分区对应的数据量信息。
3)根据各个分区的数据量信息生成数据文件对应的索引文件。
由于writeIndexFileAndCommit方法和前面Sorted Based Shuffle机制的实现一样,在此仅分析过程中不同的Spill文件合并步骤,即mergeSpills方法的具体实现。
mergeSpills方法的关键代码如下。
各种合并策略在性能上具有一定的差异,会根据具体的条件采用,主要有基于NIO和基于普通文件流合并文件的方式。下面简单描述一下基于文件合并流的处理过程,代码如下。
基于NIO的文件合并流程基本类似,只是底层采用了NIO的技术实现。
免责声明:以上内容源自网络,版权归原作者所有,如有侵犯您的原创版权请告知,我们将尽快删除相关内容。