首页 理论教育 深入剖析Spark的广播变量机制

深入剖析Spark的广播变量机制

时间:2023-06-20 理论教育 版权反馈
【摘要】:广播变量允许用户保留一个只读的变量,缓存在每一台机器上,而不用在任务之间传递变量。同时Spark会尝试使用一种高效的广播算法来传播广播变量,从而减少通信的开销。另外,广播变量的实例对象的value值不能在广播后修改,这样可以保证所有结点收到的都是一模一样的广播值。下面我们结合Spark源代码深入分析一下广播变量的创建。通过调用bc.value来取得广播变量的值,其主要实现在TorrentBroadcast类的反序列化方法readBroadcastBlock()中。

深入剖析Spark的广播变量机制

广播变量允许用户保留一个只读的变量,缓存在每一台机器上,而不用在任务(Task)之间传递变量。广播变量可被用于有效地给每个结点一个大输入数据集的副本而非每个任务保存一份复制。同时Spark会尝试使用一种高效的广播算法来传播广播变量,从而减少通信的开销。

广播变量是通过调用SparkContext.broadcast(value)方法创建的。广播变量是一个val-ue的封装器,它的值可以通过调用Broadcast实例对象的value()方法获得。在Spark中创建广播变量的broadcast()方法的具体实现内容如下:

在广播变量的实例对象被创建后,它应该在集群运行的任何函数中,代替value值被调用,从而value值不需要被再次传递到这结点上。另外,广播变量的实例对象的value值不能在广播后修改,这样可以保证所有结点收到的都是一模一样的广播值。

Broadcast(广播变量)是相对较为常用方法功能,通常使用方式包括共享配置文件、map数据集、树形数据结构等,为能够更好更快速为TASK任务使用相关变量。下面我们结合Spark中广播变量的源代码从三个方面解析Broadcast:初始化、创建(写入)、使用(读取)。

1.广播变量初始化

(1)SparkContext在初始化时会创建SparkEnv对象env,在创建env对象的过程中会调用BroadcastManager的构造方法返回一个对象作为env的成员变量存在:

(2)构造BroadcastManager对象时会调用它自己的initialize()方法,主要根据配置初始化broadcastFactory成员变量,并调用其initialize方法。在BroadcastManager的initialize()方法里,会根据“spark.broadcast.factory”的配置属性利用反射技术得到BroadcastFactory实例,这里默认的配置是org.apache.spark.broadcast.TorrentBroadcastFactory。

(3)TorrentBroadcastFactory这个工厂类在初始化的过程中也会调用自己的initialize()方法。当然这个initalize()方法什么也没做。这也可以看出和HttpBroadcastFactory工厂类的区别,Torrent协议的传输的处理方式就是P2P方式(Peer to Peer,对等网络),去中心化地传输数据。而Http协议的传输是中心化服务,需要启动服务来接受请求。下面代码是Torrent-BroadcastFactory的initialize()方法的实现:

(4)接下来我们分析如何在调用SparkContext中的broadcast()方法中来初始化一个广播变量。先看broadcast()方法的源码实现:

(5)在上述源代码中,env.broadcastManager这行代码会得到一个BroadcastManager对象,在BroadcastManager实例对象的初始化过程中生成一个TorrentBroadcastFactory实例对象,接下去最终会调用TorrentBroadcastFactory的newBroadCast()方法,创建TorrentBroad-cast。

(6)在TorrentBroadcast初始化的过程中,会直接调用setConf()方法将SparkConf对象注入TorrentBroadcast中,同时定义压缩方式。并调用writeBlocks()方法将数据切分存储。

2.创建(写入)

Broadcast实例对象创建时,使用SparkContext的broadcast()方法,并将值一直传递至TorrentBroadcast,并构建TorrentBroadcast对象,同时完成将数据信息交给BlockManager进行注册,并序列化在本地存储。下面我们结合Spark源代码深入分析一下广播变量的创建。(www.xing528.com)

(1)TorrentBroadcast对象的writeBlocks()方法主要功能便是按照定义的广播块大小切分数据(默认是4 MB,spark.broadcast.blockSize),将切分后的数据信息存放在Driver端的BlockManager中,并通知BlockManageMaster完成注册,最后把数据写入本地磁盘中。

(2)在调用BlockManager对象的putBytes()方法时,广播变量依据存储策略优先写入本地,具体实现可以在BlockManager类的putBytes()方法体中的doPut方法里查找。

3.使用(读取)

广播变量依据存储策略优先写入本地(BlockManage#putBytes方法),既然序列化数据是本地存储,由此而来的问题是读取问题,BlockManage存储数据并不似HDFS会依据备份策略存储多份数据放置不同结点,如没有备份数据,那么必然产生数个问题:

1)结点故障,无法访问结点数据。

2)数据热点,所有任务皆使用该数据。

3)网络传输,所有结点频繁访问单结点。

解决这些问题,Spark并没有使用HDFS的思想,而选择是P2P点对点方式(BT下载)解决问题,只要使用过广播变量数据,则在本结点存储数据,由此变成新的数据源,随着数据源不断增加传输数据的速度也会越来越快,刚开始传输则相对会慢一些。同时,不建议使用大文件进行广播变量,当广播变量较大或者使用较频繁时,相当于每个结点都要存储一份,形成网状传输方式交换数据,因此建议存储配置文件或某种数据结构为上佳选择。

(1)通过调用bc.value(bc指的是广播变量实例对象)来取得广播变量的值,其主要实现在TorrentBroadcast类的反序列化方法readBroadcastBlock()中。

(2)在readBlocks()方法中,它会先调用getLocal()方法从本地查找数据,如果本地不存在该数据,继而会从远程查找,然后把数据存储到本地。ReadBlocks()方法的具体实现代码如下:

到现在我们知道,TorrentBroadCast(TorrentBroadCast是BroadCast的一个子类)首先将广播变量数据分块,并存到BlockManager中;每个结点需要读取广播变量时,是分块读取,对每一块都读取其位置信息,然后随机选一个存有此块数据的结点进行读取;每个结点读取后会将包含的块信息报告给BlockManagerMaster,这样本地结点也成为了这个广播网络中的一个peer(平级结点)。与HttpBroadCast方式形成鲜明对比,这是一个去中心化的网络,只需要保持一个tracker(追踪者)即可,这就是P2P的思想。至此,广播变量的初始化、数据写入和数据读取三个重要概念我们已经探讨完毕。

免责声明:以上内容源自网络,版权归原作者所有,如有侵犯您的原创版权请告知,我们将尽快删除相关内容。

我要反馈