首页 理论教育 分区数设置案例与源码解析

分区数设置案例与源码解析

时间:2023-06-25 理论教育 版权反馈
【摘要】:解析各种情况下的RDD的分区数设置情况,具体包括加载文件创建的RDD的分区数、SparkContext中默认的分区数、经过转换后的RDD的分区数以及针对特定的Key-Value格式类型的RDD的分区数。重新启动脚本,加载文件,并查询加载后RDD的分区数。同时,针对Key-Value元素类型的RDD,还提供了精细控制的分区器partitioner,可以通过设置分区器,来指定元素如何分区的策略,以及分区的个数。

分区数设置案例与源码解析

解析各种情况下的RDD的分区数设置情况,具体包括加载文件创建的RDD的分区数、SparkContext中默认的分区数、经过转换后的RDD的分区数以及针对特定的Key-Value格式类型的RDD的分区数。

一、加载文件创建RDD时的分区数解析

如下所示:

978-7-111-51909-6-Chapter02-121.jpg

978-7-111-51909-6-Chapter02-122.jpg

sc.defaultParallelism是当前默认的并行数,对应加载的本地文件(非HDFS文件系统)以默认的并行数作为构建的RDD的分区数。

注意:当前没有设置“spark.default.parallelism”配置属性,所以使用sc.getConf.getInt("spark.default.parallelism",0)获取属性时,值为设置的默认值0。

查看SparkContext的源码

978-7-111-51909-6-Chapter02-123.jpg

跳转到taskScheduler.defaultParallelism源码:

978-7-111-51909-6-Chapter02-124.jpg

单击方法前的箭头,获取其具体子类的实现,当前仅提供一个具体子类,即TaskSchedulerImpl,如图2.28所示。

978-7-111-51909-6-Chapter02-125.jpg

图2.28 获取taskScheduler.defaultParallelism方法的具体重载信息

跳转到TaskSchedulerImpl的源码:

978-7-111-51909-6-Chapter02-126.jpg

继续跳转到backend.defaultParallelism,然后跳转到具体子类的实现,继承的全部子类如图2.29所示。

978-7-111-51909-6-Chapter02-127.jpg

图2.29 获取重载backend.defaultParallelism方法的具体子类

如MesosSchedulerBackend(对应集群模式)中的源码所示:

978-7-111-51909-6-Chapter02-128.jpg

可以看到defaultParallelim的配置属性为:“spark.default.parallelism”,即集群模式下使用该属性作为defaultParallelim,并且默认值为8。

如子类LocalBackend中的源码(对应Local模式)所示:(www.xing528.com)

978-7-111-51909-6-Chapter02-129.jpg

可以看到defaultParallelim的配置属性为:“spark.default.parallelism”,但当没有设置该配置属性时,默认值为总的内核数,LocalBackend对应Local模式。

修改默认的配置文件:$_HOME/conf/spark-defaults.conf,在文件中添加该属性的配置,如下所示:

978-7-111-51909-6-Chapter02-130.jpg

在文件中添加一行配置:spark.default.parallelism 6。重新启动脚本,加载文件,并查询加载后RDD的分区数。

注意:默认的并行度只是作为加载文件时分区数的最小值参考,实际的分块数由加载文件时的Splits数决定,即文件的Blocks数。也可以由加载时的API参数指定分区数。

二、Key-Value元素类型的RDD的分区数解析

所有RDD都可以从getPartitions方法中找出分区数和父RDD的分区数间的依赖关系。同时,针对Key-Value元素类型的RDD,还提供了精细控制的分区器partitioner,可以通过设置分区器,来指定元素如何分区的策略,以及分区的个数。

当没有设置分区器时,默认使用HashPartitioner,具体参见分区器中的默认分区器定义源码:

978-7-111-51909-6-Chapter02-131.jpg

978-7-111-51909-6-Chapter02-132.jpg

从代码中可以看到,当没有设置分区数时,会使用“spark.default.paralelism”属性配置的值作为默认的分区数值。

以Key-Value为特定类型的PairRDDFunctions类中reduceByKey方法构建RDD为例,参见reduceByKey代码:

978-7-111-51909-6-Chapter02-133.jpg

在调用基础的reduceByKey方法(另两个API会分别设置分区数或分区器)时,由于没有设置分区数或分区器,因此使用了默认的分区器,这里的默认分区器就是使用上面在ob-ject Partitioner中提到的defaultPartitioner方法。

三、源码解析的扩展

在源码解析过程中,可以进行一些扩展,比如之前的MapPartitionsRDD源码:

978-7-111-51909-6-Chapter02-134.jpg

978-7-111-51909-6-Chapter02-135.jpg

一般RDD设置了分区器partitioner时,在getPartitions中都会通过该分区器,控制如何从依赖的各个父RDD中获取各个分区的数据,而在MapPartitionsRDD中,只是根据参数pr-eservesPartitioning是否为true来控制是否设置MapPartitionsRDD的分区器partitioner。

因此引出了以下问题:当preservesPartitioning控制当前MapPartitionsRDD的partitioner的设置,preservesPartitioning的作用是什么?两种取值true或false具体应该在什么情况下?

问题的解析:当前MapPartitionsRDD的getPartitions方法中并没有依据partitioner记录分区的数据来源,但作为分区器,它不仅可以用于RDD分区获取其依赖的父RDD的数据上,还可以将该分区器传递到其子RDD中,在这里,主要的目的是后者,将分区器信息在DAG的Lineage上进行传递。

免责声明:以上内容源自网络,版权归原作者所有,如有侵犯您的原创版权请告知,我们将尽快删除相关内容。

我要反馈