-
Notifications
You must be signed in to change notification settings - Fork 28.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-5062][Graphx] replace mapReduceTriplets with aggregateMessage in Pregel Api #3883
Conversation
test this please |
Test build #25028 has finished for PR 3883 at commit
|
ok to test |
Test build #25033 has finished for PR 3883 at commit
|
Test build #25041 has finished for PR 3883 at commit
|
cc @ankurdave can you take a look at this? Thanks. |
@shijinkui Thanks for the PR. Starting from 1.2 we're avoiding any breaking changes, including this one since it modifies the signature of the Pregel API. Instead it might be better to introduce a new version of the Pregel API and keep the old version (GraphOps.pregel and Pregel.run) unmodified. Naming the new version might be tricky -- the best I can think of is GraphOps.pregel2 and Pregel.run2. It would also be good to introduce any other modifications to the Pregel API (#1217, #3866) in the same PR. |
…s. Replace mapReduceTriplet with aggregateMessages
Test build #25129 has finished for PR 3883 at commit
|
Test build #25131 has finished for PR 3883 at commit
|
hi, @ankurdave , I remembered that the PR should be as soon as smaller for easy testing and reviewing. #3866 is more complex,let it independent maybe better. How do u think about? |
Test build #25132 has finished for PR 3883 at commit
|
cc @andrewor14 |
Can one of the admins verify this patch? |
val newVerts = g.vertices.innerJoin(messages)(vprog).cache() | ||
// Update the graph with the new vertices. | ||
prevG = g | ||
g = g.outerJoinVertices(newVerts) { (vid, old, newOpt) => newOpt.getOrElse(old) } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It seems that newVerts
is no longer needed as a separate variable because aggregateMessages
does not need a working set as mapReduceTriplets
does. If newVerts
is removed from the lines 215 and 235 then I think that the following should work
g = g.outerJoinVertices(messages) { (vid, old, mess) => mess match {
case Some(mess) => vprog(vid, old, mess)
case None => old }
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just realized that it would not work, because the active set of vertices is needed that will emit the messages during the aggregation stage. It seems that you need to use aggregateMessagesWithActiveSet
instead of aggregateMessages
.
Do you plan to write tests? |
I don't think this PR is going to proceed; do you mind closing this PR? |
since spark 1.2 introduce aggregateMessage instead of mapReduceTriplets, it improve the performance indeed.
it's time to replace mapReduceTriplets with aggregateMessage in Pregel.
i provide a deprecated method thinking about compatibility
i have draw a graph of aggregateMessage to show why it can improve the performance.
dfgdfgd