-
Notifications
You must be signed in to change notification settings - Fork 28.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Introducing an Improved Pregel API #1217
Conversation
@ankurdave unfortunately to full accept this change we will need to break compatibility with the current Pregel API. I cannot seem to overload the apply method. |
I spent some time verifying the math behind the PageRank (in particular starting values) to ensure that the delta formulation behaves identically to the static formulation which matches other reference implementations of PageRank. One of the key changes is I have added an extra normalization step at the end of the calculation to address a discrepancy in how we handle dangling vertices. |
|
||
// Unpersist the RDDs hidden by newly-materialized RDDs | ||
// prevG.unpersistVertices(blocking=false) | ||
// prevG.edges.unpersist(blocking=false) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Uncommenting lines 279 and 280 leads to a substantial slow down in later iterations indicating that there is still an issue with unpersist
. @ankurdave any thoughts?
@ankurdave and @rxin there is an issue with the current API. The def sendMessageWrapper(triplet: EdgeTriplet[(VD, Boolean),ED]): Iterator[(VertexId, A)] = {
val simpleTriplet = new EdgeTriplet[VD, ED]()
simpleTriplet.set(triplet)
simpleTriplet.srcAttr = triplet.srcAttr._1
simpleTriplet.dstAttr = triplet.dstAttr._1
val ctx = new EdgeContext(i, triplet.srcAttr._2, triplet.dstAttr._2)
sendMsg(simpleTriplet, ctx)
}
// Compute the messages for all the active vertices
val messages = g.mapReduceTriplets(sendMessageWrapper, mergeMsg, Some((activeVertices, activeDirection))) thereby allowing the user a simple sendMsg: (EdgeTriplet[VD, ED], EdgeContext) => Iterator[(VertexId, A)] However because we access the source and destination vertex attributes the byte code inspection will force a full 3-way join even if the user doesn't actually read the fields. The simplest solution would be to change the send message interface to operate on the extended vertex attribute (containing the active field). sendMsg: (EdgeTriplet[(VD, Boolean), ED], EdgeContext) => Iterator[(VertexId, A)] |
* vertex properties in terms of the properties of neighboring vertices. These | ||
* recursive properties are then computed through iterative fixed-point | ||
* computations. For example, the PageRank of a web-page can be defined as a | ||
* weighted some of the PageRank of web-pages that link to that page and is |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
"weighted sum" :)
QA tests have started for PR 1217 at commit
|
QA tests have finished for PR 1217 at commit
|
Can one of the admins verify this patch? |
…roperty. This addresses the issue where byte-code inspection would always force a 3-way join.
2116e8a
to
e1d8033
Compare
This is still work in progress and we need to discuss these API changes. |
@ankurdave is this already covered in your latest PR? |
@ankurdave should I try and update this with your latest changes or do you want to create a new one? |
@jegonzal I'm going to look at this over the weekend to try to get it into 1.2. |
@jegonzal @ankurdave - This PR has gone stale. Do we want to update it or close it for later? |
Let's close this issue for later. |
(For Github close script:) Do you mind closing this PR? |
The initial Pregel API coupled voting to halt with message reception. In this revised the vertex program receives a
PregelContext
which enables the user to signal whether or not to halt as well as access the current iteration number.