数据本地性是指数据与执行的代码的远近程度。基于数据与执行的代码各自所在的位置,从近到远的本地性级别依次如下。
·PROCESS LOCAL:数据与执行的代码在同一个JVM内存中,数据已经缓存到执行代码所在的Executor的内存中了,即读取缓存在本地结点的数据,这是效率最高的数据本地性级别(该本地性级别和cache有关)。
·NODE LOCAL:数据与执行的代码在同一个结点上,需要从执行代码所在的结点上的本地存储中读取出数据,即读取本地结点硬盘数据。比如,数据在同一个结点的hdfs上,或在同一个结点的另一个Executor上。因为数据需要在进程间移动,该级别比PROCESS LOCAL稍慢。
·NO PREF:从任意地方访问该数据的速度都一样,没有具体的位置偏好。
·RACK LOCAL:数据所在的Server与执行代码的Server在同一台机架上,一般需要通过一台交换机在网络上传输到执行代码所在的结点上。
·ANY:数据位于不在同一台机架的其他集群的结点上,即读取非本地结点数据。
数据本地性对程序的性能有很大的影响。如果数据和要执行的代码在一起,计算速度就很快;而当它们不在一起时,其中一个就必须移动到另一个所在的位置,而数据在网络上的传输会导致大量的延时和开销,毕竟磁盘I/O和网络I/O都是耗时的操作。
在大数据处理模式下,一般代码都比数据小,所以序列化了的代码的移动比数据的移动更快,分布式计算系统的精髓就在于移动代码而非移动数据,即“数据不动代码动”。数据本地性的原则就是尽量避免数据在网络上甚至磁盘上的传输,尽量将计算移到数据所在的结点上进行。Spark的调度机制就是基于这一准则来进行调度的。
Spark中任务的处理需要考虑数据本地性的场合,基本上就两种:一是数据来源于外部数据源(如HDFS);二是数据来源于RDD Cache(即由CacheManager从BlockManager中读取,或者Streaming数据源RDD)。其他情况下,不涉及Shuffle操作的不构成划分Stage和Task的基准,也就不存在本地性问题;而如果是ShuffleRDD,由于其本地性始终为No Pre-fer,因此其实也不存在本地性问题。(www.xing528.com)
在理想情况下,任务当然是分配在可以从本地读取数据的结点上时(同一个JVM内部或同一台物理机器内部)的运行时性能最佳。但是每个任务的执行速度无法准确估计,所以很难在事先获得全局最优的执行策略。当Spark应用程序得到一个计算资源时,如果没有可以满足最佳本地性需求的任务可以运行,即任何空闲的Executor上都没有尚未处理的数据时,计算框架通常有两种选择:①一直等待直到待处理的数据所在的结点的CPU空闲下来,然后调度处理该批数据的Task到该结点上,这样能更好地匹配任务的本地性;②不进行等待,直接将处理该批数据的Task调度到其他结点上执行,当然此时该批数据需要移动到相应的结点上,这是退而求其次,运行一个本地性条件稍差一点的任务。
Spark所遵循的原则是,调度作业时优先调度到有最好本地性的结点上去执行Task,当较高的数据本地性级别不能满足时,Spark退而求其次调度作业到次好的本地性结点。具体实施时,Spark混合了这两种方案,即会首先等待一小会儿以期望忙碌的CPU空闲下来,如果这个等待的“一小会儿”到了而CPU仍然忙碌,就会移动数据到有空闲CPU的结点去执行。这个需要等待的“一小会儿”是可以通过参数spark.locality.wait、spark.locality. wait.process、spark.locality.wait.node和spark.locality.wait.rack来配置的,这几个参数一起决定了Spark任务调度模块在得到待分配任务时,如果没有更好的本地性级别,暂时不分配任务而是等待一小会儿以期望获得更好的本地性级别的等待时间。
·spark.locality.wait:调度作业时,如果没有更好的本地性级别,退而求其次调度到次好的本地性级别的等待时间,默认是3s。该时间是所有本地性级别的等待时间(process-local、node-local、rack-local和any)的默认值。当然也可以分别配置各个本地性级别的等待时间。
·spark.locality.wait.process:PROCESS_LOCAL本地性级别的等待时间,即尝试访问缓存在Executor进程的内存中数据的等待时间,默认值是spark.locality.wait的值。
·spark.locality.wait.node:NODE_LOCAL本地性级别的等待时间,默认值是spark.lo-cality.wait的值。比如,可以将该值设置为0,以跳过NODE_LOCAL本地性级别,而直接尝试RACK_LOCAL。
·spark.locality.wait.rack:RACK_LOCAL本地性级别的等待时间,默认值是spark.locality. wait的值。
一般来说,这几个参数默认的配置均不需要修改,但如果Task执行时间长且本地性级别差(可以通过WebUI观察到),可以调高这些参数,使系统等待更长的时间,以满足更好的本地性来运行作业。
需要指出的是,在应用程序刚启动,处理提交的第一批任务时,由于当作业调度模块开始工作时,具体处理任务的Executor可能还没有完全注册完毕,因此一部分任务会被错误地放置到No Prefer的队列中,由于这部分任务的优先级仅次于数据本地性满足Process级别的任务,它们就有可能被优先分配到非本地结点上执行(在的确没有Executors在对应的结点上运行,或者的确是No Prefer的任务(如shuffleRDD)的情况下,将任务放置到No Prefer的队列中确实是比较优化的选择,但是在这种情况下,由于只是这部分Executor还没来得及注册上而已,这种方法不是一个好选择)。这种情况下,即使调大这几个参数的数值也没有帮助。针对这种情况,Spark有一些已经完成的和正在进行中的release patch(修复方案),试图通过动态调整No Prefer队列、监控executors注册比例等方式来给出更加智能的解决方案。然而,也可以根据自身集群的启动情况,通过在创建SparkContext之后,主动Sleep几秒的方式来简单地解决这个问题。
免责声明:以上内容源自网络,版权归原作者所有,如有侵犯您的原创版权请告知,我们将尽快删除相关内容。