函数名称:belowThreshold。
函数功能:用于检测分组的数据中是否存在位于给定阈值下的数,如果是,则返回true;否,则返回false。
函数示例:
1)定义belowThreshold类继承至UserDefinedAggregateFunction:
●重载实现方法inputSchema:返回StructType字段(power,整数类型),作为be-lowThreshold函数的输入参数;
●重载实现方法BufferSchema:返回StructType字段(bool,布尔值类型),作为be-lowThreshold函数的中间结果的值;
●重载实现方法dataType:dataType表示belowThreshold函数返回值的类型是布尔值类型;
●重载实现方法deterministic:设置true,在给定输入值的前提下,belowThreshold生成一组相同的结果;
●重载实现方法initialize:初始化buffer的第0个元素布尔值为false,用于初始化聚集缓冲区(MutableAggregationBuffer)的值;
●重载实现方法update:如果读入input每行的第0个元素不为空,则将buffer第0个元素原布尔值与读入input每行的第0个元素power值是否小于“-40”进行逻辑或运算,两者之一为true,更新buffer第0个元素值为True。即读入数据中只要有一个小于“-40”的数据,就返回true;(www.xing528.com)
●重载实现方法merge:merge用于合并两个聚集缓冲区,将第一个缓冲区的第0个元素的布尔值与第二个缓冲区的第0个元素的布尔值做逻辑或运算,作为合并以后的缓冲区的第0个元素的布尔值;
●重载实现方法evaluate:belowThreshold函数的最终计算结果为:buffer的第0个元素的布尔值。
2)构建SparkContext以及SQLContext,导入Spark的sqlContext隐式转换类import sql-Context.implicits._,用于将一个RDD隐式转换为一个DataFrame。
3)通过sc的parallelize方法读入Seq集合数据,调用toDF()方法转换成DataFrame,df包括两列:group,power。
4)创建belowThreshold实例belowThreshold。
5)在sqlContext.udf中注册belowThreshold的自定义函数,判断输入的数值是否小于“-40”。
6)df的DataFrame根据group列分组,使用agg函数通过belowThreshold UDAF函数判断输入power列的数值是否小于“-40”,查询打印结果。
在本地运行,结果如下所示:
免责声明:以上内容源自网络,版权归原作者所有,如有侵犯您的原创版权请告知,我们将尽快删除相关内容。