首页 理论教育 哈希Shuffle的写法及优化技巧

哈希Shuffle的写法及优化技巧

时间:2023-06-21 理论教育 版权反馈
【摘要】:可以看到,哈希Shuffle写过程可以划分成两个步骤,先是进行聚合-计算操作,之后再把得到的数据的每一个元素进行持久化。在哈希Shuffle写过程中,首先调用分区器判断数据应该传给哪一个分区,同一个分区中的数据会被写入同一个文件当中,Spark把这样的文件称为桶。Spark从0.8.1开始引入了Shuffle文件合并机制,然而可能与读者预想的将一个mapper传送给不同reducer的数据合并成一个文件,然后再额外提供一个索引文件的机制不大一样。

哈希Shuffle的写法及优化技巧

当ShuffleManager为HashShuffleManager时,ShuffleManager所能获取得到的ShuffleWriter实际上是HashShuffleWriter,我们观察HashShuffleWriter的write方法,实现代码如下。

可以看到,哈希Shuffle写过程可以划分成两个步骤,先是进行聚合-计算操作,之后再把得到的数据的每一个元素进行持久化。

1.聚合计算过程

只有在ShuffleDependency中设置mapSideCombine值为true时,才会在map端执行聚合计算操作,否则直接返回原始的键值对数据。聚合-计算过程调用了聚合器中的方法combineValuesByKey,该方法的源码实现如下。

可以看到,哈希Shuffle写过程可以划分成两个步骤,先是进行聚合-计算操作,之后再把得到的数据的每一个元素进行持久化。

1.聚合计算过程

只有在ShuffleDependency中设置mapSideCombine值为true时,才会在map端执行聚合计算操作,否则直接返回原始的键值对数据。聚合-计算过程调用了聚合器中的方法combineValuesByKey,该方法的源码实现如下。

可以看到,聚合过程中,程序会根据是否允许溢存来使用不同的哈希结构存储数据。若配置参数spark.shuffle.spill被设置为false,表示所有数据都存储在内存当中,Spark这时会使用AppendOnlyMap来存储数据,AppendOnlyMap是Spark自己实现的哈希表,在内部通过使用数组来存储数据,当数据量超出一定限制后,会自动对数据进行扩展,并对所有的键值重新进行哈希操作。若配置参数spark.shuffle.spill设置为true,表示超出限制的数据会被溢存在磁盘当中,Spark这时会使用ExternalAppendOnlyMap来存储数据,ExternalAppendOnlyMap会在哈希表快被填满时检查是否可以申请更多内存空间,如果得到的内存空间仍然不能满足要求,则会将哈希表中的数据按照哈希值(而非键值)进行排序,并将排序后的结果写入磁盘中,最后再把内存中的哈希表以及溢存文件的中数据进行合并-聚合操作,放在哈希表中。两种哈希表的实现较为复杂,出于篇幅关系,在此不进行进一步的展开。

聚合—计算过程中还会调用combiner的changeValue方法,该方法会和之前8.1.2聚合器小节中所讲述的一样,使用3个聚合函数来对数据进行聚合和计算。当所有数据聚合完毕之后,会返回一个包含结果数据的迭代器。(www.xing528.com)

2.持久化过程

得到包含数据的迭代器之后,Spark会遍历迭代器指向容器中的每一个元素,将元素进行持久化。在哈希Shuffle写过程中,首先调用分区器判断数据应该传给哪一个分区,同一个分区中的数据会被写入同一个文件当中,Spark把这样的文件称为桶(Bucket)。

在早期版本的Spark中,每一个mapper会生成R个数量的文件,这就意味着如果一个Shuffle过程中存在M个mapper,会生成R×M个文件,如果Shuffle过程中本身数据量并不大,过多的文件会降低I/O性能。Spark从0.8.1开始引入了Shuffle文件合并机制(Shuffle Consolidation),然而可能与读者预想的将一个mapper传送给不同reducer的数据合并成一个文件,然后再额外提供一个索引文件的机制不大一样。由于集群核心数目可能少于mapper的数量,因此在同一核心上,可能会执行同一Shuffle过程中不同的mapper,因此Spark让这些不同批次的mapper使用同一批文件(桶),这时候,总文件的数量不再是M×R,而是C×R,其中C是集群的核心数量。

可以看到,聚合过程中,程序会根据是否允许溢存来使用不同的哈希结构存储数据。若配置参数spark.shuffle.spill被设置为false,表示所有数据都存储在内存当中,Spark这时会使用AppendOnlyMap来存储数据,AppendOnlyMap是Spark自己实现的哈希表,在内部通过使用数组来存储数据,当数据量超出一定限制后,会自动对数据进行扩展,并对所有的键值重新进行哈希操作。若配置参数spark.shuffle.spill设置为true,表示超出限制的数据会被溢存在磁盘当中,Spark这时会使用ExternalAppendOnlyMap来存储数据,ExternalAppendOnlyMap会在哈希表快被填满时检查是否可以申请更多内存空间,如果得到的内存空间仍然不能满足要求,则会将哈希表中的数据按照哈希值(而非键值)进行排序,并将排序后的结果写入磁盘中,最后再把内存中的哈希表以及溢存文件的中数据进行合并-聚合操作,放在哈希表中。两种哈希表的实现较为复杂,出于篇幅关系,在此不进行进一步的展开。

聚合—计算过程中还会调用combiner的changeValue方法,该方法会和之前8.1.2聚合器小节中所讲述的一样,使用3个聚合函数来对数据进行聚合和计算。当所有数据聚合完毕之后,会返回一个包含结果数据的迭代器。

2.持久化过程

得到包含数据的迭代器之后,Spark会遍历迭代器指向容器中的每一个元素,将元素进行持久化。在哈希Shuffle写过程中,首先调用分区器判断数据应该传给哪一个分区,同一个分区中的数据会被写入同一个文件当中,Spark把这样的文件称为桶(Bucket)。

在早期版本的Spark中,每一个mapper会生成R个数量的文件,这就意味着如果一个Shuffle过程中存在M个mapper,会生成R×M个文件,如果Shuffle过程中本身数据量并不大,过多的文件会降低I/O性能。Spark从0.8.1开始引入了Shuffle文件合并机制(Shuffle Consolidation),然而可能与读者预想的将一个mapper传送给不同reducer的数据合并成一个文件,然后再额外提供一个索引文件的机制不大一样。由于集群核心数目可能少于mapper的数量,因此在同一核心上,可能会执行同一Shuffle过程中不同的mapper,因此Spark让这些不同批次的mapper使用同一批文件(桶),这时候,总文件的数量不再是M×R,而是C×R,其中C是集群的核心数量。

免责声明:以上内容源自网络,版权归原作者所有,如有侵犯您的原创版权请告知,我们将尽快删除相关内容。

我要反馈