本节Spark SQL操作MongoDB案例将对学生信息表的相关记录进行查询统计操作。具体步骤如下:
1)MongoDB服务器启动MongoDB服务。
2)Spark中引入MongoDB的相关JAR包。
3)设置MongoDB数据库连接的URI信息:地址、端口、数据库及文档信息。
4)设置MongoDB的查询条件。
5)Spark代码打成JAR包,提交集群运行。Spark从MongoDB中查询统计学生信息的数据。
●查询性别为男的所有学生。
●查询性别为男、数学成绩高于80分的文档。
●数学成绩低于90的分数加上5分成绩。
●删除历史History分数小于90的键。
●查询结果保存到MongoDB数据库中。
Spark中读取MongoDB数据库,需要依赖的JAR包包括:MongoDB-driver-3.0.2.jar,MongoDB-driver-core-3.0.2.jar,bson-3.0.2.jar,mongo-hadoop-core-1.4.0.jar,mon-go-java-driver-3.0.2.jar,spark-MongoDB_2.10-0.10.1.jar,可以在Maven的配置文件pom.xml中增加以下依赖关系:
其中mongo-hadoop-core-1.4.0.jar是连接器,可以用它来实现从MongoDB上读写数据,其配置参数使用配置对象传递,其中最重要的两个参数是mongo.input.uri和mongo.output.uri,这两个参数提供了MongoDB主机、端口、权限、数据库和数据集合名字。
在用Spark操作MongoDB的过程中,首先使用命令./mongod--dbpath../data/--log-path../log---logappend启动MongoDB的服务。
然后用Spark操作MngoDB需要引入如下的类(org.bson.BasicBSONObject、org.bson.BSONObject、com.mongodb.hadoop.MongoInputFormat、com.mongodb.hadoop.Mongo Output-Format、org.apache.hadoop.conf.Configuration)
引入MongoDB的JAR包以后,在main主程序中封装各方法操作MongoDB,步骤如下:
●初始化Spark Context。
●在主程序业务代码中分别调用各方法:queryDocuments(sc)、querySubcollection(sc)、updateMath(sc)、removeHistory(sc)、saveToMongo(sc)
●关闭Spark Context。
main主函数代码如下:
(1)示例一:查询性别为男的所有学生
1)创建Hadoop的Configuration配置类,设置MongoDB的输入input URI连接属性:地址、端口、MyDB数据库及MyCollection集合信息;设置MongoDB的input输入类型为Mon-goInputFormat;设置MongoDB的查询条件mongo.input.query:性别是男。
2)调用SparkContext的newAPIHadoopRDD方法,newAPIHadoopRDD的第一个参数Con-figuration用于设置数据集的配置,Configuration将被放进Spark广播中。这里传入MongoDB的配置类;第二个参数InputFormat为输入类型是MongoInputFormat格式;第三个参数是返回结果的Key值,类型为Object;第四个参数是返回结果的Value值,类型为BSONObject;(www.xing528.com)
3)对newAPIHadoopRDD查询MongoDB的结果遍历,打印输出。第一个元素是Object ID,第二个元素是MongoDB中MyDB数据库MyCollection集合中性别为男的文档document记录。
查询结果如下:
(2)示例二:查询性别是男并且数学分数大于80分的文档
1)创建Hadoop的Configuration配置类,设置MongoDB的输入input URI连接属性:地址、端口、MyDB数据库及MyCollection集合信息;设置MongoDB的input输入类型为Mon-goInputFormat;设置MongoDB的查询条件mongo.input.query:信息Info性别为男,分数Score为数学成绩大于80分。
2)调用SparkContext的newAPIHadoopRDD方法。
3)对newAPIHadoopRDD查询MongoDB的结果遍历,打印输出。第一个元素是Object ID,第二个元素是MongoDB中MyDB数据库MyCollection集合中性别是男并且数学大于80分的记录。
(3)示例三:数学小于90分的分数加5分
1)创建Hadoop的Configuration配置类,设置MongoDB的输入input URI连接属性:地址、端口、MyDB数据库及MyCollection集合信息;设置MongoDB的输出output URI连接属性:地址、端口、test数据库及foo集合信息;设置MongoDB的input输入类型为MongoIn-putFormat;设置MongoDB的更新操作mongo.input.update:数学小于90分就加5分。
2)调用SparkContext的newAPIHadoopRDD方法。将数学成绩低于90分的加5分以后的输出结果保存到MongoDB test数据库的foo集合。
3)对newAPIHadoopRDD查询MongoDB的结果遍历,打印输出。第一个元素是Object ID,第二个元素是MongoDB中MyDB数据库MyCollection集合中数学成绩低于90分就加5分的记录。
操作结果如下图:
(4)示例四:删除历史分数小于90分的键
1)创建Hadoop的Configuration配置类,设置MongoDB的输入input URI连接属性:地址、端口、MyDB数据库及MyCollection集合信息;设置MongoDB的input输入类型为Mon-goInputFormat;设置MongoDB的更新操作mongo.input.update:删除历史分数小于90分的键值。
2)调用SparkContext的newAPIHadoopRDD方法。
3)对newAPIHadoopRDD查询MongoDB的结果遍历,打印输出。第一个元素是Object ID,第二个元素是删除历史分数小于90分的集合记录,即X同学的历史分数是87分小于90分,因此X同学记录中的历史分数键值对被删除。
(5)示例五:保存数据到mongodbMongoDB中
1)创建Hadoop的Configuration配置类,设置MongoDB的输入input URI连接属性:地址、端口、MyDB数据库及MyCollection集合信息;设置MongoDB的输出output URI连接属性:地址、端口、MyDB数据库及MyCollection集合信息;
2)调用Spark的parallelize方法生成data RDD。
3)遍历data,将data的数据写入到BasicBSONObject,BasicBSONObject对象obj放入name、age键值对,map方法遍历以后返回元组Key-Value(null,obj)键值对,元组的第一个元素为null,因为保存至Hadoop时第一个元素是NullWritable;第二个元素为BasicB-SONObject对象obj。
4)调用SparkContext的saveAsNewAPIHadoopFile方法保存记录,saveAsNewAPIHa-doopFile方法的第一个参数是MongoDB结果保存的路径,即保存在MyDB数据库的MyCol-lection集合中,第二个参数是输入Key的类型Any,第三个参数是输入Value的类型Any,第四个参数是输出的类型MongoOutputFormat[Any,Any],第五个参数是Hadoop的配置类mongoConfig。
保存的结果如下所示:
免责声明:以上内容源自网络,版权归原作者所有,如有侵犯您的原创版权请告知,我们将尽快删除相关内容。