DataFrame提供了一套丰富的API,让Spark变得更加平易近人,使得大数据分析的开发越来越容易。DataFrame API将关系型的处理与过程型处理结合起来,可以对外部数据源(Hive、JSON等)和Spark内建的分布式集合(RDD)进行关系型操作。
DataFrame能处理的外部数据源,除了内置的Hive、JSON、Parquet、JDBC以外,还包括CSV、Avro、HBase等多种数据源,Spark SQL多元一体的结构化数据处理能力正在逐渐释放。DataFrame数据采用压缩的列式存储,对DataFrame的操作采用Catalyst——一种关系操作优化器(也称为查询优化器),因此效率更高。
本节读取电影票房收入的数据文件,生成名为film的DataFrame,对读DataFrame应用各种Spark算子进行计算。
首先准备数据。模拟生成电影票房收入的数据文件film.json、newFilm.json,从本地上传到Hdfs文件系统中,电影票房收入文件的格式包括6列:票房收入、制片地区、影片ID、语言代码、影片名、上映年份。电影票房收入的数据文件内容如下:
模拟生成语言代码文件language.json,同样从本地上传到Hdfs文件系统中,语言代码文件的格式包括2列:语言名称、语言代码。语言代码文件的数据内容如下:
接下来在Spark-Shell中使用sqlContext.jsonFile方法分别加载电影票房收入文件,语言代码文件。
接下来对常用的DataFrame API进行示例解析,相应的操作数据为电影票房收入的数据文件film.json和newFilm.json。
1.collect与collectAsList
1)定义。
def collect():Array[Row]
def collectAsList():List[Row]
2)功能描述。
●collect返回一个数组,包含DataFrame中包含的全部数据记录。
●collectAsList返回一个Java List,包含DataFrame中包含的全部数据记录。
3)示例。
4)解析。
上述示例中首先加载film,然后以表格的形式呈现其内容。
两个collect型的方法都可以获取df的全部数据记录,只是返回的类型不同。调用col- lect方法,返回的是数组,数组元素的类型为org.apache.spark.sql.Row;调用collectASList返回类型java.util.List。
2.count
1)定义。
def count():long
2)功能描述。
返回DataFrame的数据记录的条数。
3)示例。
scala>df.count
16/04/2714:33:45 INFOmapred.FileInputFormat:Total input paths to process:1
res20:Long=7
3.describe
1)定义。
def describe(cols:String∗):DataFrame
2)功能描述。
概要与描述性统计(Summary and Descriptive Statistics),包含计数、平均值、标准差、最大值和最小值运算。
3)示例。
4.First
1)定义。
def first():Row
2)功能描述。
返回DataFrame的第一行,等同于head()方法。
3)示例。
scala>df.first
res22:org.apache.spark.sql.Row=[33.9,CN,001,1,Mermaid,2016]
5.head
1)定义。
def head():Row
2)功能描述。
不带参数的head方法,返回DataFrame的第一条数据记录;指定参数n时,则返回前n条数据记录。
3)示例。
6.show
1)定义。
def show():Unit
def show(numRows:Int):Unit
2)功能描述。
●不带参数时,用表格的形式显示DataFrame的前20行记录。
●指定参数numRows时,用表格的形式显示DataFrame指定的行数记录。
3)示例。
7.take
1)定义。
def take(n:Int):Array[Row]
2)功能描述。
类似head方法,返回DataFrame中指定的前n行的值。
3)示例。
8.cache
1)定义。
def cache():DataFrame.this.type
2)功能描述。
将DataFrame缓存到内存中。
3)示例。
9.columns
1)定义。
def columns:Array[String]
2)功能描述。
以数组形式返回DataFrame的所有列名。
3)示例。
scala>df.columns
res29:Array[String]=Array(boxoffice,country,id,languageid,name,year)
10.dtypes
1)定义。
def dtypes:Array[(String,Stirng)]
2)功能描述。
以数组形式返回所有列名及其对应数据类型。
3)示例。
11.explain
1)定义。
def explain(extended:Boolean):Unit
2)功能描述。
这个方法用于调试目的。
●不带参数时,仅将DataFrame的物理计划打印到Web控制台上。
●当指定参数extended为true时,打印所有计划到Web控制台上,包括解析逻辑计划、分析逻辑计划、优化的逻辑计划和物理计划。
3)示例。
●不带参数时,仅将物理计划打印到Web控制台上。
●指定参数extended为true时,打印所有计划到Web控制台上。
12.printSchema
1)定义。
def printSchema():Unit
2)功能描述。
以树形结构将DataFrame的Schema信息打印到Web控制台上。
3)示例。
13.registerTempTable
1)定义。
def registerTempTable(tableName:String):Unit
2)功能描述。
将DataFrame注册为指定名字的临时表。
3)示例。
4)解析。
将DataFrame注册成临时表之后,可以使用SQLContext的sql方法,对其执行SQL语句。
14.schema
1)定义。
def schema:StructType
2)功能描述。
返回DataFrame的Schema信息,对应类型为StrutType。
3)示例。
15.toDF
1)定义。
def toDF():DataFrame
def toDF(colNames:String∗):DataFrame
2)功能描述。
●不带参数的toDF返回它本身。
●带字符串数组的参数时,返回新的DataFrame,该DataFrame重命名了各列名。
3)示例。
16.persist
1)定义。
2)功能描述。
以给定的存储等级将DataFrame持久化到内存或者磁盘中。unpersist则是将DataFrame标记为非持久化的。
●persist(newLevel:StorageLevel):设置RDD的存储级别,在其首次进行计算以后持久化值。如RDD没设置存储级别,此方法用于设置新的存储级别。本地检查点将会提示异常。
●persist():以默认的存储级别(MEMORY_ONLY)持久化RDD。
●unpersist():设置RDD为非持久化,其中unpersist的入参blocking默认设置为true,即阻塞直到所有块被删除。
●unpersist(blocking:Boolean):设置RDD为非持久化,清除RDD在内存和磁盘中的所有块。
3)示例。
17.agg
1)定义。
2)功能描述。
agg是Spark1.5.x开始提供的一类内置函数。agg这一系列的方法,为DataFrame提供数据列不需要经过group就可以执行统计操作。
3)示例。
下面的示例是统计year的最大值和boxoffice的平均值。
下面是使用Map作为参数的示例,分别统计year的最小值和boxoffice的平均值。
上面的代码与下面使用二元数组作为参数的示例相同。
18.apply
1)定义。
def apply(colName:String):Column
2)功能描述。
根据指定列名返回DataFrame的列,其类型为Column。
3)示例。
19.as
1)定义。
def as(alias:Symbol):DataFrame
2)功能描述。
调用as方法后,使用别名构建DataFrame。
3)示例。
首先,注册临时表,然后修改调试日志的级别,方便查看调试信息。
为了分析这个方法的作用,下面分别查看带as方法和不带as方法的两种情况。(www.xing528.com)
●不带as方法时的调试信息:
●带as方法时的调试信息:
4)解析。
通过上面两种情况的比较可以看出,仅在解析逻辑计划、分析逻辑计划中,使用了别名Subquery alise。
20.distinct
1)定义。
def distinct():DataFrame
2)功能描述。
返回对DataFrame的数据记录去重后的DataFrame。
3)示例。
4)解析。
在实例中,选择了有重复数据记录的“year”列,最后调用distinct方法进行去重。
21.except
1)定义。
def except(other:DataFrame):DataFrame
2)功能描述。
返回DataFrame,包含当前DataFrame的数据记录,同时这些Rows不在另一个Dat-aFrame中,相当于两个DataFrame做减法。
3)示例。
4)解析。
因为001Mermaid这部电影既在df中,又在newDf中,所以经过排除运算之后,001这条记录不再被显示。
22.filter
1)定义。
def filter(conditionExpr:String):DataFrame
def filter(condition:Column):DataFrame
2)功能描述。
按参数指定的SQL表达式的条件过滤DataFrame。
●filter(conditionExpr:String)根据给的的SQL表达式进行过滤;filter(condition:Column)根据给的条件进行过滤。例如过滤出年龄大于15岁的用户记录:
peopleDf.filter("age>15")
peopleDs.where($"age">15)
这两种写法是等价的。
3)示例。
23.groupBy
1)定义。
def groupBy(col1:String,cols:String∗):GroupedData
def groupBy(cols:Column∗):GroupedData
2)功能描述。
使用一个或多个指定的列对DataFrame进行分组,以便对它们执行聚合操作。
3)示例。
示例中先根据country列对df进行分组,分组后求年份的最大值和票房的平均值。
24.intersect
1)定义。
def intersect(other:DataFrame):DataFrame
2)功能描述。
取两个DataFrame中同时存在的数据记录,返回DataFrame。
3)示例。
在该示例中,因为001 Mermaid这部电影既在df中,又在newDf中,所以intersect的运算结果为Mermaid。
25.join
1)定义。
2)功能描述。
对两个DataFrame执行join操作。join根据传入参数的不同有多种实现方法。不带参数时取笛卡儿积,仅带join Exprs时默认为Inner Join,第三个join参数joinType可以指定具体的join操作,例如:′inner′、′left oute′r、′rightoute′r、′leftsemio′。
3)示例。
在本章的2.2.3节的第14示例中,已经介绍了如何对两个DataFrame实例进行join操作。下面再来做一个简单的join示例,将影片信息和语言信息做join关联操作,查询显示汉语、英语其他语种的电影影片信息。
26.limit
1)定义。
def limit(n:Int):DataFrame
2)功能描述。
返回DataFrame的前n条数据记录。
3)示例
27.orderBy和sort
1)定义。
2)功能描述。
对DataFrame按指定的一列或多列进行排序,分别支持字符串或Column的参数列表。
●sort(sortExprs:Column∗):根据给定的表达式返回一个新的DataFrame。例如:df.sort($"col1",$"col2".desc)
●sort(sortCol:String,sortCols:String∗):根据指定的列返回一个新的DataFrame,所有列升序排列。以下三种写法等价:
df.sort("sortcol")
df.sort($"sortcol")
df.sort($"sortcol".asc)
●orderBy(sortExprs:Column∗):按给定的表达式返回一个新的DataFrame。这是sort排序函数的别名。输入参数为多个Column类。
●orderBy(sortCol:String,sortCols:String∗):按给定的表达式返回一个新的DataFrame。这是sort排序函数的别名。输入参数为多个String字符串。
3)示例。
28.sample(取样)
1)定义。
def sample(withReplacement:Boolean,fraction:Double):DataFrame
def sample(withReplacement:Boolean,fraction:Double,seed:Long):DataFrame
2)功能描述。
Sample对RDD中的数据集进行采样,生成一个新的RDD。withReplacement=true,表示重复抽样;withReplacement=false,表示不重复抽样;fraction参数是生成行的比例。
●sample(withReplacement:Boolean,fraction:Double):使用随机因子对DataFrame的Rows进行取样,返回一个新的DataFrame。
●sample(withReplacement:Boolean,fraction:Double,seed:Long):按指定因子(seed)对DataFrame的Rows进行取样,返回一个新的DataFrame。
3)示例。
当withReplacement为true时,采用PossionSampler抽样器(Possion,泊松分布);当withReplacemet为false时,采用BernoulliSampler抽样器(Bernoulli,伯努利采样)
29.select
30.unionAll
1)定义。
def unionAll(other:DataFrame):DataFrame
2)功能描述。
合并两个DataFrame的全部数据记录。
3)示例。
31.withColumn和withColumnRenamed
1)定义。
def withColumn(colName:String,col:Column):DataFrame
def withColumnRenamed(existingName:String,newName:String):DataFrame
2)功能描述。
对DataFrame列进行操作,withColumn增加DataFrame的列信息,withColumnRenamed则是对DataFrame的列进行重命名。
3)示例。
32.insertInto、insertIntoJDBC、createJDBCTable
1)定义。
2)功能描述。
●insertInto(tableName:String):从RDD插入行到指定的表。如果表已经存在,则抛出异常。
●insertInto(tableName:String,overwrite:Boolean):从RDD插入行到指定的表,可选择是否覆盖现有数据。
●insertIntoJDBC(url:String,table:String,overwrite:Boolean):根据url(参数url用来指定数据库信息)保存DataFrame到JDBC数据库的表中。如果表已经存在且模式兼容,overwrite覆盖设置为true,则在执行插入之前将先删除掉(truncate)表中的数据。表必须已经存在于数据库中;数据库表的模式和RDD的模式兼容,从RDD插入行,可以通过简单的声明,使用′INSERT INTO table VALUES(?,?,...,?)插入值,操作将不会失败。
●createJDBCTable(url:String,table:String,allowExisting:Boolean):根据url(参数url用来指定数据库信息)保存DataFrame到JDBC数据库的表中,将运行新建表(CRE-ATE TABLE)和插入(INSERT INTO)的语句。如果设置allowExisting为true,将删掉数据库中给定名称的表;如果设置allowExisting为false,将抛出表已经存在的异常。
createJDBCTable和insertIntoJDBC从Spark 1.4.0版本开始使用,将在Spark2.0中废弃,可以使用Write.jdbc()方法。
3)示例。
4)解析。
该示例首先创建两个DataFrame对象:testin、idf。然后,testin对象调用createJD-BCTable方法,在MySQL中创建一张表dbtable,把testin中的数据写入其中。接下来idf对象调用insertIntoJDBC方法,将idf的数据插入到刚才创建的表dbtable中。
33.flatMap
1)定义。
def flatMap[R](f:(Row)⇒TraversableOnce[R])(implicit arg0:ClassTag[R]):RDD[R]
2)功能描述。
创建一个新的RDD对DataFrame中的所有最后记录进行处理,并且将处理结果的所有数据仅返回一个数组对象。
flatMap[R]其中的,R的类型是ClassTag,ClassTag[T]通过runtimeClass清除给定的类型T,这在Array元素类型未知,编译实例化元素特别有用。ClassTag是scala.reflect.api.Ty- peTags#TypeTag的特殊情况,在运行时根据给定的类型封装,而TypeTag包含所有静态类型信息。ClassTag是由top-level类构建,对于运行时创建Array,这些信息足够了,因此,不必知道所有参数类型。
3)示例。
示例中将数据记录转化为由每一列组成的List。
34.foreach
1)定义。
def foreach(f:(Row)⇒Unit):Unit
def foreachPartition(f:(Iterator[Row])⇒Unit):Unit
2)功能描述。
foreach方法对DataFrame中的数据记录进行循环遍历处理。foreachPartition方法则是对对应分区中的数据记录进行处理,即Iterator[Row],使用方法类似。
3)示例。
35.map
1)定义。
def map[R](f:(Row)⇒R)(implicit arg0:ClassTag[R]):RDD[R]
2)功能描述。
map方法将DataFrame的数据记录按指定的函数映射成一个新的RDD实例。
3)示例。
36.repartition
1)定义。
2)功能描述
返回一个DataFrame,该DataFrame按指定numPartitions对原DataFrame进行重分区。
●repartition(numPartitions:Int):返回一个新的DataFrame,生成numPartitions个分区。
●repartition(partitionExprs:Column∗):返回一个新的DataFrame,根据给定的分区表达式保存现有的分区数,由此产生的DataFrame是哈希分区,这和SQL(Hive QL)中的DISTRIBUTE BY操作是相同的。
●repartition(numPartitions:Int,partitionExprs:Column∗):返回一个新的DataFrame,根据给定的分区表达式生成numPartitions个分区,由此产生的DataFrame是哈希分区,这和SQL(Hive QL)中的DISTRIBUTE BY操作是相同的。
3)示例。
scala>df.repartition(1).rdd.partitions.size
res12:Int=1
37.toJSON
1)定义。
def toJSON:RDD[String]
2)功能描述。
把Dataframe的数据记录用包含JSON字符串的RDD形式返回。
3)示例。
38.queryExecution
1)定义。
val queryExecution:QueryExecution
2)功能描述。
返回DataFrame的查询执行语句,包含逻辑计划(Logical Plan)和物理计划(Physi-cal Plan)。返回DataFrame的查询执行语句,包含逻辑计划和物理计划。逻辑计划描述了DataFrame生成数据所需的逻辑计算,Spark查询优化器将优化逻辑计划,生成一个并行分布式有效执行的物理计划,DataFrame可以通过queryExecution方法来查询逻辑计划和物理计划。
3)示例。
免责声明:以上内容源自网络,版权归原作者所有,如有侵犯您的原创版权请告知,我们将尽快删除相关内容。