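For background on GraphX and Pregel, see this article ( http://www.jianshu.com/p/6f8704bceca9 ); it is not repeated here. This post walks through the Pregel source code in GraphX, using Spark 2.1.1 (the latest release at the time of writing). The Pregel source on GitHub is here (https://github.com/apache/spark/blob/v2.1.1/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala#L2)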
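For orientation, here is a minimal sketch of what a call to this API can look like: single-source shortest paths on a toy graph with three edges. The object name PregelSsspSketch, the helper shortestPaths, and the toy edges are illustrative assumptions, not part of Spark or of the listing. The sketch assumes Spark with GraphX on the classpath and a SparkContext supplied by the caller.

```scala
import org.apache.spark.SparkContext
import org.apache.spark.graphx.{Edge, Graph, Pregel, VertexId}

// Hypothetical wrapper object; only Graph, Edge and Pregel come from GraphX.
object PregelSsspSketch {
  def shortestPaths(sc: SparkContext, sourceId: VertexId): Map[VertexId, Double] = {
    // Toy edges (assumed data): 1 -> 2 (cost 1), 2 -> 3 (cost 2), 1 -> 3 (cost 5)
    val edges = sc.parallelize(Seq(
      Edge(1L, 2L, 1.0), Edge(2L, 3L, 2.0), Edge(1L, 3L, 5.0)))

    // Vertex attribute = best known distance from sourceId
    val graph: Graph[Double, Double] = Graph.fromEdges(edges, defaultValue = 0.0)
      .mapVertices((id, _) => if (id == sourceId) 0.0 else Double.PositiveInfinity)

    val result = Pregel(graph, Double.PositiveInfinity)(
      // vprog: keep the smaller of the current distance and the incoming one
      (id, dist, newDist) => math.min(dist, newDist),
      // sendMsg: message the destination only if this edge offers a shorter path
      triplet =>
        if (triplet.srcAttr + triplet.attr < triplet.dstAttr)
          Iterator((triplet.dstId, triplet.srcAttr + triplet.attr))
        else Iterator.empty,
      // mergeMsg: commutative and associative, and the result does not grow
      (a, b) => math.min(a, b))

    result.vertices.collect().toMap
  }
}
```

With sourceId = 1 on this toy graph, vertex 3 ends at distance 3.0 (via vertex 2) rather than 5.0 (the direct edge). maxIterations and activeDirection are left at their defaults here.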
package org.apache.spark.graphx
import scala.reflect.ClassTag
import org.apache.spark.internal.Logging
/**
* Implements a Pregel-like bulk-synchronous message-passing API.
* -- In short: a Pregel-style, bulk-synchronous (superstep-based) message-passing API
*
* Unlike the original Pregel API, the GraphX Pregel API factors the sendMessage computation over
* edges, enables the message sending computation to read both vertex attributes, and constrains
* messages to the graph structure. These changes allow for substantially more efficient
* distributed execution while also exposing greater flexibility for graph-based computation.
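* ---- Unlike the original Pregel API, GraphX decomposes the sendMessage computation over
* edges, lets it read the attributes of both endpoint vertices, and restricts messages
* to the graph structure (messages travel only along edges). These changes make distributed
* execution substantially more efficient and give graph-based computation more flexibility.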
*
* @example We can use the Pregel abstraction to implement PageRank:
* ---- An example: implementing PageRank with the Pregel abstraction
* {{{
* val pagerankGraph: Graph[Double, Double] = graph
* // Associate the degree with each vertex ---- join each vertex with its out-degree
* .outerJoinVertices(graph.outDegrees) {
* (vid, vdata, deg) => deg.getOrElse(0)
* }
* // Set the weight on the edges based on the degree --- edge weight = 1 / out-degree of the source vertex
* .mapTriplets(e => 1.0 / e.srcAttr)
* // Set the vertex attributes to the initial pagerank values --- every vertex starts at 1.0
* .mapVertices((id, attr) => 1.0)
*
* def vertexProgram(id: VertexId, attr: Double, msgSum: Double): Double =
* resetProb + (1.0 - resetProb) * msgSum
* def sendMessage(id: VertexId, edge: EdgeTriplet[Double, Double]): Iterator[(VertexId, Double)] =
* Iterator((edge.dstId, edge.srcAttr * edge.attr))
* def messageCombiner(a: Double, b: Double): Double = a + b
* val initialMessage = 0.0
* // Execute Pregel for a fixed number of iterations. --- run Pregel for numIter iterations
* Pregel(pagerankGraph, initialMessage, numIter)(
* vertexProgram, sendMessage, messageCombiner)
* }}}
*
*/
object Pregel extends Logging {
/**
* Execute a Pregel-like iterative vertex-parallel abstraction. The
* user-defined vertex-program `vprog` is executed in parallel on
* each vertex receiving any inbound messages and computing a new
* value for the vertex. The `sendMsg` function is then invoked on
* all out-edges and is used to compute an optional message to the
* destination vertex. The `mergeMsg` function is a commutative
* associative function used to combine messages destined to the
* same vertex.
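* --- Three user functions are involved:
* vprog, the user-defined vertex program, runs in parallel on every vertex that
* received inbound messages and computes the vertex's new value;
* sendMsg then runs on the out-edges and computes an optional message to the destination vertex;
* mergeMsg is a commutative, associative function that combines [messages destined for the same vertex].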
*
* On the first iteration all vertices receive the `initialMsg` and
* on subsequent iterations if a vertex does not receive a message
* then the vertex-program is not invoked.
* -- On the first iteration every vertex receives initialMsg; on later iterations vprog
* is not invoked on a vertex that received no message.
*
* This function iterates until there are no remaining messages, or
* for `maxIterations` iterations. --- the loop runs until no messages remain or the configured maximum number of iterations is reached
*
* @tparam VD the vertex data type --- type of the vertex attribute
* @tparam ED the edge data type --- type of the edge attribute
* @tparam A the Pregel message type --- type of the messages
*
* @param graph the input graph.
*
* @param initialMsg the message each vertex will receive at the first
* iteration -- the initial message for the first round
*
* @param maxIterations the maximum number of iterations to run for -- upper bound on the number of iterations
*
* @param activeDirection the direction of edges incident to a vertex that received a message in
* the previous round on which to run `sendMsg`. For example, if this is `EdgeDirection.Out`, only
* out-edges of vertices that received a message in the previous round will run. The default is
* `EdgeDirection.Either`, which will run `sendMsg` on edges where either side received a message
* in the previous round. If this is `EdgeDirection.Both`, `sendMsg` will only run on edges where
* *both* vertices received a message.
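* --- For vertices that received a message in the previous round, this selects which of their
* incident edges run sendMsg: Out = their out-edges, In = their in-edges, Either (the default) =
* edges where at least one endpoint received a message, Both = only edges where both endpoints did.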
*
* @param vprog the user-defined vertex program which runs on each
* vertex and receives the inbound message and computes a new vertex
* value. On the first iteration the vertex program is invoked on
* all vertices and is passed the default message. On subsequent
* iterations the vertex program is only invoked on those vertices
* that receive messages.
*
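* --- The user-defined vertex program: it runs on each vertex, receives the inbound
* message, and computes the new vertex value. On the first iteration it is invoked on
* every vertex and is passed the default message (i.e. initialMsg).
* On later iterations it is invoked only on vertices that received a message.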
* @param sendMsg a user supplied function that is applied to out
* edges of vertices that received messages in the current
* iteration
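* --- A user-supplied function applied to the out-edges of vertices that received a message
* in the current iteration (which edges actually run is governed by activeDirection).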
*
* @param mergeMsg a user supplied function that takes two incoming
* messages of type A and merges them into a single message of type
* A. ''This function must be commutative and associative and
* ideally the size of A should not increase.''
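* --- A user-supplied function that takes two incoming messages of type A and merges them
* into a single message of type A. It must be commutative and associative, and ideally the
* size of A should not increase. My reading: the merged message should be no larger than
* its inputs (for example a sum or a min rather than an ever-growing list).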
*
* @return the resulting graph at the end of the computation --- the graph after the final iteration
*
*/
def apply[VD: ClassTag, ED: ClassTag, A: ClassTag]
(graph: Graph[VD, ED],
initialMsg: A,
maxIterations: Int = Int.MaxValue,
activeDirection: EdgeDirection = EdgeDirection.Either)
(vprog: (VertexId, VD, A) => VD,
sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],
mergeMsg: (A, A) => A)
: Graph[VD, ED] =
{
require(maxIterations > 0, s"Maximum number of iterations must be greater than 0," +
s" but got ${maxIterations}")
// require() validates the arguments; if the check fails it throws IllegalArgumentException
var g = graph.mapVertices((vid, vdata) => vprog(vid, vdata, initialMsg)).cache()
// compute the messages
var messages = GraphXUtils.mapReduceTriplets(g, sendMsg, mergeMsg)
var activeMessages = messages.count()
// Loop
var prevG: Graph[VD, ED] = null
var i = 0
while (activeMessages > 0 && i < maxIterations) {
// Receive the messages and update the vertices (vprog runs only on vertices that have a message).
prevG = g
g = g.joinVertices(messages)(vprog).cache()
val oldMessages = messages
// Send new messages, skipping edges where neither side received a message. We must cache
// messages so it can be materialized on the next line, allowing us to uncache the previous
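// iteration. -- Send new messages, skipping edges where neither endpoint received one;
// messages is cached so that the count() below materializes it and the previous iteration can be released.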
messages = GraphXUtils.mapReduceTriplets(
g, sendMsg, mergeMsg, Some((oldMessages, activeDirection))).cache()
// The call to count() materializes `messages` and the vertices of `g`. This hides oldMessages
// (depended on by the vertices of g) and the vertices of prevG (depended on by oldMessages
// and the vertices of g).
activeMessages = messages.count()
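// -- count() materializes (actually computes) messages and the vertices of g, which hides
// the previous round's messages and vertices so they can be unpersisted.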
logInfo("Pregel finished iteration " + i)
// Unpersist the RDDs hidden by newly-materialized RDDs -- unpersist the previous round's messages and graph
oldMessages.unpersist(blocking = false)
prevG.unpersistVertices(blocking = false)
prevG.edges.unpersist(blocking = false)
// count the iteration
i += 1
}
messages.unpersist(blocking = false)
g
} // end of apply
} // end of class Pregel
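
To make the control flow of apply easier to follow, the loop above can be mimicked on plain Scala collections. This is only a sketch under stated assumptions, not how GraphX executes: there are no RDDs, no caching or unpersisting, and only the EdgeDirection.Either behaviour is modelled. The names LocalPregelSketch, Triplet, run, gatherMessages and demo are hypothetical, and every edge endpoint is assumed to be present in the vertex map.

```scala
// Hypothetical, Spark-free imitation of the loop in Pregel.apply (illustration only).
object LocalPregelSketch {
  type VertexId = Long
  final case class Edge[ED](srcId: VertexId, dstId: VertexId, attr: ED)
  final case class Triplet[VD, ED](
      srcId: VertexId, srcAttr: VD, dstId: VertexId, dstAttr: VD, attr: ED)

  def run[VD, ED, A](
      vertices: Map[VertexId, VD],
      edges: Seq[Edge[ED]],
      initialMsg: A,
      maxIterations: Int = Int.MaxValue)(
      vprog: (VertexId, VD, A) => VD,
      sendMsg: Triplet[VD, ED] => Iterator[(VertexId, A)],
      mergeMsg: (A, A) => A): Map[VertexId, VD] = {
    require(maxIterations > 0, s"Maximum number of iterations must be greater than 0, but got $maxIterations")

    // Stand-in for mapReduceTriplets: run sendMsg on every edge that has at least one
    // active endpoint (all edges when active is None), then merge messages per destination.
    def gatherMessages(state: Map[VertexId, VD], active: Option[Set[VertexId]]): Map[VertexId, A] =
      edges.iterator
        .filter(e => active.forall(a => a(e.srcId) || a(e.dstId)))
        .flatMap(e => sendMsg(Triplet(e.srcId, state(e.srcId), e.dstId, state(e.dstId), e.attr)))
        .foldLeft(Map.empty[VertexId, A]) { case (acc, (id, m)) =>
          acc.updated(id, acc.get(id).map(mergeMsg(_, m)).getOrElse(m))
        }

    // First round: every vertex receives initialMsg.
    var g = vertices.map { case (id, attr) => id -> vprog(id, attr, initialMsg) }
    var messages = gatherMessages(g, None)
    var i = 0
    while (messages.nonEmpty && i < maxIterations) {
      // Stand-in for joinVertices: vprog runs only where a message arrived.
      g = g.map { case (id, attr) => id -> messages.get(id).map(vprog(id, attr, _)).getOrElse(attr) }
      // Only edges touching a vertex that just received a message may send again.
      messages = gatherMessages(g, Some(messages.keySet))
      i += 1
    }
    g
  }

  // Toy run (assumed data): shortest distances from vertex 1.
  def demo(): Map[VertexId, Double] = {
    val edges = Seq(Edge(1L, 2L, 1.0), Edge(2L, 3L, 2.0), Edge(1L, 3L, 5.0))
    val init = Map(1L -> 0.0, 2L -> Double.PositiveInfinity, 3L -> Double.PositiveInfinity)
    run(init, edges, Double.PositiveInfinity)(
      (_, d, m) => math.min(d, m),
      t => if (t.srcAttr + t.attr < t.dstAttr) Iterator((t.dstId, t.srcAttr + t.attr)) else Iterator.empty,
      (a, b) => math.min(a, b))
  }
}
```

On the toy data, the loop body runs twice: the first pass delivers distances 1.0 and 5.0 to vertices 2 and 3, the second improves vertex 3 to 3.0, and the next message set is empty, which ends the loop just as `activeMessages > 0` does in the real code.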