首页 理论教育 DataFrame API实战应用举例

DataFrame API实战应用举例

时间:2023-07-02 理论教育 版权反馈
【摘要】:DataFrame提供了一套丰富的API,让Spark变得更加平易近人,使得大数据分析的开发越来越容易。接下来对常用的DataFrame API进行示例解析,相应的操作数据为电影票房收入的数据文件film.json和newFilm.json。返回DataFrame的数据记录的条数。类似head方法,返回DataFrame中指定的前n行的值。将DataFrame缓存到内存中。以数组形式返回DataFrame的所有列名。将DataFrame注册为指定名字的临时表。

DataFrame API实战应用举例

DataFrame提供了一套丰富的API,让Spark变得更加平易近人,使得大数据分析的开发越来越容易。DataFrame API将关系型的处理与过程型处理结合起来,可以对外部数据源(Hive、JSON等)和Spark内建的分布式集合(RDD)进行关系型操作。

DataFrame能处理的外部数据源,除了内置的Hive、JSON、Parquet、JDBC以外,还包括CSV、Avro、HBase等多种数据源,Spark SQL多元一体的结构化数据处理能力正在逐渐释放。DataFrame数据采用压缩的列式存储,对DataFrame的操作采用Catalyst——一种关系操作优化器(也称为查询优化器),因此效率更高。

本节读取电影票房收入的数据文件,生成名为film的DataFrame,对读DataFrame应用各种Spark算子进行计算。

首先准备数据。模拟生成电影票房收入的数据文件film.json、newFilm.json,从本地上传到Hdfs文件系统中,电影票房收入文件的格式包括6列:票房收入、制片地区、影片ID、语言代码、影片名、上映年份。电影票房收入的数据文件内容如下:

模拟生成语言代码文件language.json,同样从本地上传到Hdfs文件系统中,语言代码文件的格式包括2列:语言名称、语言代码。语言代码文件的数据内容如下:

接下来在Spark-Shell中使用sqlContext.jsonFile方法分别加载电影票房收入文件,语言代码文件。

接下来对常用的DataFrame API进行示例解析,相应的操作数据为电影票房收入的数据文件film.json和newFilm.json。

1.collectcollectAsList

1)定义。

def collect():Array[Row]

def collectAsList():List[Row]

2)功能描述。

●collect返回一个数组,包含DataFrame中包含的全部数据记录。

●collectAsList返回一个Java List,包含DataFrame中包含的全部数据记录。

3)示例。

4)解析。

上述示例中首先加载film,然后以表格的形式呈现其内容。

两个collect型的方法都可以获取df的全部数据记录,只是返回的类型不同。调用col- lect方法,返回的是数组,数组元素的类型为org.apache.spark.sql.Row;调用collectASList返回类型java.util.List。

2.count

1)定义。

def count():long

2)功能描述。

返回DataFrame的数据记录的条数。

3)示例。

scala>df.count

16/04/2714:33:45 INFOmapred.FileInputFormat:Total input paths to process:1

res20:Long=7

3.describe

1)定义。

def describe(cols:String∗):DataFrame

2)功能描述。

概要与描述性统计(Summary and Descriptive Statistics),包含计数、平均值、标准差、最大值和最小值运算。

3)示例。

4.First

1)定义。

def first():Row

2)功能描述。

返回DataFrame的第一行,等同于head()方法。

3)示例。

scala>df.first

res22:org.apache.spark.sql.Row=[33.9,CN,001,1,Mermaid,2016]

5.head

1)定义。

def head():Row

2)功能描述。

不带参数的head方法,返回DataFrame的第一条数据记录;指定参数n时,则返回前n条数据记录。

3)示例。

6.show

1)定义。

def show():Unit

def show(numRows:Int):Unit

2)功能描述。

●不带参数时,用表格的形式显示DataFrame的前20行记录。

●指定参数numRows时,用表格的形式显示DataFrame指定的行数记录。

3)示例。

7.take

1)定义。

def take(n:Int):Array[Row]

2)功能描述。

类似head方法,返回DataFrame中指定的前n行的值。

3)示例。

8.cache

1)定义。

def cache():DataFrame.this.type

2)功能描述。

将DataFrame缓存到内存中。

3)示例。

9.columns

1)定义。

def columns:Array[String]

2)功能描述。

以数组形式返回DataFrame的所有列名。

3)示例。

scala>df.columns

