尽管RDD中的Lineage信息可以用来故障恢复,但对于那些Lineage链较长的RDD来说,这种恢复可能很耗时,尤其是在做一些迭代计算的时候。一般来说,Lineage链较长、宽依赖的RDD需要采用Checkpoint机制。这种情况下,集群的结点故障可能导致每个父RDD的数据块丢失,因此需要全部重新计算。将窄依赖的RDD数据存到物理存储中可以实现优化。
分布式数据集实现容错的方式有两种,一种是记录更新,另一种是检查点(check-point)。之前介绍的血统机制就是通过相当于粗粒度的记录更新操作来实现容错的,而RDD的Checkpoint机制相当于通过冗余数据来缓存数据,Checkpoint机制没有在Job第一次计算得到结果就存储,而是等到Job结束后另外启动专门的Job去完成Checkpoint。也就是说,需要Checkpoint的RDD会被计算两次。因此,在使用rdd.checkpoint()时,建议加上rdd.cache(),这样第二次运行的Job就不用再去计算该RDD了,直接读取缓存上的数据写入磁盘,其实Spark提供了rdd.persist(StorageLevel.DISK_ONLY)这样的方法,相当于缓存数据到磁盘上,这样就可以做到RDD第一次被计算得到时就存储到磁盘上。当一个RDD需要被Checkpoint时,一般都会经过initialized、Marked for checkpointing、checkpointing in progrss、Checkpointed这几个阶段后才能被Checkpoint。
下面我们通过源代码来解析Checkpoint四个比较大的步骤:
(1)当RDD需要Checkpoint时,则会调用rdd.checkpoint()来进行标识。
1)设定哪些RDD需要被Checkpoint,然后该RDD会交给RDDCheckpointData进行管理。
2)我们进入RDDCheckpoint类查看,当调用markForCheckpoint方法后,RDDCheck-pointData会将RDD标记为MarkedForCheckpoint。
3)同时用户还要设定Checkpoint的存储路径,一般在HDFS上。
4)对于RDD的标识,需要在Job执行前被mark(标识),并且最好选择persist(固化)这个RDD,这样Checkpoint可以直接从缓存读取数据,否则在存Checkpoint文件时需要重新Compute(计算)该RDD内容。
5)当RDD被Checkpoint后,所有的dependencis(也即该RDD的所有依赖)都会被清除。因为既然RDD已经被Checkpoint,那么就可以直接从文件读取,没有必要保留之前par-ents的dependencies了。
在RDDCheckpointData的doCheckpoint方法里,最终会调用下面的这个同步代码块。
6)继续调用rdd的markCheckpoint方法,完成dependencis的清除。(www.xing528.com)
(2)在SparkContext.runJob中,最后会调用rdd.doCheckpoint,如果前面已经标识过了要进行Checkpoint,那么这里就会将RDD真正Chenckpoint到文件中去。
1)在SparkContext的runJob方法中,每个Job运行结束后都会调用finalRDD.doCheckpoint()。
2)finalRDD会顺着computing chain(也即从初始RDD到最后的finalRDD的整个计算链)回溯扫描,碰到要Checkpoint的RDD就将其标记为CheckpointingInProgresst。
3)然后将写磁盘(比如写HDFS)需要配置的文件(如core-site.xml等)broadcast到其他Worker结点上的BlockManager。完成后,启动一个Job来完成Checkpoint。具体实现是在RDDCheckpointData的doCheckpoint方法中。
(3)在RDDCheckpointData.doCheckpoint中会调用rdd.markCheckpointed(newRDD),清除dependencies信息并最终将状态设置为Checkpointed,以表示完成Checkpoint。在这个过程中会生成一个新的CheckpointRDD,该CheckpointRDD负责以后读取在文件系统上的Chek-point文件,作为读取checkpoint文件生成的RDD的父RDD。
(4)在rdd.computeOrReadCheckpoint方法中,如果看到已经完成Checkpoint,就会直接从firstParent中读取数据。
rdd.firstParent的定义如下:
dependencies.head.rdd,asInstanceOf[RDD[U]],就是从依赖中取第一个dependency的RDD。
通过以上对Spark的Checkpoint机制的源码分析,使得我们对整个Checkpoint的运行机制的了解更深入了,这样有利于我们在生产环境下更有效率地使用Checkpoint机制。
免责声明:以上内容源自网络,版权归原作者所有,如有侵犯您的原创版权请告知,我们将尽快删除相关内容。