并行度指的就是RDD的分区数,由于一个分区对应一个Task,并行度也是一个Stage中的Task数,这些Task被并行处理。通过前面的章节可以知道,RDD是以Partition即分区的形式散落在集群上的,每个分区都包含一部分待处理的数据,Spark程序运行时,会为每个待处理的分区创建一个Task,且默认情况下每个task占用一个CPU Core来处理。
Spark有一套自己自动推导出默认的分区数的机制。当在程序中通过操作算子如textFile等读取外部数据源以获得Input RDD时,Spark会自动地根据外部数据源的大小推导出一个合适的默认分区数,如HDFS文件的每个Block就对应一个分区;在对RDD进行Map类不涉及Shuffle的操作时,由于分区数具有遗传性,新产生的RDD的分区数由parent RDD中最大的分区数决定;在对RDD进行Reduce类涉及Shuffle操作的算子时(如groupByKey、re-duceByKey等各种Reduce操作算子),由于分区数具有遗传性,新产生的RDD的分区数也由Parent RDD中最大的分区数决定。如果是在Spark-Shell交互式命令终端下,可以通过方法rdd.partitions.size来获得某个RDD的分区数,而在Spark 1.6.0以后的版本中,也可以通过rdd.getNumPartitions()来获得某个RDD的分区数。
并行度对性能的影响有两方面,当并行度不够大时会存在资源的闲置与浪费,比如一个应用程序分配到了1000个Core,但是一个stage里只有30个Task,此时就可以提高并行度以提升硬件利用率;而当并行度太大时,Task常常几微秒就执行完毕,或Task读写的数据量很小,这种情况下,Task频繁地开辟与销毁的不必要的开销则太大,就需要调小并行度。
由于Spark自动推导出来的默认分区数很多时候并不理想,必须人为地加以控制来改变并行度。Spark提供了4种改变并行度的方式。
第一种,在使用读取外部数据源的textFile类算子时,可以通过可选的参数minPartitions来显示指定最小的分区数。
第二种,针对已经存在的RDD,可以通过方法repartition()或coalesce()来改变并行度。repartition()和coalesce()的区别在于,前者会产生Shuffle,而后者默认不会产生Shuffle。事实上,当有大量小任务(任务处理的数据量小且耗时短)时,比如某个RDD在Filter操作后,由于过滤掉了大量数据,每个分区都只剩下了很少量的数据,这时常用coalesce()来合并分区,调小并行度,减少不必要的任务开辟与销毁的消耗;而当任务耗时长且处理的数据量大时,如果计算只发生在部分Executor上,常用repartition()来重新分区,提高并行度,开辟更多的并行计算的任务来完成计算。(www.xing528.com)
第三种,在对RDD进行Reduce类涉及Shuffle操作的算子时,这些算子大都可以接受一个显式指定的参数来确定新产生的RDD的分区数,我们可以显式地指定这类参数来改变Shuffle后新产生的RDD的分区数,而不是采用系统推导出的默认的分区数。
第四种,也可以配置参数spark.default.parallelism来设置默认的并行度。该参数其实指定的就是在对RDD进行Reduce类涉及Shuffle操作的算子时,如果没有对这些算子显示指定参数来确定新产生的RDD的分区数时,这类Reduce类涉及Shuffle操作的算子产生的新的RDD的Paritition数量。该参数也指定了Parallelize等没有Parent RDDs类操作的算子所产生的新的RDD的分区数。
一个最佳实践是,将并行度设置为集群的总的CPU Cores个数的2~3倍,比如Execu-tor的总CPU Core数量为400个,那么设置1000个Task是可以的,此时可以充分利用Spark集群的资源;每个分区的大小在128 MB左右。
需要说明的是,通过以上方式确定了任务的并行度,就确定了理论上能够并行执行的任务的数量,而实际执行时真正并发执行的任务数量还要受到应用分配到的实际资源数量的限制,要想改变应用程序获得的资源数目,这就涉及资源参数的调优。
免责声明:以上内容源自网络,版权归原作者所有,如有侵犯您的原创版权请告知,我们将尽快删除相关内容。