res29:Array[String]=Array(boxoffice,country,id,languageid,name,year)

10.dtypes

1)定义。

def dtypes:Array[(String,Stirng)]

2)功能描述。

以数组形式返回所有列名及其对应数据类型。

3)示例。

11.explain

1)定义。

def explain(extended:Boolean):Unit

2)功能描述。

这个方法用于调试目的。

●不带参数时,仅将DataFrame的物理计划打印到Web控制台上。

●当指定参数extended为true时,打印所有计划到Web控制台上,包括解析逻辑计划、分析逻辑计划、优化的逻辑计划和物理计划。

3)示例。

●不带参数时,仅将物理计划打印到Web控制台上。

●指定参数extended为true时,打印所有计划到Web控制台上。

12.printSchema

1)定义。

def printSchema():Unit

2)功能描述。

以树形结构将DataFrame的Schema信息打印到Web控制台上。

3)示例。

13.registerTempTable

1)定义。

def registerTempTable(tableName:String):Unit

2)功能描述。

将DataFrame注册为指定名字的临时表。

3)示例。

4)解析。

将DataFrame注册成临时表之后,可以使用SQLContext的sql方法,对其执行SQL语句。

14.schema

1)定义。

def schema:StructType

2)功能描述。

返回DataFrame的Schema信息,对应类型为StrutType。

3)示例。

15.toDF

1)定义。

def toDF():DataFrame

def toDF(colNames:String∗):DataFrame

2)功能描述。

●不带参数的toDF返回它本身。

●带字符串数组的参数时,返回新的DataFrame,该DataFrame重命名了各列名。

3)示例。

16.persist

1)定义。

2)功能描述。

以给定的存储等级将DataFrame持久化到内存或者磁盘中。unpersist则是将DataFrame标记为非持久化的。

●persist(newLevel:StorageLevel):设置RDD的存储级别,在其首次进行计算以后持久化值。如RDD没设置存储级别,此方法用于设置新的存储级别。本地检查点将会提示异常。

●persist():以默认的存储级别(MEMORY_ONLY)持久化RDD。

●unpersist():设置RDD为非持久化,其中unpersist的入参blocking默认设置为true,即阻塞直到所有块被删除。

●unpersist(blocking:Boolean):设置RDD为非持久化,清除RDD在内存和磁盘中的所有块。

3)示例。

17.agg

1)定义。

2)功能描述。

agg是Spark1.5.x开始提供的一类内置函数。agg这一系列的方法,为DataFrame提供数据列不需要经过group就可以执行统计操作。

3)示例。

下面的示例是统计year的最大值和boxoffice的平均值。

下面是使用Map作为参数的示例,分别统计year的最小值和boxoffice的平均值。

上面的代码与下面使用二元数组作为参数的示例相同。

18.apply

1)定义。

def apply(colName:String):Column

2)功能描述。

根据指定列名返回DataFrame的列,其类型为Column。

3)示例。

19.as

1)定义。

def as(alias:Symbol):DataFrame

2)功能描述。

调用as方法后,使用别名构建DataFrame。

3)示例。

首先,注册临时表,然后修改调试日志的级别,方便查看调试信息。

为了分析这个方法的作用,下面分别查看带as方法和不带as方法的两种情况。(www.xing528.com)

●不带as方法时的调试信息:

●带as方法时的调试信息:

4)解析。

通过上面两种情况的比较可以看出,仅在解析逻辑计划、分析逻辑计划中,使用了别名Subquery alise。

20.distinct

1)定义。

def distinct():DataFrame

2)功能描述。

返回对DataFrame的数据记录去重后的DataFrame。

3)示例。

4)解析。

在实例中,选择了有重复数据记录的“year”列,最后调用distinct方法进行去重。

21.except

1)定义。

def except(other:DataFrame):DataFrame

2)功能描述。

返回DataFrame,包含当前DataFrame的数据记录,同时这些Rows不在另一个Dat-aFrame中,相当于两个DataFrame做减法。

3)示例。

4)解析。

因为001Mermaid这部电影既在df中,又在newDf中,所以经过排除运算之后,001这条记录不再被显示。

22.filter

1)定义。

def filter(conditionExpr:String):DataFrame

def filter(condition:Column):DataFrame

2)功能描述。

按参数指定的SQL表达式的条件过滤DataFrame。

