1.并行度
除非为每一个操作都设置足够高的并行度,否则集群不能有效的被利用。Spark会根据每一个文件的大小自动设置运行在该文件Map任务的个数(我们也可以通过SparkContext的配置参数来控制);对于分布式Reduce任务(例如groupByKey或者reduceByKey),则利用最大RDD的分区数,我们可以通过配置reduceByKey这些方法的第二个参数来传入并行度或者通过设置系统参数spark.default.parallelism来改变默认值。通常来讲,在集群中,我们建议为每一个CPU核(core)分配2~3个任务。
2.Reduce Task的内存使用
有时,我们会碰到OutOfMemory错误,这不是因为我们的RDD不能加载到内存,而是因为任务执行的数据集过大,例如正在执行groupByKey操作的Reduce任务。Spark的Shuf-fle操作(比如sortByKey、groupByKey、reduceByKey、join等)为了完成分组会为每一个任务创建哈希表,哈希表有可能非常大。最简单的修复方法是增加并行度,这样,每一个任务的输入会变的更小。Spark能够非常有效的支持段时间任务(例如200ms),因为它会对所有的任务复用JVM,这样能减小任务启动的消耗。所以,我们可以放心的使任务的并行度远大于集群的CPU核数。
3.广播“大变量”
使用SparkContext的广播变量可以有效减小每一个任务的大小以及在集群中启动作业的消耗。如果任务会使用驱动程序(Driver Program)中比较大的对象(例如静态查找表),可以考虑将其变成可广播变量。Spark会在主结点(master)打印每一个任务序列化后的大小,所以我们可以通过它来检查任务是不是过于庞大。通常来讲,大于20KB的任务可能都是值得优化的。
4.数据本地性
数据本地性对Spark任务执行的性能有显著影响。如果数据和操作的代码在一起的话那计算将会很快,但是如果数据和代码是分离的,那其中一个必须移动到另外一方。通常来说传输序列化过的代码肯定比传输数据要快得多,毕竟代码比数据小太多了。Spark就是基于此原则建立数据本地性的调度策略。
数据本地性是指数据与处理它的代码有多远。根据数据当前的位置用几个级别来确定,从最近到最远的距离为:(www.xing528.com)
(1)PROCESS_LOCAL:数据和要运行的代码在同一个JVM中,这是最好的分布情况。
(2)NODE_LOCAL:数据和要运行的代码在同一个结点。例如可能在的同一个结点的HDFS中或者同一个结点的不同Executor中。这会比PROCESS_LOCAL慢一点,因为数据要在进程间交换。
(3)NO_PREF:数据从各处访问都差不多快,没有位置偏好。
(4)RACK_LOCAL:数据在同样一个机架的服务器上。数据在不同的服务器上但是在同一个机架,所以需要通过网络传输,通常经过一个交换机。
(5)ANY:数据可能在网络的任何地方并且不再同一个机架上。
Spark倾向于将所有的任务都安排在最佳的位置,但不可能总是这样。当任意空闲结点上没有未处理的数据的时候,Spark会选择较低的位置级别。这里有两个选择:在有数据的结点上等待繁忙的CPU空闲后启动一个任务;立刻启动一个任务但是需要从远程获取数据。
Spark通常会等繁忙的CPU释放资源,但是如果超时,它会把数据移动到远程的空闲CPU上进行计算。数据本地性的各个级别间等待超时的回退时间可以分别配置或者统一配置。如果任务运行很长而且很少在本地运行,可以提高设置,但是通常默认值就表现的不错了。
最后我们总结一下Spark程序优化所需要关注的几个关键点——最主要的是数据序列化和内存优化。对于大多数程序而言,采用Kryo序列化能够解决性能有关的大部分问题。
免责声明:以上内容源自网络,版权归原作者所有,如有侵犯您的原创版权请告知,我们将尽快删除相关内容。