正如RDD有基本的操作map、filter和reduceByKey一样,属性图也有基本的集合操作,这些操作采用用户自定义的函数并产生包含转换特征和结构的新图。定义在Graph中的核心操作是经过优化后的实现。核心操作组合的便捷操作定义在GraphOps中。然而,因为有Scala的隐式转换,定义在GraphOps中的操作可以作为Graph的成员自动使用。例如可以通过下面的方式计算每个顶点(定义在GraphOps中)的入度,代码如下。
val graph:Graph[(String,String),String]
//使用隐式GraphOps.inDegrees操作
val inDegrees:VertexRDD[Int]=graph.inDegrees
区分核心图操作和GraphOps的原因是为了在将来支持不同的图表示。每个图表示都必须提供核心操作的实现并重用很多定义在GraphOps中的有用操作。
以下是定义在Graph和GraphOps中(为了简单起见,表现为图的成员)的功能的快速浏览。注意,某些函数签名已经简化,例如,默认参数和类型的限制已删除,一些更高级的功能已经被省略,所以请参阅API文档了解更加详细的操作列表,【例4-19】中列出的是操作总览。
【例4-19】运算列表汇总。
下面为对【例4-19】中包含的操作进行划分介绍。
1.属性操作
如RDD的map操作一样,属性图包含如下的操作。
下面为对【例4-19】中包含的操作进行划分介绍。
1.属性操作
如RDD的map操作一样,属性图包含如下的操作。
每个操作都产生一个新图,这个新图包含通过用户自定义的map操作、得到修改后的顶点或边的属性。
注意,每种情况下图结构都不受影响。这些操作的一个重要特征是它允许所得图形重用原有图形的结构索引(indices)。下面的两行代码在逻辑上是等价的,但是第一行不保存结构索引,所以不会有GraphX系统优化的特性。
val newVertices=graph.vertices.map{case(id,attr)=>(id,mapUdf(id,attr))}
val newGraph=Graph(newVertices,graph.edges)
另一种方法是用mapVertices⇒VD2)(ClassTag[VD2]):Graph[VD2,ED])保存索引,代码如下。
val newGraph=graph.mapVertices((id,attr)=>mapUdf(id,attr))
这些操作经常用来初始化的图形,这将用作特定计算或者用来处理项目不需要的属性。例如,给定一个图,这个图的顶点特征包含出度,以PageRank算法作为示例。
【例4-20】PageRank算法示例。
每个操作都产生一个新图,这个新图包含通过用户自定义的map操作、得到修改后的顶点或边的属性。
注意,每种情况下图结构都不受影响。这些操作的一个重要特征是它允许所得图形重用原有图形的结构索引(indices)。下面的两行代码在逻辑上是等价的,但是第一行不保存结构索引,所以不会有GraphX系统优化的特性。
val newVertices=graph.vertices.map{case(id,attr)=>(id,mapUdf(id,attr))}
val newGraph=Graph(newVertices,graph.edges)
另一种方法是用mapVertices⇒VD2)(ClassTag[VD2]):Graph[VD2,ED])保存索引,代码如下。
val newGraph=graph.mapVertices((id,attr)=>mapUdf(id,attr))
这些操作经常用来初始化的图形,这将用作特定计算或者用来处理项目不需要的属性。例如,给定一个图,这个图的顶点特征包含出度,以PageRank算法作为示例。
【例4-20】PageRank算法示例。
2.结构操作
当前的GraphX仅仅支持一组简单常用的结构性操作。【例4-21】所示代码是基本的结构性操作列表。
【例4-21】基本的结构性操作列表。
2.结构操作
当前的GraphX仅仅支持一组简单常用的结构性操作。【例4-21】所示代码是基本的结构性操作列表。
【例4-21】基本的结构性操作列表。
reverse操作返回一个新图,这个图的边的方向都是反转的。例如,这个操作可以用来计算反转的PageRank。因为反转操作没有修改顶点或者边的属性或者改变边的数量,所以可以在不移动或者复制数据的情况下有效地实现它。
subgraph⇒Boolean,(VertexId,VD)⇒Boolean):Graph[VD,ED])操作利用顶点和边的谓词(predicates),返回的图仅仅包含满足顶点谓词的顶点、满足边谓词的边以及满足顶点谓词的连接顶点(connect vertices)。subgraph操作可以用于很多场景。【例4-22】为删除断开的连接的代码示例。
【例4-22】删除断开的连接的示例。
reverse操作返回一个新图,这个图的边的方向都是反转的。例如,这个操作可以用来计算反转的PageRank。因为反转操作没有修改顶点或者边的属性或者改变边的数量,所以可以在不移动或者复制数据的情况下有效地实现它。
subgraph⇒Boolean,(VertexId,VD)⇒Boolean):Graph[VD,ED])操作利用顶点和边的谓词(predicates),返回的图仅仅包含满足顶点谓词的顶点、满足边谓词的边以及满足顶点谓词的连接顶点(connect vertices)。subgraph操作可以用于很多场景。【例4-22】为删除断开的连接的代码示例。
【例4-22】删除断开的连接的示例。
注意,上面的例子中,仅仅提供了顶点谓词。如果没有提供顶点或者边的谓词,subgraph操作默认为true。
mask(ClassTag[VD2],ClassTag[ED2]):Graph[VD,ED])操作构造一个子图,这个子图包含输入图中包含的顶点和边。这个操作可以和subgraph操作相结合,基于另外一个相关图的特征去约束一个图。例如可以利用缺失顶点的图运行连通组件(Connected Components),然 后返回有效的子图,代码如下。
//运行连通组件
val ccGraph=graph.connectedComponents()//No longer contains missing field
//移除失去联系的点和边
val validGraph=graph.subgraph(vpred=(id,attr)=>attr._2!="Missing")
//限制有效子图的结果
val validCCGraph=ccGraph.mask(validGraph)
groupEdges⇒ED):Graph[VD,ED])操作合并多重图中的并行边(例如顶点对之间重复的边)。在大量的应用程序中,并行的边可以合并(将它们的权重合并)为一条边,从而降低图的大小。
3.连接操作
在许多情况下,有必要将外部数据加入到图中。例如,可能有额外的用户属性需要合并到已有的图中或者可能从一个图中取出顶点特征加入到另外一个图中,这些任务可以用join操作完成。下面【例4-23】列出的是主要的join操作。
【例4-23】join操作示例。
注意,上面的例子中,仅仅提供了顶点谓词。如果没有提供顶点或者边的谓词,subgraph操作默认为true。
mask(ClassTag[VD2],ClassTag[ED2]):Graph[VD,ED])操作构造一个子图,这个子图包含输入图中包含的顶点和边。这个操作可以和subgraph操作相结合,基于另外一个相关图的特征去约束一个图。例如可以利用缺失顶点的图运行连通组件(Connected Components),然 后返回有效的子图,代码如下。
//运行连通组件
val ccGraph=graph.connectedComponents()//No longer contains missing field
//移除失去联系的点和边
val validGraph=graph.subgraph(vpred=(id,attr)=>attr._2!="Missing")
//限制有效子图的结果
val validCCGraph=ccGraph.mask(validGraph)
groupEdges⇒ED):Graph[VD,ED])操作合并多重图中的并行边(例如顶点对之间重复的边)。在大量的应用程序中,并行的边可以合并(将它们的权重合并)为一条边,从而降低图的大小。
3.连接操作(www.xing528.com)
在许多情况下,有必要将外部数据加入到图中。例如,可能有额外的用户属性需要合并到已有的图中或者可能从一个图中取出顶点特征加入到另外一个图中,这些任务可以用join操作完成。下面【例4-23】列出的是主要的join操作。
【例4-23】join操作示例。
其中joinVertices[U])((VertexId,VD,U)⇒VD)(ClassTag[U]):Graph[VD,ED])操作将输入RDD和顶点相结合,返回一个新的带有顶点特征的图。这些特征是通过在连接顶点的结果上使用用户定义的map函数获得的。在RDD中没有匹配到值的顶点能保留其原始值。
注意,对于给定的顶点,如果RDD中有超过一个的匹配值,则仅仅会使用其中的一个。建议用下面的方法保证输入RDD的唯一性。下面的方法也会预索引(pre-index)返回的值用以加快后续的join操作。
其中joinVertices[U])((VertexId,VD,U)⇒VD)(ClassTag[U]):Graph[VD,ED])操作将输入RDD和顶点相结合,返回一个新的带有顶点特征的图。这些特征是通过在连接顶点的结果上使用用户定义的map函数获得的。在RDD中没有匹配到值的顶点能保留其原始值。
注意,对于给定的顶点,如果RDD中有超过一个的匹配值,则仅仅会使用其中的一个。建议用下面的方法保证输入RDD的唯一性。下面的方法也会预索引(pre-index)返回的值用以加快后续的join操作。
除了将用户自定义的map函数用到所有顶点和改变顶点属性类型以外,更一般的outerJoinVertices[U,VD2])((VertexId,VD,Option[U])⇒VD2)(ClassTag[U],ClassTag[VD2]): Graph[VD2,ED])操作与joinVertices类似。因为并不是所有顶点在RDD中都能拥有匹配的值,map函数需要一个option类型。
在上面的例子中用到了柯里化函数的多参数列表。虽然可以将f(a)(b)写成f(a,b),但是f(a,b)意味着b的类型推断将不会依赖于a。因此,用户需要为定义的函数提供类型标注。
lassTag[U],ClassTag[VD2]): Graph[VD2,ED])操作与joinVertices类似。因为并不是所有顶点在RDD中都能拥有匹配的值,map函数需要一个option类型。在上面的例子中用到了柯里化函数的多参数列表。虽然可以将f(a)(b)写成f(a,b),但是f(a,b)意味着b的类型推断将不会依赖于a。因此,用户需要为定义的函数提供类型标注。
【例4-24】更一般的join操作及类型标注示例。
【例4-24】更一般的join操作及类型标注示例。
在上面的例子中用到了柯里化函数的多参列表。虽然可以将f(a)(b)写成f(a,b),但是f(a,b)意味着b的类型推断将不会依赖于a。因此,用户需要为定义的函数提供类型标注。
4.近邻聚合
图分析任务的一个关键步骤是汇总每个顶点附近的信息。例如当要对用户的追随者的数量或者追随者的信息进行分析时,可以使用迭代算法获得与用户为追随关系的相邻用户信息,分析并得到结果。
许多迭代的图算法(如PageRank,最短路径和连通体算法)主要通过多次聚合相邻顶点的属性进行计算。为了提高性能,主要的聚合操作从graph.mapReduceTriplets改为了新的graph.Aggregate Messages。下面将会介绍如何利用官方提供的API进行相邻聚合操作。虽然API的改变相对较小,但是官方仍然提供了API过渡指南。
(1)聚合信息
Spark中的核心聚合操作是aggregateMessages,这个操作符适用于用户在图中对每一个edge triplet定义sendMsg函数,然后使用mergeMsg函数在目的地顶点聚集这些消息。
【例4-25】聚合操作示例。
在上面的例子中用到了柯里化函数的多参列表。虽然可以将f(a)(b)写成f(a,b),但是f(a,b)意味着b的类型推断将不会依赖于a。因此,用户需要为定义的函数提供类型标注。
4.近邻聚合
图分析任务的一个关键步骤是汇总每个顶点附近的信息。例如当要对用户的追随者的数量或者追随者的信息进行分析时,可以使用迭代算法获得与用户为追随关系的相邻用户信息,分析并得到结果。
许多迭代的图算法(如PageRank,最短路径和连通体算法)主要通过多次聚合相邻顶点的属性进行计算。为了提高性能,主要的聚合操作从graph.mapReduceTriplets改为了新的graph.Aggregate Messages。下面将会介绍如何利用官方提供的API进行相邻聚合操作。虽然API的改变相对较小,但是官方仍然提供了API过渡指南。
(1)聚合信息
Spark中的核心聚合操作是aggregateMessages,这个操作符适用于用户在图中对每一个edge triplet定义sendMsg函数,然后使用mergeMsg函数在目的地顶点聚集这些消息。
【例4-25】聚合操作示例。
用户定义sendMsg函数接收一个EdgeContext,EdgeContext中包含了源和目标顶点的属性、边的属性以及函数(sendToSrc和sendToDst),从而可以向消息源和目标属性发送消息。可以将sendMsg看作map-reduce中的map函数,用户定义mergeMsg函数,将两个不同的消息合并为一个消息。可以将mergeMsg看作map-reduce中的reduce函数,aggregateMessages操作符返回包含每个顶点的聚合消息的VertexRDD[Msg]。没有收到一条消息的顶点不包括在返回的VertexRDD中。
此外,aggregateMessages需要一个可选的tripletsFields显示在EdgeContext中,说明访问什么数据(即源顶点属性而不是目的地顶点属性)。tripletsFields可以从TripletsFields的定义中选择值,默认值是TripletsFields。sendMsg函数定义意味着用户可以访问任何EdgeContext字段。tripletFields参数可以用来通知GraphX,只有部分EdgeContext需要GraphX选择一个优化的连接策略。例如计算每个用户的追随者的平均年龄,只需要源字段,所以会使用TripletFields。需要注意的是,在GraphX的早期版本中使用字节代码检查来推断TripletFields,然而我们发现字节码检查不可靠,于是选择了更明确的用户控件。【例4-26】,使用aggregateMessages操作符来计算比用户年纪更大的追随者的平均年龄。
【例4-26】计算追随者平均年龄的代码示例。
用户定义sendMsg函数接收一个EdgeContext,EdgeContext中包含了源和目标顶点的属性、边的属性以及函数(sendToSrc和sendToDst),从而可以向消息源和目标属性发送消息。可以将sendMsg看作map-reduce中的map函数,用户定义mergeMsg函数,将两个不同的消息合并为一个消息。可以将mergeMsg看作map-reduce中的reduce函数,aggregateMessages操作符返回包含每个顶点的聚合消息的VertexRDD[Msg]。没有收到一条消息的顶点不包括在返回的VertexRDD中。
此外,aggregateMessages需要一个可选的tripletsFields显示在EdgeContext中,说明访问什么数据(即源顶点属性而不是目的地顶点属性)。tripletsFields可以从TripletsFields的定义中选择值,默认值是TripletsFields。sendMsg函数定义意味着用户可以访问任何EdgeContext字段。tripletFields参数可以用来通知GraphX,只有部分EdgeContext需要GraphX选择一个优化的连接策略。例如计算每个用户的追随者的平均年龄,只需要源字段,所以会使用TripletFields。需要注意的是,在GraphX的早期版本中使用字节代码检查来推断TripletFields,然而我们发现字节码检查不可靠,于是选择了更明确的用户控件。【例4-26】,使用aggregateMessages操作符来计算比用户年纪更大的追随者的平均年龄。
【例4-26】计算追随者平均年龄的代码示例。
(2)Map Reduce三元组过度指南
在之前版本的GraphX中,利用mapReduceTriplets操作完成相邻聚合,代码如下。
(2)Map Reduce三元组过度指南
在之前版本的GraphX中,利用mapReduceTriplets操作完成相邻聚合,代码如下。
mapReduceTriplets操作在每个三元组上应用用户定义的map函数,然后保存用户定义的reduce函数聚合的消息。然而,发现通过用户返回的迭代器有较大不足,它抑制了添加额外优化(例如本地顶点的重新编号)的能力。aggregateMessages⇒Unit,(A,A)⇒A,TripletFields) (ClassTag[A]):VertexRDD[A])暴露三元组字段和函数显示的发送消息到源和目的顶点。并且,删除了字节码检测转而需要用户指明三元组的哪些字段实际需要。
下面将介绍如何在代码中应用mapReduceTriplets和aggregateMessages。
代码用到了mapReduceTriplets:
mapReduceTriplets操作在每个三元组上应用用户定义的map函数,然后保存用户定义的reduce函数聚合的消息。然而,发现通过用户返回的迭代器有较大不足,它抑制了添加额外优化(例如本地顶点的重新编号)的能力。aggregateMessages⇒Unit,(A,A)⇒A,TripletFields) (ClassTag[A]):VertexRDD[A])暴露三元组字段和函数显示的发送消息到源和目的顶点。并且,删除了字节码检测转而需要用户指明三元组的哪些字段实际需要。
下面将介绍如何在代码中应用mapReduceTriplets和aggregateMessages。
代码用到了mapReduceTriplets:
代码用到了aggregateMessages:
代码用到了aggregateMessages:
(3)计算度信息
一般的聚合任务就是计算顶点的度,即每个顶点相邻边的数量。在有向图中,经常需要知道顶点的入度、出度以及总度。GraphOps类包含一个操作集合用来计算每个顶点的度。例如,【例4-27】为计算最大的入度、出度和总度的代码示例。
【例4-27】计算度信息的代码示例。
(3)计算度信息
一般的聚合任务就是计算顶点的度,即每个顶点相邻边的数量。在有向图中,经常需要知道顶点的入度、出度以及总度。GraphOps类包含一个操作集合用来计算每个顶点的度。例如,【例4-27】为计算最大的入度、出度和总度的代码示例。
【例4-27】计算度信息的代码示例。
(4)近邻收集
在某些情况下,通过收集每个顶点相邻的顶点及它们的属性来表达计算可能更容易。这可以通过collectNeighborIds:VertexRDD[Array[VertexId]])和collectNeighbors:VertexRDD [Array[(VertexId,VD)]])操作来简单地完成。代码如下。
(4)近邻收集
在某些情况下,通过收集每个顶点相邻的顶点及它们的属性来表达计算可能更容易。这可以通过collectNeighborIds:VertexRDD[Array[VertexId]])和collectNeighbors:VertexRDD [Array[(VertexId,VD)]])操作来简单地完成。代码如下。
这些操作的代价是相当昂贵的,因为操作过程中需要大量而重复的通信。如果可能,尽量用aggregateMessages操作直接表达相同的计算。
5.缓存
在Spark中,RDD默认是不缓存的。为了避免重复计算,当需要多次利用它们时,则必须显示地缓存它们,GraphX中的图也为相同的方式。当多次利用到图时,应确保首先访问Graph.cache()方法。
在迭代计算中,为了获得最佳的性能,不建议进行缓存。默认情况下,缓存的RDD和图会一直保留在内存中,直到因为内存压力迫使它们以LRU的顺序删除。对于迭代计算,先前的迭代的中间结果将填充到缓存中。虽然它们最终会被删除,但是保存在内存中的不需要的数据将会减慢垃圾回收。只有中间结果不需要,不缓存它们更高效。这涉及在每次迭代中物化(“物化”表示把RDD存磁盘中)一个图或者RDD而不缓存所有其他的数据集。在将来的迭代中仅用物化的数据集。然而,因为图是由多个RDD组成的,要正确的不缓存它们比较困难,故对于迭代计算,建议使用Pregel API,它可以正确的不持久化中间结果。
这些操作的代价是相当昂贵的,因为操作过程中需要大量而重复的通信。如果可能,尽量用aggregateMessages操作直接表达相同的计算。
5.缓存
在Spark中,RDD默认是不缓存的。为了避免重复计算,当需要多次利用它们时,则必须显示地缓存它们,GraphX中的图也为相同的方式。当多次利用到图时,应确保首先访问Graph.cache()方法。
在迭代计算中,为了获得最佳的性能,不建议进行缓存。默认情况下,缓存的RDD和图会一直保留在内存中,直到因为内存压力迫使它们以LRU的顺序删除。对于迭代计算,先前的迭代的中间结果将填充到缓存中。虽然它们最终会被删除,但是保存在内存中的不需要的数据将会减慢垃圾回收。只有中间结果不需要,不缓存它们更高效。这涉及在每次迭代中物化(“物化”表示把RDD存磁盘中)一个图或者RDD而不缓存所有其他的数据集。在将来的迭代中仅用物化的数据集。然而,因为图是由多个RDD组成的,要正确的不缓存它们比较困难,故对于迭代计算,建议使用Pregel API,它可以正确的不持久化中间结果。
免责声明:以上内容源自网络,版权归原作者所有,如有侵犯您的原创版权请告知,我们将尽快删除相关内容。