首页 理论教育 深度解析RDD弹性特性的七个方面

深度解析RDD弹性特性的七个方面

时间:2023-06-29 理论教育 版权反馈
【摘要】:RDD作为弹性分布式数据集,它的弹性具体体现在以下7个方面。但是RDD的读操作既可以是粗粒度的也可以是细粒度的。此时会调用RDD.iterator(),RDD的iterator()的源代码如下。

深度解析RDD弹性特性的七个方面

RDD作为弹性分布式数据集,它的弹性具体体现在以下7个方面。

1.自动进行内存和磁盘数据存储的切换

Spark会优先把数据放到内存中,如果内存实在放不下,则会放到磁盘里面,不但能计算内存放下的数据,也能计算内存放不下的数据。如果实际数据大于内存,则要考虑数据放置策略和优化算法。当应用程序内存不足时,Spark应用程序将数据自动从内存存储切换到磁盘存储,以保障其高效运行。

2.基于Lineage(血统)的高效容错机制

Lineage是基于Spark RDD的依赖关系来完成的(依赖分为窄依赖和宽依赖两种形态),这种前后的依赖关系可恢复失去的数据。每个操作只关联其父操作,各个分片的数据之间互不影响,出现错误时只需恢复单个Split的特定部分即可。常规容错有两种方式:一个是数据检查点,另一个是记录数据的更新。数据检查点的基本工作方式就是通过数据中心的网络链接不同的机器,然后每次操作时都要复制数据集,就相当于每次都有一个拷贝,拷贝是要通过网络的,网络带宽就是分布式的瓶颈,对存储资源也是很大的消耗。记录数据更新就是每次数据变化了就记录一下,这种方式不需要重新复制一份数据,但是比较复杂,会消耗性能。Spark的RDD通过记录数据更新的方式很高效,原因为:一是RDD是不可变的且是Lazy级别;二是RDD的写操作是粗粒度的。但是RDD的读操作既可以是粗粒度的也可以是细粒度的。

3.Task如果失败会自动进行特定次数的重试

默认重试次数为4次。源代码如下。

TaskSchedulerImpl是底层的任务调度接口TaskScheduler的实现,这些Schedulers从每一个Stage中的DAGScheduler中获取TaskSet,运行它们。DAGScheduler是高层调度,它计算每个Job的Stages的DAG,然后提交给Stages,用TaskSets的形式启动底层TaskScheduler调度在集群中运行。

4.Stage如果失败会自动进特定次数的重试

