函数名称:GeometricMean。
函数功能:用于求几何平均值。
函数示例:本例子中同时展示了使用UDAF的两种方法。
1)定义GeometricMean类继承至UserDefinedAggregateFunction:
●重载实现方法inputSchema:返回StructType字段(数值,浮点类型),作为Geometric-Mean函数的输入参数;
●重载实现方法BufferSchema:返回StructType字段((计数,长整型),(乘积,浮点型)),作为GeometricMean函数的中间结果的值;
●重载实现方法dataType:dataType表示GeometricMean函数返回值的类型是浮点类型;
●重载实现方法deterministic:设置true,在给定输入值的前提下,GeometricMean生成一组相同的结果;
●重载实现方法initialize:初始化buffer的第0个元素即计数值为0,初始化buffer的第1个元素即乘积值为0.0,用于初始化聚集缓冲区(MutableAggregationBuffer)的值;
●重载实现方法update:buffer更新第0个元素值即计数值,当前的数值计数为1次,计数值就累加1;buffer更新第1个元素值即乘积值,将buffer第1个元素的原乘积值乘以读入的每行元素第0个元素即数值,将原乘积乘以新的数值作为更新的乘积值;(www.xing528.com)
●重载实现方法merge:merge用于合并两个聚集缓冲区,将第一个缓冲区的第0个元素即计数值加上第二个缓冲区的第0个元素即计数值,作为合并以后的缓冲区的第0个元素即计数值;将第一个缓冲区的第1个元素即乘积值乘以第二个缓冲区的第1个元素即乘积值,作为合并以后的缓冲区的第1个元素乘积值;
●重载实现方法evaluate:GeometricMean函数的最终计算结果为:将buffer的第1个元素(即总乘积值)作为底数,buffer的第0个元素(即总计数值的倒数)做次幂,两者作幂计算即计算出结果:;
2)构建SparkContext以及SQLContext,导入spark的org.apache.spark.sql.functions._,将使用spark sql强大的内置函数功能。
3)使用sqlContext.range创建一个DataFrame,其包括1列,列名为id,列中元素类型为LongType,列的数值范围从11到50(不包括50),数值步长值为1。创建数值集变量df。
4)创建GeometricMean实例Val gm=new Geometric Mean。
5)用UDAF计算几何平均值方法一:GeometricMean UDAF的实例作函数使用。df的DataFrame根据id列分组,使用agg函数调用GeometricMean UDAF函数gm,在Geometric-Mean函数gm中传入id列数值,计算11至49的几何平均值,然后通过show方法展示结果。
6)用UDAF计算几何平均值方法二:GeometricMean UDAF在Spark SQL注册使用。在sqlContext.udf中注册gm的自定义UDAF函数,然后df的DataFrame根据id列分组,使用agg函数调用表达式,使用已在Spark SQL注册的GeometricMean UDAF函数gm,在Geomet-ricMean函数gm中传入id列数值,计算11至49的几何平均值,然后show方法展示结果。
在本地运行,结果如下所示。
免责声明:以上内容源自网络,版权归原作者所有,如有侵犯您的原创版权请告知,我们将尽快删除相关内容。