Spark GraphX统一了Table视图和Graph视图,所以对Graph视图的所有操作,最终都会转换成其关联的Table视图的RDD操作。这样对一个图的计算,最终在逻辑上等价于一系列的RDD的transformation(转换)过程。Spark GraphX的Graph类和GraphOps类提供了丰富的图计算操作的实现方法,通过这些计算操作可以形成新的图。Spark GraphX的图的操作主要分为下面几大类。
1.Spark GraphX图的构建操作
Spark GraphX中提供了以下几种方法来构建弹性分布式属性图。
●Graph类的fromEdgeTuples()方法。
●Graph类的fromEdges()方法。
●Graph类的apply()方法。
●GraphLoader类的edgeListFile()方法。
下面给出各个方法的具体使用方法。
(1)Graph类提供的fromEdgeTuples()方法如下所示。
该方法从于以一个顶点ID(Vertex id)对的格式进行编码的边(Edge)的集合中,构建出一个Graph。
其中各个参数的含义如下:
●参数rawEdges指的是一系列由元组(源顶点ID,目的顶点ID)组成的边。
●参数defaultValue是顶点属性的默认值,在创建图的时候会自动创建边中所存在的顶点并设置它的属性为默认值。
●参数uniqueEdges表示的是分区策略,默认是None,如果在调用fromEdgeTuples()方法时传入了uniqueEdges的值则会对生成的图进行分区操作,并且图中重复的边将会被合并,重复的边的属性值会相加得到合并后的属性值。
●参数edgeStorageLevel和vertexStorageLevel分别指的是边和顶点的存储策略,默认都是Storagelevel.MEMORY_ONLY。
(2)Graph类提供的fromEdges()方法如下所示。
该方法用于从一个以边(Edge)为元素的RDD中,构建出一个Graph。
其中各个参数的含义如下:
●参数edges指的是含有一系列边的RDD,这里的边不仅包括边的源顶点和目的顶点的信息,还包括边的属性。
●参数defaultValue是顶点属性的默认值。
●参数edgeStorageLevel和vertexStorageLevel分别指的是边和顶点的存储策略,默认都是Storagelevel.MEMORY_ONLY。
(3)Graph类提供的apply()方法如下所示。
该方法用于从一个以顶点ID(Vertex Id)为元素的RDD和边(Edge)为元素的RDD中,构建出一个Graph。
其中各个参数的含义如下:
●参数vertices指的是一组顶点RDD[VertexId,VD],其中,VertexId是顶点的ID,VD是顶点的属性。
●参数edges指的是一组边RDD[Edge[ED]],ED是边的属性。
●参数defaultVertexAttr是顶点属性的默认值,当顶点在边RDD中存在但在顶点RDD中不存在时,为这种类型顶点的默认值。
●参数edgeStorageLevel和vertexStorageLevel分别指的是边和顶点的存储策略,默认都是Storagelevel.MEMORY_ONLY。
(4)GraphLoader类提供了从外部文件系统读取文件加载成图的方法edgeListFile(),其中读取的文件内部是一系列邻接列表对(源顶点ID,目的顶点ID)。edgeListFile()方法如下所示。
该方法用于通过加载外部文件系统中的、包含边的列表的文件,构建出一个Graph。
其中各个参数的含义如下:
●参数sc指的就是SparkContext上下文对象。
●参数path指的是文件的存储路径。
●参数canonicalOrientation指的是邻接列表对(源顶点ID,目的顶点ID)所对应的边是否有方向(srcId<dstId),默认值false;如果是有方向的即canonicalOrientation的值设置为true,那么只有在源顶点的ID大于目标顶点的ID的时候,邻接列表对才是有效的。
●参数numEdgePartitions指的是边RDD的分区大小,默认设置为-1,这时指的是采用系统默认的并行度(系统的并行度的值是由参数spark.default.parallelism来指定的)。
●参数edgeStorageLevel和vertexStorageLevel分别指的是边和顶点的存储策略,默认都是Storagelevel.MEMORY_ONLY。
2.Spark GraphX的属性操作
Spark GraphX的属性操作包括获取属性图的顶点集、边集、三元组Triplet集等操作,这些获取属性的方法主要在Graph和GraphOps两个类中提供。
Graph提供的获取属性的方法主要有以下几种。
(1)Graph的vertices方法:Graph.vertices返回的是一个包含一组顶点(顶点ID,顶点属性)的VertexRDD。代码如下:
(2)Graph的edges方法:Graph.edges返回的是一个包含一组边(源顶点ID,目的顶点ID,边的属性)的EdgeRDD。代码如下:
(3)Graph的triplets方法:Graph.triplets返回的是一个RDD[EdgeTriplet[VD,ED]]该RDD内部包含了一组合并了边(源顶点ID,目的顶点ID,边的属性)、源顶点属性和目的顶点属性的EdgeTriplet[VD,ED]]。代码如下:
GraphOps类提供了一系列图操作的扩展方法,主要有以下几种。
(1)GraphOps的numEdges方法:GraphOps.numEdges返回图中边的数量。代码如下
(2)GraphOps的numVertices方法:GraphOps.numVertices返回的是图中顶点的数量。代码如下:
(3)GraphOps的degrees方法:GraphOps.degrees返回的是一个包含图中每个顶点的出入度之和的VertexRDD,其中孤立的顶点不会出现在结果之中。代码如下:
(4)GraphOps的inDegrees方法:GraphOps.inDegrees返回的是一个包含图中每个顶点的入度的VertexRDD,其中无入度的顶点不会出现在结果之中。代码如下:
(5)GraphOps的outDegrees方法:GraphOps.outDegrees返回的是一个包含图中每个顶点的出度的VertexRDD,其中无出度的顶点不会出现在结果之中。代码如下:
3.Spark GraphX的转换操作(www.xing528.com)
Spark GraphX的转换操作包括顶点属性转换、边属性转换等操作,主要在Graph和Gra-phOps两个类中提供。
Graph类提供的转换方法主要有以下几种。
(1)Graph的mapVertices()方法:Graph的mapVertices()方法用于对图中每个顶点的属性(VD)进行转换,生成新的顶点属性(VD2),这样也就生成了一个新的图。代码如下:
(2)Graph的mapEdges()方法:Graph的mapEdges()方法用于对图中每条边的属性(ED)进行转换,生成新的边属性(ED2),这样也就生成了一个新的图。代码如下:
(3)Graph的mapTriplets()方法:Graph的mapTriplets()方法通过使用它的参数map函数来作用于图中边的属性(ED)、边的源顶点属性(VD)和目的顶点属性(VD),转换得到ED2,进而生成新的图Graph[VD,ED2]。代码如下:
例如在初始化边的属性时,如果边的属性是由边的源顶点属性和目的顶点属性计算结果决定的,我们就可以使用以下伪代码进行转换:
在这里map函数是edge=>edge.src.data-edge.dst.data,也就是说边的属性值是边的源顶点属性值减去目的顶点属性值后得到的。
(4)根据分区策略对图中的边进行分区,其中参数partitionStrategy指的是要选择的分区策略,目前有EdgePartition2D、EdgePartition1D、RandomVeertexCut和CanonicalRandomVer-texCut这四种分区(分割)策略。代码如下:
GraphOps类提供的转换操作如下。
GraphOps类提供了filter()方法对图进行过滤操作。在方法中,首先会使用preprocess(预处理函数)对图的顶点属性(VD)和边的属性(ED2)进行转换操作,生成新的顶点属性(VD2)和边的属性(ED2),然后分别在新生成的边属性和顶点属性上使用epred函数和vpred函数进行过滤操作,最后返回一个原图过滤后的子图。代码如下:
例如我们在一个图中移除出度(outdegree)为0的顶点,可以使用以下伪代码来实现:
经过这个过滤操作后,返回的是一个由出度大于0的顶点构成的图。
4.Spark Graph的结构操作
Spark Graph的结构操作包括修改属性图的边的方向、获取属性图子图、过滤属性图等变更属性图结构的方法,主要有以下几种。
(1)Graph类中的reverse()方法:Graph类中的reverse()方法用来反转图中所有边的方向,例如图中某条边的源顶点是A,目的顶点是B,反转后该边的源顶点是B,目的顶点是A。代码如下:
(2)Graph类中的subgraph()方法:Graph的subgraph()方法,通过对图中的边和顶点分别使用epred函数和vpred函数来生成一个子图。代码如下:
其中各个参数的含义如下所示。
●参数epred函数会对图的EdgeTriplet[VD,ED]中的边数据(源顶点ID、目的顶点ID,边属性)和顶点属性进行过滤。
●参数vpred函数是对图的顶点属性进行过滤。
(3)Graph的mask()方法:Graph的mask()方法用来在图中过滤出在other图中也存在的顶点和边,返回一个新的图,也就是说在新的图中保留的是原图在other图中包含的顶点和边。这里特别需要注意的一点是,对于新图中的顶点属性和边的属性保留的是原图中的值。代码如下:
(4)Graph的groupEdges()方法:Graph的groupEdges()方法用来将图中两个顶点之间的多条边合并成一条,为了保证结果的正确性,图必须首先使用Grpah的partitionBy()方法进行分区操作。其中参数merge函数除了用来合并两个顶点之间的多条边为一条,还使用交换律和结合律来合并重复边的属性。代码如下:
5.Spark GraphX的连接操作
Spark GraphX的连接操作主要在Graph类中提供,包括joinVertices()方法和outerJoinVe-rtices()方法。
(1)Graph类提供的joinVertices()方法:在GraphOps类中定义了joinVertices()方法,使用该方法可以将图中的顶点与输入的RDD[(VertexId,U)]根据顶点ID的值进行连接操作,并过滤掉图中存在但输入的RDD中不存在的顶点,然后使用mapFunc函数作用于连接好的数据项,产生新的顶点数据,进而生成一个新的图。这里需要注意的一点是,如果被连接的RDD[(VertexId,U)]不包含原图中某些顶点的需要更新的数据,那么在新图中就会使用原图中的旧数据。代码如下:
这个方法常用来通过外部文件提供的数据来更新原图中的顶点的顶点数据,例如我们先加载一个外部文件构成一个图,然后求出包含了图中所有顶点出度的VertexRDD,最后对图使用joinVertices()方法进行连接操作并更新图中的顶点数据,我们可以用以下伪代码来实现:
经过上述代码的操作,生成了一个更新过顶点数据的新图。
(2)Graph类提供的outerJoinVertices()方法:在GraphOps类中定义了outerJoinVertices()方法,它的作用类似于GraphOps中的joinVertices()方法,也是对图中的顶点数据和外部的RDD[(VertexId,U)]进行连接操作,然后使用mapFunc函数作用于连接好的数据项,产生新的顶点数据,进而生成一个新的图。不同的一点是输入的RDD[(VertexId,U)]应包含图中所有顶点的VertexId,如果不满足,则mapFunc函数的输入为None。代码如下:
我们同样可以使用以下伪代码来实现:
这里的mapFunc函数是(vid,data,optDeg)=>optDeg.getOrElse(0),也就是说如果输入的RDD[(VertexId,U)]不存在vid(图中的顶点ID)时,optDeg接受到的输入值是None。
6.Spark GraphX的聚合操作
SparkGraphX的聚合操作主要在Graph和GraphOps两个类中提供。
(1)GraphOps类的collectNeighbors()方法:GraphOps类的collectNeighbors()方法用来收集每个顶点的相邻顶点的数据,返回的是一系列Array[(VertexId,VD)]构成的VertexRDD。代码如下:
其中:
●Array[(VertexId,VD)]是由每个顶点的相邻顶点ID和相邻顶点属性组成的数组。需要注意的是,当图中某个顶点的出入度较大时,在单一位置会占用很大的存储空间。
●参数edgeDirection指的是控制收集方向,比如要收集的是顶点的目的顶点数据还是顶点的源顶点数据。
(2)GraphOps类的collectNeighborIds()方法:GraphOps类的collectNeighborIds()方法用于收集每个相邻顶点的ID数据,返回的是一系列Array[VertexId]构成的VertexRDD。代码如下:
其中:
●Array[VertexId]是由每个顶点的相邻顶点ID组成的数组。
●参数edgeDirection同样指的是控制收集方向。
(3)Graph类提供的聚合操作。mapReduceTriplets()方法,这是GraphX中最核心和强大的一个接口。该方法将用户定义的mapFunc函数作为输入参数,并将mapFunc函数作用于图中的每个EdgeTriplet[VD,ED],生成一个或者多个消息,消息以EdgeTriplet[VD,ED]关联的两个顶点中的任意一个或两个为目的顶点(也就是说支持消息发往EdgeTriplet[VD,ED]的源顶点或目的顶点)。接着会使用用户定义的reduceFunc函数将发送到同一个目的顶点的消息进行合并,生成发送给该顶点的消息。最后返回的是一个VertexRDD[A],它包含所有以每个顶点聚合后的消息(类型为A),没有收到消息的顶点不包含在返回的Ver-texRDD。
需要注意的是,mapReduceTriplets()方法需要一个附加的可选参数activeSetOptOption,它通过选取一些活跃顶点来对图中的部分顶点进行操作(也就是说mapFunc函数仅仅作用于那些在活跃顶点集合中的顶点),同时参数activeSetOptOption中的EdgeDirection指定了mapFunc函数计算时收集的方向。如果EdgeDirection指定的方向是in,则用户定义的map-Func函数将仅仅作用于目的顶点在活跃顶点集中的EdgeTriplet[VD,ED];如果方向是out,则该mapFunc函数将仅仅作用在那些源顶点在活跃顶点集中的EdgeTriplet[VD,ED];如果方向是either,则mapFunc函数将仅作用于源顶点或目的顶点在活跃顶点集中的EdgeTriplet[VD,ED];如果方向是both,则mapFunc函数将作用于两个顶点都在活跃顶点集中的Edge-Triplet[VD,ED]。同时要注意活跃顶点集合中的顶点必须来自图的顶点。
代码如下:
7.Spark GraphX的缓存操作
Spark GraphX的缓存操作由Graph类提供,主要有以下几种。
(1)Graph类的cache()方法:cache()方法用来缓存图中的顶点和边的数据,是persist()的默认缓存策略的快捷方法,即对应的缓存策略是StorageLevel.MEMORY_ONLY。代码如下:
(2)Graph类的persist()方法:persist()用来持久化图的顶点和边的数据,默认的持久化方式是StorageLevel.MEMORY_ONLY,用户可以通过传递适合自己的持久化方式(缓存策略)来存取数据。代码如下:
(3)Graph类的unpersist()方法:unpersist()方法用来取消图的顶点和边的数据的持久化(缓存)。代码如下:
(4)Graph类的unpersistVertices()方法:使用unpersistVertices()方法后将不再持久化(缓存)顶点的数据,但是边的数据会保留。代码如下:
免责声明:以上内容源自网络,版权归原作者所有,如有侵犯您的原创版权请告知,我们将尽快删除相关内容。