●filter(conditionExpr:String)根据给的的SQL表达式进行过滤;filter(condition:Column)根据给的条件进行过滤。例如过滤出年龄大于15岁的用户记录:

peopleDf.filter("age>15")

peopleDs.where($"age">15)

这两种写法是等价的。

3)示例。

23.groupBy

1)定义。

def groupBy(col1:String,cols:String∗):GroupedData

def groupBy(cols:Column∗):GroupedData

2)功能描述。

使用一个或多个指定的列对DataFrame进行分组,以便对它们执行聚合操作。

3)示例。

示例中先根据country列对df进行分组,分组后求年份的最大值和票房的平均值。

24.intersect

1)定义。

def intersect(other:DataFrame):DataFrame

2)功能描述。

取两个DataFrame中同时存在的数据记录,返回DataFrame。

3)示例。

在该示例中,因为001 Mermaid这部电影既在df中,又在newDf中,所以intersect的运算结果为Mermaid。

25.join

1)定义。

2)功能描述。

对两个DataFrame执行join操作。join根据传入参数的不同有多种实现方法。不带参数时取笛卡儿积,仅带join Exprs时默认为Inner Join,第三个join参数joinType可以指定具体的join操作,例如:innerleft outer、rightouter、leftsemio

3)示例。

在本章的2.2.3节的第14示例中,已经介绍了如何对两个DataFrame实例进行join操作。下面再来做一个简单的join示例,将影片信息和语言信息做join关联操作,查询显示汉语、英语其他语种的电影影片信息。

26.limit

1)定义。

def limit(n:Int):DataFrame

2)功能描述。

返回DataFrame的前n条数据记录。

3)示例

27.orderBysort

1)定义。

2)功能描述。

对DataFrame按指定的一列或多列进行排序,分别支持字符串或Column的参数列表。

●sort(sortExprs:Column∗):根据给定的表达式返回一个新的DataFrame。例如:df.sort($"col1",$"col2".desc)

●sort(sortCol:String,sortCols:String∗):根据指定的列返回一个新的DataFrame,所有列升序排列。以下三种写法等价:

df.sort("sortcol")

df.sort($"sortcol")

df.sort($"sortcol".asc)

●orderBy(sortExprs:Column∗):按给定的表达式返回一个新的DataFrame。这是sort排序函数的别名。输入参数为多个Column类。

●orderBy(sortCol:String,sortCols:String∗):按给定的表达式返回一个新的DataFrame。这是sort排序函数的别名。输入参数为多个String字符串。

3)示例。

28.sample(取样

1)定义。

def sample(withReplacement:Boolean,fraction:Double):DataFrame

def sample(withReplacement:Boolean,fraction:Double,seed:Long):DataFrame

2)功能描述。

Sample对RDD中的数据集进行采样,生成一个新的RDD。withReplacement=true,表示重复抽样;withReplacement=false,表示不重复抽样;fraction参数是生成行的比例。

●sample(withReplacement:Boolean,fraction:Double):使用随机因子对DataFrame的Rows进行取样,返回一个新的DataFrame。

●sample(withReplacement:Boolean,fraction:Double,seed:Long):按指定因子(seed)对DataFrame的Rows进行取样,返回一个新的DataFrame。

3)示例。

当withReplacement为true时,采用PossionSampler抽样器(Possion,泊松分布);当withReplacemet为false时,采用BernoulliSampler抽样器(Bernoulli,伯努利采样)

29.select

30.unionAll

1)定义。

def unionAll(other:DataFrame):DataFrame

2)功能描述。

合并两个DataFrame的全部数据记录。

3)示例。

31.withColumnwithColumnRenamed

1)定义。

def withColumn(colName:String,col:Column):DataFrame

def withColumnRenamed(existingName:String,newName:String):DataFrame

2)功能描述。

对DataFrame列进行操作,withColumn增加DataFrame的列信息,withColumnRenamed则是对DataFrame的列进行重命名。

3)示例。

32.insertInto、insertIntoJDBC、createJDBCTable

1)定义。

2)功能描述。

●insertInto(tableName:String):从RDD插入行到指定的表。如果表已经存在,则抛出异常。

●insertInto(tableName:String,overwrite:Boolean):从RDD插入行到指定的表,可选择是否覆盖现有数据。

