序列化有时是shuffle和cache的瓶颈,合理地设置序列化,不但能提高I/O性能(包括网络I/O和磁盘I/O),还能减少内存的使用。
Spark默认的序列化器是org.apache.spark.serializer.JavaSerializer,也就是使用ObjectOut-putStream/ObjectInputStream API来进行序列化和反序列化,但是这个默认的序列化器的性能和空间表现都比较差。Spark同时支持使用Kryo序列化器org.apache.spark.serializer.KryoSerializer,该序列化器更快,压缩率也更高。官方介绍,Kryo序列化机制比Java序列化机制性能高了10倍左右,当然放到整个Spark程序中来考量,比重就没有那么大了,但是以WordCount为例,通常也很容易达到30%以上的性能提升。
Spark之所以没有默认使用Kryo作为序列化器,是因为Kryo并不支持所有可序列化的类型,且要求最好注册所有需要进行序列化的自定义类型,这对于开发者而言略显麻烦。推荐在所有网络I/O密集型应用中使用Kryo。事实上,Spark对大多数常用的scala类都自动包含了Kryo序列化库。
在Spark中,以下地方会涉及序列化:在算子的函数中使用到外部变量时,该变量会被序列化后通过网络传输到task中;使用自定义的类型作为RDD的泛型类型时,所有的自定义类型对象都会进行序列化;使用需序列化的持久化策略时(比如MEMORY_ONLY_SER),Spark会将RDD中的每个Partition都序列化成一个大的字节数组;Spark Task需要被序列化后从Driver发送到Executor上。这里涉及序列化的4个地方,前面3个使用的序列化器都可以通过设置spark.serializer来使用Kryo序列化,以提高性能;而Spark Task的序列化是通过spark.closure.serializer来配置的,但是目前它只支持JavaSerializer。
使用Kryo序列化器时,只要设置序列化器为Kryo,再注册要序列化的自定义类型即可(比如算子函数中使用到的外部变量类型、作为RDD泛型类型的自定义类型等)。事实上,如果没有注册自定义类型到Kryo,Kryo仍然可以工作,但需要对每个该类型的对象都存储完整的类名,当有百万条甚至更多的序列化记录时,这会额外占用更多的空间,所以推荐注册自定义的类到Kryo,也可以通过设置spark.kryo.registrationRequired参数为true来强制注册,这样当Kryo遇到没有注册的类时,会抛出错误。(www.xing528.com)
以下是使用Kryo序列化器的代码示例。
在程序运行过程中,如果遇到了错误NotSerializableException,就说明程序代码中使用了没有实现Java的Serializable序列化接口的类,需要修改相应的类,使其实现该接口。当由于程序中使用了很多类而不好判断究竟是哪个类引起该问题时,可以通过在Spark-Submit提交程序时,在-driver-java-options和-executor-java-options中指定-Dsun.io.serialization. extended DebugInfo=true来协助判断究竟是哪个类引起的该问题。
压缩和解压缩会消耗CPU,但由于它能减少数据体积,使存储和传输更高效,所以在大数据分布式计算框架下很有用。Spark中压缩相关参数为:①spark.rdd.compress,该参数决定了RDD数据在序列化之后是否进一步进行压缩,然后再储存到内存或磁盘上,这个值默认是不压缩,但是如果在磁盘I/O的确成为问题或者GC问题真的没有其他更好的解决办法时,可以考虑启用RDD压缩;②spark.broadcast.compress,该参数决定了是否对Broadcast的数据进行压缩,默认值为True,因为Broadcast的数据需要通过网络发送,而在Executor端又需要存储在本地BlockMananger中,所以通过压缩从而减小体积来减少网络传输开销和内存占用,通常都是有利于提高程序整体性能的;③spark.io.compression.codec,该参数决定了用来压缩内部数据,比如RDD分区、广播变量和Shuffle输出的数据等,所采用的压缩器,有3种选择:lz4、lzf和snappy,默认的是Snappy,但和Snappy相比较,lzf的压缩率比较高,故在有大量Shuffle的情况下,使用lzf可以提高Shuffle性能,进而提高程序整体效率。
免责声明:以上内容源自网络,版权归原作者所有,如有侵犯您的原创版权请告知,我们将尽快删除相关内容。