1.Checkpoint的创建
Spark关于Checkpoint的源代码位于RDD.Scala文件中的Checkpoint方法,具体内容如下所示。从中可以看出在创建Checkpoint时,Spark会先判断是否有CheckpointDir的地址(存放Checkpoint数据内容文件的地址),若不存在则报错;若存在再检测已有的Checkpoint信息,看是否存在Checkpoint,如果没有,则创建CheckpointData,其中包含了Checkpoint的信息。
在Spark中,某RDD进行Checkpoint操作后会将此RDD的依赖关系清空,此项变化在算法计算中具有极大的意义,假设一个运算步骤有100步(同一个Stage中),Spark框架会在计算时将这100步的函数合并为一个大函数,然后进行计算,而如果在第90步处进行了Checkpoint,则Spark计算时将前90步函数合并为一个大的函数串进行计算,并在计算完成时进行保存,之后10步的函数合并为一个大的函数串进行计算。如果出现错误,只需要对后10步进行计算,具体源代码如下所示:第10行中便是RDD进行Checkpoint时会调用rdd.markCheckpointed()方法,而此方法将RDD的dependency进行了清空操作。
2.Checkpoint的写入(www.xing528.com)
写入数据至Checkpoint文件的源代码位于ReliableCheckpointRDD.Scala的writeRDDTo-CheckpointDirectory()。如下面的源代码所示,分别完成对于checkpointDir目录下的文件的创建和具体内容的写入。
3.Checkpoint的触发
当需要对已进行了Checkpoint的数据进行处理时,框架默认运行docheckpoint方法(如下所示)。若当前RDD的checkpointData是被定义的(此RDD已被进行Checkpoint),则获得数据;若当前RDD的checkpointData是未被定义的,则按照依赖遍历RDD查询此RDD依赖RDD是否存在Checkpoint。
免责声明:以上内容源自网络,版权归原作者所有,如有侵犯您的原创版权请告知,我们将尽快删除相关内容。