在Spark程序运行中,RDD的数据既可以由并行Scala集合转化而来(如通过parallelize方法输入Scala集合数据),也可以从外部文件存储系统获取(如通过textFile方法读取HDFS上的数据等)。
1.Scala集合转换输入
SparkContext提供了parallelize()和makeRDD()两个方法从Scala集合中生成RDD,不同的一点是makeRDD方法还提供了一个可以指定每一个分区preferredLocations参数的实现。
下面是在Spark交互式工具Spark Shell上的演示代码:
通过SparkContext的parallelize方法,把Scala集合转换成了ParallelCollectionRDD。
对于parallelize方法,还可以设置参数numSlices,该参数表示对数据集进行切片,每个分片启动一个Task进行处理。下面代码中parallelize方法中的2表示的切片为2。
通过makeRDD方法生成了一个ParalleCollectionRDD,在这个RDD的preferredLocations方法中可以通过指定每个分区值的最优存放位置。
在以上的代码中,我们指定了Range(1,2,3,4,5)的最优存放位置是List(host1,host2),由makeRDD方法生成的rdd2,它的partitions(0)指的就是Range(1,2,3,4,5),当调用rdd2的preferredLocations方法时得出Range(1,2,3,4,5)的最优存储位置是List(host1,host2)。
2.外部文件存储系统输入
Spark的生态系统与Hadoop是完全兼容的,所以也支持与Hadoop相关的文件类型或者数据库类型。另外,Spark为了能够兼容Hadoop新旧两个的版本,也提供了两套输入操作接口,这些接口主要包含以下四个参数:
a.输入格式(InputFormat):指定数据输入的类型,如TextInputFormat等。(www.xing528.com)
b.键类型:指定[K,V]键值对中的K的类型。
c.值类型:指定[K,V]键值对中的V的类型。
d.分区值:指定由外部存储生成的RDD的partition数量的最小值,如果没有指定,系统会使用默认值defaultMinSplits。
(1)使用textFile方法可以将本地文件或HDFS文件转换成RDD,如果要读取本地文件,各个结点都要有该文件,或者使用网络共享文件。它支持整个文件目录读取,如text-File(“/my/directory”)。它也支持压缩文件读取,比如textFile(“/my/directory/*.gz”)。还支持通配文件读取,如textFile(“/my/directory/*.txt”)。对于textFile方法而言,只有path这个指定文件路径的参数,minPartittion参数在系统内部指定了默认值。
(2)hadoopFile方法可以从Hadoop相关的文件系统读取文件。
(3)newAPIHadoopFile方法是针对Hadoop的新API提供的读取输入数据的方法。
(4)对于有很多小文件需要处理的情况来说,Spark也提供了wholeTextFiles方法来支持。这个方法返回的结果是键值对。键是文件名,值是文件内容。
下面简单演示Spark用textFile方法从HDFS文件系统上读取数据,然后使用count方法统计一下该文件的行数。
可以看到,我们调用sc(SparkContext的实例)的textFile方法从HDFS文件系统加载了一个README.md文件,然后调用RDD(变量line)的count方法得出该文件一共有141行。
免责声明:以上内容源自网络,版权归原作者所有,如有侵犯您的原创版权请告知,我们将尽快删除相关内容。