这部分内容是在上一节的基础上,对某些案例进行扩展并深入分析。其中,部分案例源自官网,在此对其进行深入解析。
一、DataFrame与RDD间的交互
DataFrame可以从结构化文件、hive表、外部数据库以及现有的RDD加载构建得到。具体的结构化文件、hive表、外部数据库的相关加载示例可以参考章节3.3 Spark SQL处理各种数据源的案例与解析部分。这里主要针对从现有的RDD来构建DataFrame进行实践与解析。
Spark SQL支持两种方法将存在的RDD转换为DataFrame。
1)第一种方法是使用反射来推断包含特定对象类型的RDD的模式。在写Spark程序的同时,已经知道了模式,这种基于反射的方法可以使代码更简洁并且程序工作得更好。
2)第二种方法是通过一个编程接口来实现,这个接口允许构造一个模式,然后在存在的RDD上使用它。虽然这种方法代码较为冗长,但是它允许在运行期之前不知道列以及列的类型的情况下构造DataFrame。
(一)通过反射机制构建DataFrame
利用反射推断模式,Spark SQL的Scala接口支持将包含样本类的RDD自动转换为Dat-aFrame。这个样本类定义了表的模式。样本类的参数名字通过反射来读取,然后作为列的名字。样本类可以嵌套或者包含复杂的类型如序列或者数组。这个RDD可以隐式转化为一个DataFrame,然后注册为一个表,表可以在后续的SQL语句中使用。
以people.txt作为测试数据,使用Scala语言来创建DataFrame。
1.首先,查看people.txt数据
使用Hadoop的hdfs dfs-cat命令查看people.txt文件内容。
注意:这里已经将HADOOP_HOME/bin添加到环境变量PATH中,因此直接使用hdfs命令。
2.定义people.xml对应的样本类People,并加载文件,构建DataFrame
示例中:
1)首先定义样本类People。
2)之后通过SparkContext的textFile方法将文件加载进来,用“,”作为分隔符,将每一行数据分割为包含两个元素(“name”和“age”)的数组,继续使用map方法将数组映射成样本类People,此时可以得到对应该文件的RDD实例,其元素类型为样本类People。
3)最后在RDD实例上使用toDF方法,转换为对应的DataFrame实例people。
注意:在以spark-submit方式提交的应用程序中,不要将样本类People定义在和SparkContext实例相同的作用域中,即需要将样本类定义在main方法外或object外。
用Scala函数范式修改以上代码:
4)将DataFrame注册成临时表,并对表执行SQL查询语句。(www.xing528.com)
调用people的registerTempTable方法,注册为临时表“people”,注册后就可以通过SQL方法使用sqlContext支持的SQL语句对该临时表进行操作。示例中使用“SELECT”语句从表“people”中查找出年龄在13到19之间的人员的名字。
SQL查询语句得到的结果类型是DataFrame实例,支持所有普通RDD的全部操作。示例中最后一行代码将DataFrame的每一条记录用map方法转换为字符串,字符串中提取了“name”列,然后用collect方法收集记录并在界面打印出来。可以单步执行这一行代码,查看每个API操作的返回类型。
(二)用编程指定模式构建DataFrame
当样本类不能提前确定(例如,记录的结构是经过编码的字符串,或者一个文本集合将会被解析,不同的字段投影给不同的用户),一个DataFrame里实例可以通过下面三个步骤来创建。
1)从原来的RDD创建一个元素类型为行(Row)的RDD。
2)创建由一个StructType表示的模式(Schema),与第一步创建的RDD的Row的结构相匹配。
3)在元素类型为Row的RDD上,通过applySchema方法应用第二步构建的Schema。
Scala语言创建DataFrame的方式如下:
1.加载文件,并构建文件对应的Schema的列名字符串
2.导入org.apache.spark.sql.types._,构建Schema信息
注意:官网示例中的import语句有问题,StructType等类型在org.apache.spark.sql.types中。
3.将RDD的字符串元素转换为DataFrame的Row类型
4.应用之前定义的Schema,创建DataFrame
5.注册到临时表中,并通过SQL查询语句从该临时表中构建出新的DataFrame实例
6.将新构建的results的内容输出到控制台
(三)DataFrame与RDD间的关系
构建一个DataFrame,然后调用rdd方法,用toDebugString查看rdd的Lineage关系。
从MapPartitionsRDD[10]at map at DataFrame.scala:889[]这一行,可以看到从Dat-aFrame转换得到RDD时(即这里的people.rdd调用),实际上内部是调用了DataFrame的map方法,将DataFrame的Row转换为RDD的元素。这可以参考前面构建DataFrame的三个步骤。
免责声明:以上内容源自网络,版权归原作者所有,如有侵犯您的原创版权请告知,我们将尽快删除相关内容。