详细解析PairRDDFunctions的一些聚合/归并操作的API,包括aggregateByKey、reduce-ByKey、foldByKey等。
针对Spark 1.3之前的版本,以下所有案例都需要先导入隐式转换,如下:
但当前为Spark 1.3版本,隐式转换的方法已经移入RDD伴生对象中,因此不再需要导入隐式转换。
一、aggregateByKey
1.定义
2.功能描述
aggregateByKey,顾名思义,和RDD的aggregate方法在逻辑上的功能是一致的,只是,这里聚合操作的对象由RDD分区的全部数据变成了PairRDDFunctions分区中按key分组后得到的value数据集。
即,RDD的aggregate与PairRDDFunctions的aggregateByKey这两个API的差异点,就在于对分区中进行聚合时,聚合的目标数据集不同。
3.示例
案例中实现作为Key值的单词的统计功能、合并功能,具体代码及结果如下:
简化,去掉zeroValue的定义或U的类型指定:
进一步简化,去掉函数的定义:
4.示例解析
查看aggregateByKey的签名,可以看出下面两个aggregateByKey比第一个增加了分区器信息的设置,包含修改分区器的分区个数,替换分区器。
为了重点解析aggregateByKey的功能,我们以第一个aggregateByKey方法进行案例解析,在案例解析之后,继续从源码角度进一步对这一类的API进行详细解析。
注意:但当前为Spark 1.3版本,不需要导入隐式转换。
5.扩展内容
以aggregateByKey为例,介绍下对API功能的推导方法。通过对参数中各个函数进行解析,来理解该API的功能。这只是个人使用API时常用的一种推导方法,当遇到问题时,建议从源码角度去解析。
解析过程是通过RDD的类型、各个参数函数的签名,以及最终API的返回类型进行推导的,这种推导方式可以应用在各种API的解析上,这样可以避免阅读源码才能理解功能的尴尬,提高对Spark API的学习效率。
补充:当推导A方法时,如果已经熟悉B方法,同时B方法又调用了A方法时,可以结合B方法的理解进行推导。如果有兴趣,可以试试结合aggregateByKey方法的理解,来推导combineByKey方法。
以下是aggregateByKey方法的详细推导过程:
1)RDD的元素类型为[K,V],即Key-Value形式的二元组类型,其中Key的类型为K,Value的类型为V。
2)由签名可知,aggregateByKey方法的初始值zeroValue类型为U,seqOp定义为(U,V)=>U,combOp定义为(U,U)=>U,最终返回RDD的元素类型为[K,U]。
3)因此,产生的类型变化是V=>U,即用初始值zeroValue来归并类型为V的值,得到类型U的值,对应操作类型为:(U,V)=>X(X表示未知类型),具有该签名的只有se-qOp:(U,V)=>U函数,因此第一步归并操作应该是seqOp对zeroValue和RDD元素的Value进行的。
4)由于RDD是分布式的数据集,所有的操作应该先针对分区进行,所以seqOp是在分区元素上递归地进行归并,即不断地进行(U,V)=>U操作,最终分区归并结果为类型U的值。
5)到这一步,可以知道各个分区都返回了U值,而aggregateByKey是对整个RDD进行的,也就是最终要合并各个分区的结果,这时候就是对U类型的元素集合进行合并,最终得到U类型的值。因此合并的操作定义应该是(U,U)=>U,参数中只有combOp符合该定义。
通过以上解析,就可以知道,用zeroValue和seqOp对分区进行归并,然后用combOp再对分区归并的结果数据集再次进行归并。
二、combineByKey
1.定义
2.功能描述
在每个partition中先创建初始combiner(createCombiner),然后将当前RDD的各个分区的数据单元逐个输入各个分区对应的combiner进行处理,在各个partition处理结束后,再在mergeCombiner中将各个partition的处理结果进行综合处理。
3.示例
案例的类型信息:(www.xing528.com)
1)K:Int。
2)V:String。
3)C:List[String]。
进一步简化,去掉函数的定义:
4.示例解析
各个参数以及返回值的含义如下:
1)使用createCombiner,处理各个分区的第一个元素,得到类型为C的新值。
2)使用mergeValue,对各个分区的元素进行merge,对由createCombiner得到的输出值,和分区元素值不断执行mergeValue,最后得到分区的归并值,类型为C。
3)使用mergeCombiners,对上一步得到的各个分区的归并值,再次进行合并,得到类型为C的值。
4)最后,返回类型为[(K,C)]。
5.应用场景
该API将大数据的处理转变为对小数据量的分区级别的处理,然后合并各个分区处理后再次进行聚合。在大数据量对处理的性能影响极大的情况下,这种先分区再合并的模式可以极大提高性能。尤其是在各个分区聚合后的数据量很小的情况下。
该应用场景适用于所有类似的聚合操作,比如调用了该方法的aggregateByKey,只是各自聚合API使用了不同的参数,以及对输入输出类型的要求不同而已。
如果查看源码,可以看到内部有mapPartitions方法的调用。
三、foldByKey
1.定义
2.功能描述
对每个分区元素,基于相同key值的value数据集合,进行fold操作。参数zeroValue为fold操作时使用的初始值。
3.示例
简化,去掉函数的定义:
4.示例解析
可以将RDD的Value类型V转换为其他类型C。
四、reduceByKey
1.定义
2.功能描述
对每个分区元素,基于相同key值的value数据集合,进行func操作。与fold相比,该方法没有提供zeroValue初始值。由于没有初始值,当某key的value只有一个值时,func是不会执行的。
3.示例
1)示例1:单词统计。
2)示例2:合并相同Key值的value。
进一步简化,去掉函数的定义:
4.示例解析
在reduce时,只能得到相同类型的结果。
免责声明:以上内容源自网络,版权归原作者所有,如有侵犯您的原创版权请告知,我们将尽快删除相关内容。