这样Stage对象可以跟踪多个StageInfo(存储SparkListeners监听到的Stage信息,将Stage信息传递给Listeners或web UI。默认重试次数为4次,且可以直接运行计算失败的阶段,只计算失败的数据分片,Stage源代码如下。

Stage是Spark Job运行时具有相同逻辑功能且并行计算任务的一个基本单元。Stage中所有的任务都依赖同样的Shuffle,每个DAG任务通过DAGScheduler在Stage的边界处发生Shuffle形成Stage,然后DAGScheduler运行这些阶段的拓扑顺序。每一个Stage都可能是ShuffleMapStage,如果是ShuffleMapStage则跟踪每个输出结点(nodes)上的输出文件分区,它的任务结果是输入其他的Stage,或者输入一个ResultStage,若输入一个Result-Stage,这个ResultStage的任务直接在这个RDD上运行计算这个Spark Action的函数(如count()、save()等),并生成shuffleDep等字段描述Stage和生成变量(如outputLocs和numAvailableOutputs),为跟踪map输出做准备。每一个Stage都会有firstjobid,确定第一个提交Stage的Job,当使用FIFO调度时,会使得其前面的Job先行计算或快速恢复(失败时)。

在此需要对ShuffleMapStage、ResultStage和SparkListener等做出解释。ShuffleMapStage是DAG产生数据进行Shuffle的中间阶段,它发生在每次Shuffle操作之前,可能包含多个pipelined操作,ResultStage阶段捕获函数在RDD的分区上进行Action算子计算结果,有些Stages并不是运行在RDD的所有分区上,如first()、lookup()等。SparkListener是Spark调度器的事件监听接口,注意这个接口随着Spark的版本的不同会跟随着变化。

5.Checkpoint和Persist(检查点,持久化),可主动或被动触发(www.xing528.com)

Checkpoint是对RDD进行的标记,会产生一系列的文件,且所有父依赖都会被Remove,是整个依赖(Lineage)的终点。Checkpoint也是Lazy级别的。persist后RDD工作时,每一个工作结点都会把计算的分片结果保存在内存或磁盘中,下一次如果要对相同的RDD进行其他的Action计算,就可以重用。

因为用户只与Driver Program交互,因此只能用RDD中的cache()方法去cache用户能看到的RDD。所谓能看到,是指经过Transformation算子处理后生成的RDD,而某些在Transformation算子中Spark自己生成的RDD是不能被用户直接cache的,如reduceByKey()中生成的ShuffleRDD、MapPartitionsRDD是不能被用户直接cache的。在Driver Program中设定RDD.cache()后,系统如何对进行cache?首先在计算RDD的Partition之前就去判断Par-tition要不要被cache,如果要被cache的话,先将Partition计算出来,然后cache到内存。Cache只使用memory,写到磁盘的话就checkpoint。调用RDD.cache()后,RDD就变成per-sistRDD了,其StorageLevel为MEMORY_ONLY,persistRDD会告知Driver说自己是需要被persist的。此时会调用RDD.iterator(),RDD的iterator()的源代码如下。

当RDD.iterator()被调用时,也就是要计算该RDD中的某个Partition时,会先去cache-Manager中获取一个blockId,然后去BlockManager中匹配该Partition是否被checkpoint,如果是就不用计算该Partiton了,直接从checkpoint中读取该Partition的所有records放入Array-Buffer里面;如果没有被checkpoint过,则先将Partiton计算出来,然后将其所有的records放到cache中。总体来说当RDD会被重复使用(不能太大)时,RDD需要cache。Spark自动监控每个结点缓存的使用情况,利用最近最少使用的原则来删除老旧的数据。如果想手动删除RDD,可以使用RDD.unpersist()方法。

此外,可以利用不同的存储级别存储每一个被持久化的RDD。例如,它允许持久化集合到磁盘上、将集合作为序列化的Java对象持久化到内存中、在结点间复制结合或者存储集合到Tachyon中。可以通过传递一个StorageLevel对象给persist()方法设置这些存储级别。cache()方法使用默认的存储级别-StorageLevel.MEMORY_ONLY。RDD根据useDisk、use-Memory、useOffHeap、deserialized和replication这5个参数的组合提供了常用的12种存储级别,完整的存储级别介绍如下。

在此需要对StorageLevel做出相应的解释。StorageLevel是控制存储RDD的标志,每个StorageLevel记录RDD是否使用Memory,或使用ExternalBlockStore存储,如果RDD脱离了Memory或ExternalBlockStore,是否drop掉RDD,是否保留数据在内存中的序列化格式,以及是否复制多个结点的RDD分区。另外,org.apache.spark.storage.StorageLevel是单实例(singleton)对象,包含了一些静态常量和常用的存储级别,且可用单实例对象工厂方法(StorageLevel(…))创建定制化的存储级别。

Spark的多个存储级别意味着在内存利用率和CPU利用率间的不同权衡。推荐通过下面的过程选择一个合适的存储级别:①如果RDD适合默认的存储级别(MEMORY_ONLY),就选择默认的存储级别。因为这是CPU利用率最高的选项,会使RDD上的操作尽可能地快。②如果不适合用默认级别,选择MEMORY_ONLY_SER。选择一个更快的序列化库来提高对象的空间使用率,但是仍能够相当快地访问。③除非算子计算RDD花费较大或者需要过滤大量的数据,否则不要将RDD存储到磁盘上,否则重复计算一个分区就会和从磁盘上读取数据一样慢。④如果希望更快地恢复错误,可以利用replicated存储级别,所有的存储级别都可以通过replicated计算丢失的数据来支持完整的容错,另外replicated的数据能在RDD上继续运行任务,而无须重复计算丢失的数据。在拥有大量内存的环境中或者多应用程序的环境中,OFF_HEAP将对象从堆中脱离出来序列化,然后存储在一大块内存中,这就像它存储在磁盘上一样,但它仍在RAM中。对象在这种状态下不能直接使用,它们必须首先反序列化,也不受垃圾收集机制影响。OFF_HEAP具有如下优势:OFF_HEAP运行多个执行者共享的Tachyon中相同的内存池;OPP_HEAP显著地减少来自GC回收的划分;如果单个的Executor崩溃,缓存的数据不会丢失。

6.数据调度弹性(DAGScheduler、TASKScheduler和资源管理无关)

Spark将执行模型抽象为通用的有向无环图计划(DAG),这可以将多Stage的任务串联或并行执行,从而无须将Stage的中间结果输出到HDFS中,当发生运行结点故障时可有其他可用结点代替该故障结点运行。

7.数据分片的高度弹性(coalesce)

Spark进行数据分片时,默认将数据放在内存中,如果内存放不下,一部分会放在磁盘上进行保存。coalesce的源代码如下。

例如,在计算的过程中会产生很多的数据碎片,此时产生一个Partition可能会非常小,如果一个Partition非常小,每次都会消耗一个线程去处理的话,可能会降低它的处理效率,这时候需要考虑把许多小的Partition合并成一个较大的Partition去处理,这样会提升效率。另一方面,有可能内存不是那么多,而每个Partition的数据Block比较大,这时候需要考虑把Partition变成更小的数据分片,这样会让Spark处理更多的批次但是不会出现OOM异常(内存溢出)。

免责声明:以上内容源自网络,版权归原作者所有,如有侵犯您的原创版权请告知,我们将尽快删除相关内容。

我要反馈