首页 理论教育 提高性能:Spark广播大变量的优化

提高性能:Spark广播大变量的优化

时间:2023-06-29 理论教育 版权反馈
【摘要】:大量的变量副本在网络中传输的性能开销,以及在各个结点的Executor中占用过多内存导致的频繁GC,都会极大地影响性能。这显然并不高效,Spark为此推出了广播变量。Spark的广播变量只会发送给Executor结点一次,且发送后广播变量会以非序列化的形式保存在Executor内存中,而不会为每个Task都发送一份。另外一个使用广播变量的常用场景是,当对两个表进行Join操作时,为了避免Shuffle,经常将较小的表广播到Executor上。

提高性能:Spark广播大变量的优化

有时在开发过程中,会遇到需要在算子函数中使用外部大变量的场景(如100 MB的大集合),那么此时就可以考虑使用Spark的广播(Broadcast)功能来提升性能。

Spark使用了Shared-Nothing架构,数据以分区的形式散落在各个结点上,每个结点都有自己的CPU、内存和存储资源。tasks并没有共享的全局内存区域,Driver和Task通过通信来共享数据。比如,当一个RDD算子所使用的函数中引用了一个来自Driver的变量时,Spark会将该变量的一个副本随一个Task一起发送给Executors,然后每个Task得到一个该变量的副本,并以只读的形式访问它,任何对该变量的修改都是本地的,并不会返回给Driver。这个发送动作在每个Stage的开始都会发生一次。

这种默认的行为在Driver和Task共享较大的变量(如静态查询表),且Job有多个Stage时并不高效,比如该静态查询表的大小是100 MB且该Job有10个Stage,则Spark会将这100 MB的数据发送给每个Executor共10次。大量的变量副本在网络中传输的性能开销,以及在各个结点的Executor中占用过多内存导致的频繁GC,都会极大地影响性能。这显然并不高效,Spark为此推出了广播变量。

Spark的广播变量只会发送给Executor结点一次,且发送后广播变量会以非序列化的形式保存在Executor内存中,而不会为每个Task都发送一份。Spark还会使用高效的广播算法(Http或Torrent方式)来分发变量,所以网络通信的开销并不大。熟悉源代码的朋友都知道,在Spark的HadoopRDD中,就采用了广播来进行Hadoop的JobConf的传输以提高效率。广播后的变量会保证每个Executor的内存中只驻留一份该变量的副本,而Executor中的task执行时共享该Executor中的那份变量副本。这样的话,可以大大减少变量副本的数量,从而减少网络传输的性能开销,并减少对Executor内存的占用开销,降低GC的频率。

需要说明的是,由于每个Stage的Task公用的数据在Stage开始时都会向每个Executor结点发送一次,发送后这些公用数据会以序列化的形式缓存在Executor中,然后各个任务在运行时反序列化这些公用的数据,以获得一个只读的副本供自己使用,这就意味着显式创建广播变量的模式,只有在Job有多个Stage,且多个Stage的Task需要访问来自Driver的相同数据时,或者需要以非序列化的形式缓存数据时,才真正有意义。(www.xing528.com)

事实上,Spark会在Master上打印出每个任务序列化后的大小,通常来讲,大于20 KB的任务就可以考虑是否可以通过广播机制进行优化

可以通过在一个只读变量v上调用SparkContext.broadcast(v)来创建广播变量,广播变量本质上是围绕着变量v的封装,广播后可以通过value方法访问这个广播变量的值。在创建了广播变量之后,在集群上的所有函数中都应该使用它来替代使用v,这样v就不会不止一次地在结点之间传输了。另外,为了确保所有的结点获得相同的变量,变量v在被广播之后就不应该再修改。

实例代码如下。

另外一个使用广播变量的常用场景是,当对两个表进行Join操作时,为了避免Shuffle,经常将较小的表广播到Executor上。

免责声明:以上内容源自网络,版权归原作者所有,如有侵犯您的原创版权请告知,我们将尽快删除相关内容。

我要反馈