[SPARK-9021][PySpark] Change RDD.aggregate() to do reduce(mapPartitions()) instead of mapPartitions.fold() #7378
Conversation
Made sure RDD.aggregate() makes a deepcopy of zeroValue for all partitions; this ensures that the mapPartitions call works with unique copies of zeroValue in each partition, and prevents a single reference to zeroValue being used for both map and fold calls on each partition (resulting in possibly unexpected behavior).
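A minimal sketch of the deepcopy-per-partition idea this commit message describes, written as a free function over an RDD rather than the actual `RDD.aggregate()` method (names and structure are illustrative only; later in this thread the PR moves away from deepcopy):

```
import copy

def aggregate_with_deepcopy(rdd, zeroValue, seqOp, combOp):
    # Sketch only: each partition folds into its own deep copy of zeroValue,
    # so seqOp cannot mutate an object that is later reused by fold().
    def func(iterator):
        acc = copy.deepcopy(zeroValue)
        for obj in iterator:
            acc = seqOp(acc, obj)
        yield acc
    # fold() also starts from a fresh copy, so the per-partition results are not
    # combined into an object the map step may already have mutated.
    return rdd.mapPartitions(func).fold(copy.deepcopy(zeroValue), combOp)
```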
Hi @njhwang, thanks for submitting this pull request. Do you mind filing a JIRA ticket at https://issues.apache.org/jira/browse/SPARK to report this issue? Check out https://cwiki.apache.org/confluence/display/SPARK/Contributing+to+Spark for an overview of how the Spark contribution process works. I agree that we should try to guard against mutation of the original zero value. I'd also look into whether we should be using a copy on line 904 in … It would be good to add a regression test for this. Take a look at …
JIRA ticket here: https://issues.apache.org/jira/browse/SPARK-9021. Will look into regression tests and other contribution requirements.
I'll note that I did consider adding more copies on line 904 as you mention, as well as within …
Ah, cool. I suspected that you might have looked into the case on line 904, but just wanted to confirm. Wouldn't hurt to add a line comment there to explain this :)
Updated comments regarding protection of zeroValue from mutation in RDD.aggregate(). Added regression tests for aggregate(), fold(), aggregateByKey(), foldByKey(), and treeAggregate(), all with both 1 and 2 partition RDDs. Confirmed that aggregate() is the only problematic implementation as of commit 257236c.
Updated comments regarding protection of zeroValue from mutation in RDD.aggregate(). Added regression tests for aggregate(), fold(), aggregateByKey(), foldByKey(), and treeAggregate(), all with both 1 and 2 partition RDDs. Confirmed that aggregate() is the only problematic implementation as of commit 257236c. Also replaced some parallelizations of ranges with xranges, per the documentation's recommendation of preferring xrange over range.
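A self-contained sketch of the kind of regression check described here, using a mutable zero value with both 1- and 2-partition RDDs (hypothetical names and a standalone script; the tests added by this PR live in PySpark's own test suite and use its harness):

```
from collections import defaultdict
from pyspark import SparkContext

def seqOp(acc, value):
    acc[value] += 1          # mutates the per-partition accumulator in place
    return acc

def combOp(acc1, acc2):
    for k, v in acc2.items():
        acc1[k] += v
    return acc1

sc = SparkContext("local[2]", "aggregate-mutable-zero-check")
expected = dict((i, 1) for i in range(1, 10))
for num_partitions in (1, 2):
    result = sc.parallelize(range(1, 10), num_partitions).aggregate(
        defaultdict(int), seqOp, combOp)
    assert dict(result) == expected, result   # fails on the unpatched aggregate()
sc.stop()
```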
Okay, think that does it; let me know if the tests seem sufficient (or overkill). Should mention I took the liberty of replacing a few calls to `range` with `xrange` in the tests, per the documentation's recommendation.
Jenkins, this is ok to test
Test build #37336 has finished for PR 7378 at commit …
Did not realize `collections.Counter` isn't available in Python 2.6.
Removed use of collections.Counter from RDD tests for Python 2.6 compatibility; used defaultdict(int) instead. Merged treeAggregate test with mutable zero value into aggregate test to reduce code duplication.
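For reference, the Counter-to-defaultdict substitution mentioned here boils down to the following (plain Python, no Spark needed): `collections.Counter` was only added in Python 2.7, while `defaultdict(int)` gives the same increment-by-key behavior on 2.6.

```
from collections import defaultdict

counts = defaultdict(int)     # works on Python 2.6, unlike collections.Counter
for value in [1, 2, 2, 3]:
    counts[value] += 1        # missing keys default to 0, mirroring Counter
print(dict(counts))           # {1: 1, 2: 2, 3: 1}
```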
Test build #37359 has finished for PR 7378 at commit …
Test build #37364 has finished for PR 7378 at commit …
Changed dict construction to be compatible with Python 2.6 (cannot use list comprehensions to make dicts)
Test build #37369 has finished for PR 7378 at commit …
Hooray. Sorry for the churn, guess I should have a Python 2.6 environment to do backwards compatibility tests :).
I'm actually going to kick off another build now since we just merged a change to the Python linter and I want to avoid a commit race that leads to a build break.
Test build #1069 has finished for PR 7378 at commit …
Jenkins, retest this please.
Test build #37373 has finished for PR 7378 at commit …
@davies, this would be another good PR for you to review, esp. now that it's passing tests.
@njhwang @JoshRosen The reason we need to do deepcopy here is that the two mapPartitions() will be combined into a single task. When the closure is serialized by pickle, the zeroValue will be memoized by the Pickler and deserialized as a single object for the two mapPartitions(). It's weird to call fold() inside aggregate(), because it does not need to run …
Then we don't need to do the deep copy (deep copy may fail in some cases, for example, objects from C extensions).
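A small plain-`pickle` illustration of the memoization behavior described above (PySpark actually serializes closures with cloudpickle, but the sharing effect is the same): if one serialized payload contains two references to the same zero value, deserialization yields a single shared object, so a mutation made through one reference is visible through the other.

```
import pickle

zero = []                     # stand-in for a mutable zeroValue
payload = (zero, zero)        # e.g. two pipelined stages referencing the same object
a, b = pickle.loads(pickle.dumps(payload))

assert a is b                 # pickle memoized zero: one object after the round trip
a.append("mutated in the map step")
print(b)                      # ['mutated in the map step'] -- the "fold step" sees it
```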
@davies That makes sense. I just confirmed this works (and it's exactly what you do in `fold()`).
Fixed RDD.aggregate() to perform a reduce operation on collected mapPartitions results, similar to how fold is currently implemented. This prevents an initial combOp being performed on each partition with zeroValue (which leads to unexpected behavior if zeroValue is a mutable object) before being combOp'ed with other partition results.
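A rough sketch of the shape this commit describes, again as a free function rather than the merged `RDD.aggregate()` itself: fold each partition into the zero value its task deserialized, collect one partial result per partition, and reduce them on the driver with `combOp`, so no extra `combOp` against `zeroValue` happens inside each partition.

```
from functools import reduce

def aggregate_via_reduce(rdd, zeroValue, seqOp, combOp):
    def func(iterator):
        acc = zeroValue       # each task deserializes its own copy of the closure
        for obj in iterator:
            acc = seqOp(acc, obj)
        yield acc
    # Combine the per-partition partial results on the driver, starting from the
    # caller's zeroValue -- mirroring how fold() is implemented.
    return reduce(combOp, rdd.mapPartitions(func).collect(), zeroValue)
```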
Test build #37424 has finished for PR 7378 at commit …
LGTM, could you update the title and description?
Done!
Thanks, merging into master and branch 1.4!
[SPARK-9021] [PySpark] Change RDD.aggregate() to do reduce(mapPartitions()) instead of mapPartitions.fold()

I'm relatively new to Spark and functional programming, so forgive me if this pull request is just a result of my misunderstanding of how Spark should be used.

Currently, if one happens to use a mutable object as `zeroValue` for `RDD.aggregate()`, possibly unexpected behavior can occur. This is because pyspark's current implementation of `RDD.aggregate()` does not serialize or make a copy of `zeroValue` before handing it off to `RDD.mapPartitions(...).fold(...)`. This results in a single reference to `zeroValue` being used for both `RDD.mapPartitions()` and `RDD.fold()` on each partition, which can feed strange accumulator values into each partition's call to `RDD.fold()`, as `zeroValue` may have been changed in place during the `RDD.mapPartitions()` call.

As an illustrative example, submit the following to `spark-submit`:

```
from pyspark import SparkConf, SparkContext
import collections

def updateCounter(acc, val):
    print 'update acc:', acc
    print 'update val:', val
    acc[val] += 1
    return acc

def comboCounter(acc1, acc2):
    print 'combo acc1:', acc1
    print 'combo acc2:', acc2
    acc1.update(acc2)
    return acc1

def main():
    conf = SparkConf().setMaster("local").setAppName("Aggregate with Counter")
    sc = SparkContext(conf=conf)

    print '======= AGGREGATING with ONE PARTITION ======='
    print sc.parallelize(range(1, 10), 1).aggregate(collections.Counter(), updateCounter, comboCounter)

    print '======= AGGREGATING with TWO PARTITIONS ======='
    print sc.parallelize(range(1, 10), 2).aggregate(collections.Counter(), updateCounter, comboCounter)

if __name__ == "__main__":
    main()
```

One probably expects this to output the following:

```
Counter({1: 1, 2: 1, 3: 1, 4: 1, 5: 1, 6: 1, 7: 1, 8: 1, 9: 1})
```

But it instead outputs this (regardless of the number of partitions):

```
Counter({1: 2, 2: 2, 3: 2, 4: 2, 5: 2, 6: 2, 7: 2, 8: 2, 9: 2})
```

This is because (I believe) `zeroValue` gets passed correctly to each partition, but after `RDD.mapPartitions()` completes, the `zeroValue` object has been updated and is then passed to `RDD.fold()`, which results in all items being double-counted within each partition before being finally reduced at the calling node.

I realize that this type of calculation is typically done by `RDD.mapPartitions(...).reduceByKey(...)`, but hopefully this illustrates some potentially confusing behavior. I also noticed that other `RDD` methods use this `deepcopy` approach to creating unique copies of `zeroValue` (i.e., `RDD.aggregateByKey()` and `RDD.foldByKey()`), and that the Scala implementations do seem to serialize the `zeroValue` object appropriately to prevent this type of behavior.

Author: Nicholas Hwang <[email protected]>

Closes #7378 from njhwang/master and squashes the following commits:

659bb27 [Nicholas Hwang] Fixed RDD.aggregate() to perform a reduce operation on collected mapPartitions results, similar to how fold is currently implemented. This prevents an initial combOp being performed on each partition with zeroValue (which leads to unexpected behavior if zeroValue is a mutable object) before being combOp'ed with other partition results.
8d8d694 [Nicholas Hwang] Changed dict construction to be compatible with Python 2.6 (cannot use list comprehensions to make dicts)
56eb2ab [Nicholas Hwang] Fixed whitespace after colon to conform with PEP8
391de4a [Nicholas Hwang] Removed use of collections.Counter from RDD tests for Python 2.6 compatibility; used defaultdict(int) instead. Merged treeAggregate test with mutable zero value into aggregate test to reduce code duplication.
2fa4e4b [Nicholas Hwang] Merge branch 'master' of https://github.com/njhwang/spark
ba528bd [Nicholas Hwang] Updated comments regarding protection of zeroValue from mutation in RDD.aggregate(). Added regression tests for aggregate(), fold(), aggregateByKey(), foldByKey(), and treeAggregate(), all with both 1 and 2 partition RDDs. Confirmed that aggregate() is the only problematic implementation as of commit 257236c. Also replaced some parallelizations of ranges with xranges, per the documentation's recommendation of preferring xrange over range.
7820391 [Nicholas Hwang] Updated comments regarding protection of zeroValue from mutation in RDD.aggregate(). Added regression tests for aggregate(), fold(), aggregateByKey(), foldByKey(), and treeAggregate(), all with both 1 and 2 partition RDDs. Confirmed that aggregate() is the only problematic implementation as of commit 257236c.
90d1544 [Nicholas Hwang] Made sure RDD.aggregate() makes a deepcopy of zeroValue for all partitions; this ensures that the mapPartitions call works with unique copies of zeroValue in each partition, and prevents a single reference to zeroValue being used for both map and fold calls on each partition (resulting in possibly unexpected behavior).

(cherry picked from commit a803ac3)

Signed-off-by: Davies Liu <[email protected]>