Spark的缓存通过RDD的cache方法实现,Cache方法是Persist持久化时Storagelevel参数默认设置为MEMORY_ONLY。在调用Cache方法的时候,Cache方法先执行不带参数的Persist方法,而无参数的Persist方法在实现时会调用默认参数为MEMORY_ONLY的Persist方法,在这个方法中,首先设置RDD的Storagelevel,然后在sparkContext中进行标记,而且Storagelevel只能被设置一次,如果Storagelevel从None修改为一个值以后,就不能再修改。在RDD的核心计算函数迭代器函数中,当Task运行的时候会调用RDD的Compute方法进行计算,而Compute方法会调用Iterator方法,如果StorageLevel不是NONE,那RDD就通过CacheManager的getOrCompute来调用获取缓存数据或者重新计算。
对于RDD的缓存触发,本节通过一个基于spark-shell环境下的WordCount实例进行说明。实例代码(无缓存情况)如图2-7所示,运行结果如图2-8所示,整个任务运行时间超过40s,亦即如果整个任务的计算数据产生丢失,再次计算需要花费40s的时间,而基于内存的数据的丢失十分常见,对此,Spark开发了自有的持久化机制Persist,而默认的Per-sist机制就是Cache,也就是通过缓存实现的。
图2-7 无缓存的WordCount代码
图2-8 无缓存代码执行耗时
之后通过对代码进行改进,在计算完成后先进行缓存,然后对缓存结果进行收集和显示,可以发现,从缓存中收集数据并显示花费的时间为1.8s,从容错角度讲,1.8s的时间对比之前重新计算需要40s的时间来说,已经大大降低了计算资源的浪费。(www.xing528.com)
图2-9~图2-11所示为对代码进行改进,使之先进行缓存后再进行计算,并显示缓存后的计算时间等信息。
图2-9 对WordCount结果进行缓存
图2-10 输出缓存后的结果
图2-11 缓存获取计算数据耗时
免责声明:以上内容源自网络,版权归原作者所有,如有侵犯您的原创版权请告知,我们将尽快删除相关内容。