●insertIntoJDBC(url:String,table:String,overwrite:Boolean):根据url(参数url用来指定数据库信息)保存DataFrame到JDBC数据库的表中。如果表已经存在且模式兼容,overwrite覆盖设置为true,则在执行插入之前将先删除掉(truncate)表中的数据。表必须已经存在于数据库中;数据库表的模式和RDD的模式兼容,从RDD插入行,可以通过简单的声明,使用INSERT INTO table VALUES(?,?,...,?)插入值,操作将不会失败。

●createJDBCTable(url:String,table:String,allowExisting:Boolean):根据url(参数url用来指定数据库信息)保存DataFrame到JDBC数据库的表中,将运行新建表(CRE-ATE TABLE)和插入(INSERT INTO)的语句。如果设置allowExisting为true,将删掉数据库中给定名称的表;如果设置allowExisting为false,将抛出表已经存在的异常。

createJDBCTable和insertIntoJDBC从Spark 1.4.0版本开始使用将在Spark2.0中废弃可以使用Write.jdbc()方法

3)示例。

4)解析。

该示例首先创建两个DataFrame对象:testin、idf。然后,testin对象调用createJD-BCTable方法,在MySQL中创建一张表dbtable,把testin中的数据写入其中。接下来idf对象调用insertIntoJDBC方法,将idf的数据插入到刚才创建的表dbtable中。

33.flatMap

1)定义。

def flatMap[R](f:(Row)⇒TraversableOnce[R])(implicit arg0:ClassTag[R]):RDD[R]

2)功能描述。

创建一个新的RDD对DataFrame中的所有最后记录进行处理,并且将处理结果的所有数据仅返回一个数组对象。

flatMap[R]其中的,R的类型是ClassTag,ClassTag[T]通过runtimeClass清除给定的类型T,这在Array元素类型未知,编译实例化元素特别有用。ClassTag是scala.reflect.api.Ty- peTags#TypeTag的特殊情况,在运行时根据给定的类型封装,而TypeTag包含所有静态类型信息。ClassTag是由top-level类构建,对于运行时创建Array,这些信息足够了,因此,不必知道所有参数类型。

3)示例。

示例中将数据记录转化为由每一列组成的List。

34.foreach

1)定义。

def foreach(f:(Row)⇒Unit):Unit

def foreachPartition(f:(Iterator[Row])⇒Unit):Unit

2)功能描述。

foreach方法对DataFrame中的数据记录进行循环遍历处理。foreachPartition方法则是对对应分区中的数据记录进行处理,即Iterator[Row],使用方法类似。

3)示例。

35.map

1)定义。

def map[R](f:(Row)⇒R)(implicit arg0:ClassTag[R]):RDD[R]

2)功能描述。

map方法将DataFrame的数据记录按指定的函数映射成一个新的RDD实例。

3)示例。

36.repartition

1)定义。

2)功能描述

返回一个DataFrame,该DataFrame按指定numPartitions对原DataFrame进行重分区。

●repartition(numPartitions:Int):返回一个新的DataFrame,生成numPartitions个分区。

●repartition(partitionExprs:Column∗):返回一个新的DataFrame,根据给定的分区表达式保存现有的分区数,由此产生的DataFrame是哈希分区,这和SQL(Hive QL)中的DISTRIBUTE BY操作是相同的。

●repartition(numPartitions:Int,partitionExprs:Column∗):返回一个新的DataFrame,根据给定的分区表达式生成numPartitions个分区,由此产生的DataFrame是哈希分区,这和SQL(Hive QL)中的DISTRIBUTE BY操作是相同的。

3)示例。

scala>df.repartition(1).rdd.partitions.size

res12:Int=1

37.toJSON

1)定义。

def toJSON:RDD[String]

2)功能描述。

把Dataframe的数据记录用包含JSON字符串的RDD形式返回。

3)示例。

38.queryExecution

1)定义。

val queryExecution:QueryExecution

2)功能描述。

返回DataFrame的查询执行语句,包含逻辑计划(Logical Plan)和物理计划(Physi-cal Plan)。返回DataFrame的查询执行语句,包含逻辑计划和物理计划。逻辑计划描述了DataFrame生成数据所需的逻辑计算,Spark查询优化器将优化逻辑计划,生成一个并行分布式有效执行的物理计划,DataFrame可以通过queryExecution方法来查询逻辑计划和物理计划。

3)示例。

免责声明:以上内容源自网络,版权归原作者所有,如有侵犯您的原创版权请告知,我们将尽快删除相关内容。

我要反馈