用户自定义的聚合函数(User Defined Aggregation Function,UDAF),本身作用于数据集合,能够在聚合操作的基础上进行自定义操作。实际上,UDF会被Spark SQL中的Catalyst封装成为Expression,最终会通过eval方法来计算输入的数据Row(注:此处的Row和Dat-aFrame中的Row没有任何关系)。Spark对UDF的支持较早。Spark 1.1就推出来了,而UDAF是Spark 1.5左右才出的。晚了好几个版本,说明UDAF有其复杂性,因为它有大量的Aggregation之类的操作,是对批量的数据集合进行操作。而UDF是对一条数据进行操作,即你会有具体的一条输入的数据,具体如何进行操作,就是一个普通的Scala函数。实际应用中也是如此,使用UDAF时往往是对数据进行分组的,然后操作。理论上讲,通过UDF和UDAF可以实现任何功能。
一个UDAF维护一个聚合缓冲区来存储每组输入数据的中间结果。它为每个输入行更新此缓冲区,一旦处理完所有输入行,基于该聚合缓冲区的值返回结果。一个UDAF继承了基类UserDefinedAggregateFunction并实现以下8个方法:
●inputSchema:inputSchema返回StructType。这个StructType的各个字段代表了这个UDAF的输入参数。
●BufferSchema:BufferSchema返回StructType。这个StructType的各个字段代表了这个UDAF的中间结果的一个值。
●dataType:dataType表示此UDAF的返回值的数据类型。
●deterministic:deterministic返回一个布尔值,用于表明在给定输入值的前提下,此UDAF是否总是生成一组相同的结果。(www.xing528.com)
●initialize:initialize用于初始化聚集缓冲区(例如MutableAggregationBuffer)的值。
●update:update更新用于输入行的聚集缓冲区(例如MutableAggregationBuffer)。
●merge:merge用于合并两个聚集缓冲区,并将结果存储到MutableAggregationBuffer。
●evaluate:evaluate用于生成这个UDAF的最终值。这个值基于每一行的聚合缓冲区的值。
使用UDAF有两种方式:第一种,一个UDAF的实例可以立即当作函数使用;第二种,用户可以向Spark SQL的功能注册表注册UDAF,然后通过分配的名称调用此UDAF。
免责声明:以上内容源自网络,版权归原作者所有,如有侵犯您的原创版权请告知,我们将尽快删除相关内容。