
[SPARK-9021][PySpark] Change RDD.aggregate() to do reduce(mapPartitions()) instead of mapPartitions.fold() #7378

Closed
wants to merge 8 commits into master from njhwang:master

Conversation

shanghaiclown
Contributor

I'm relatively new to Spark and functional programming, so forgive me if this pull request is just a result of my misunderstanding of how Spark should be used.

Currently, if one happens to use a mutable object as `zeroValue` for `RDD.aggregate()`, unexpected behavior can occur.

This is because pyspark's current implementation of `RDD.aggregate()` does not serialize or make a copy of `zeroValue` before handing it off to `RDD.mapPartitions(...).fold(...)`. This results in a single reference to `zeroValue` being used for both `RDD.mapPartitions()` and `RDD.fold()` on each partition. This can result in strange accumulator values being fed into each partition's call to `RDD.fold()`, as the `zeroValue` may have been changed in place during the `RDD.mapPartitions()` call.

As an illustrative example, submit the following to `spark-submit`:

```
from pyspark import SparkConf, SparkContext
import collections

def updateCounter(acc, val):
    print 'update acc:', acc
    print 'update val:', val
    acc[val] += 1
    return acc

def comboCounter(acc1, acc2):
    print 'combo acc1:', acc1
    print 'combo acc2:', acc2
    acc1.update(acc2)
    return acc1

def main():
    conf = SparkConf().setMaster("local").setAppName("Aggregate with Counter")
    sc = SparkContext(conf = conf)

    print '======= AGGREGATING with ONE PARTITION ======='
    print sc.parallelize(range(1,10), 1).aggregate(collections.Counter(), updateCounter, comboCounter)

    print '======= AGGREGATING with TWO PARTITIONS ======='
    print sc.parallelize(range(1,10), 2).aggregate(collections.Counter(), updateCounter, comboCounter)

if __name__ == "__main__":
    main()
```

One probably expects this to output the following:

```
Counter({1: 1, 2: 1, 3: 1, 4: 1, 5: 1, 6: 1, 7: 1, 8: 1, 9: 1})
```

But it instead outputs this (regardless of the number of partitions):

```
Counter({1: 2, 2: 2, 3: 2, 4: 2, 5: 2, 6: 2, 7: 2, 8: 2, 9: 2})
```

This is because (I believe) `zeroValue` gets passed correctly to each partition, but after `RDD.mapPartitions()` completes, the `zeroValue` object has been updated and is then passed to `RDD.fold()`, which results in all items being double-counted within each partition before being finally reduced at the calling node.
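
To make the sharing concrete, here is a minimal standalone sketch (no Spark involved) of the mechanism I believe is at play: the same mutable object serves first as the `mapPartitions()` accumulator and then as the `fold()` zero value. The `seqOp`/`combOp` below mirror `updateCounter`/`comboCounter` from the example above.

```
from collections import Counter

def seqOp(acc, val):      # same shape as updateCounter above
    acc[val] += 1
    return acc

def combOp(acc1, acc2):   # same shape as comboCounter above
    acc1.update(acc2)
    return acc1

zeroValue = Counter()
partition = [1, 2, 3]

# aggregate()'s per-partition pass accumulates into what is really
# zeroValue itself, since seqOp mutates its argument in place.
mapped = zeroValue
for v in partition:
    mapped = seqOp(mapped, v)

# fold()'s per-partition pass then starts from the *same* mutated object
# and combines it with the mapPartitions result (the very same Counter),
# so every count is doubled.
folded = combOp(zeroValue, mapped)
print folded  # Counter({1: 2, 2: 2, 3: 2})
```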

I realize that this type of calculation is typically done by `RDD.mapPartitions(...).reduceByKey(...)`, but hopefully this illustrates some potentially confusing behavior. I also noticed that other `RDD` methods use this `deepcopy` approach to creating unique copies of `zeroValue` (i.e., `RDD.aggregateByKey()` and `RDD.foldByKey()`), and that the Scala implementations do seem to serialize the `zeroValue` object appropriately to prevent this type of behavior.
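
For comparison, the `deepcopy` approach mentioned above follows roughly this pattern (a paraphrase of the `aggregateByKey()`/`foldByKey()` style, not the exact source): a fresh copy of `zeroValue` is created whenever a new accumulator is needed, so in-place mutation can never leak between accumulators.

```
import copy

def aggregateByKey_like(rdd, zeroValue, seqFunc, combFunc, numPartitions=None):
    def createZero():
        # Every new accumulator starts from its own deep copy of zeroValue.
        return copy.deepcopy(zeroValue)
    return rdd.combineByKey(
        lambda v: seqFunc(createZero(), v), seqFunc, combFunc, numPartitions)
```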

…tions; this ensures that the mapPartitions call works with unique copies of zeroValue in each partition, and prevents a single reference to zeroValue being used for both map and fold calls on each partition (resulting in possibly unexpected behavior).
@JoshRosen
Contributor

Hi @njhwang,

Thanks for submitting this pull request. Do you mind filing a JIRA ticket at https://issues.apache.org/jira/browse/SPARK to report this issue?

Check out https://cwiki.apache.org/confluence/display/SPARK/Contributing+to+Spark for an overview of how the Spark contribution process works.

I agree that we should try to guard against mutation of the original zero value. I'd also look into whether we should be using a copy on line 904 in `return self.mapPartitions(func).fold(zeroValue, combOp)`; there might be other cases that need to be addressed, too.

It would be good to add a regression test for this. Take a look at python/pyspark/tests.py for examples of how to do this (once you've created the JIRA, I'd reference the JIRA ticket number in your test via a line comment).

@shanghaiclown
Contributor Author

JIRA ticket here: https://issues.apache.org/jira/browse/SPARK-9021

Will look into regression tests and other contribution requirements.

@shanghaiclown
Contributor Author

I'll note that I did consider adding more copies on line 904 as you mention, as well as within `RDD.fold()`. I don't believe another copy is needed, as `RDD.fold()` appears to properly guard against mutation (see the comment in `RDD.fold()`). Other places within the RDD module appeared fine as well, but I'll investigate more closely with the forthcoming regression tests.
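
For reference, the guard in question follows roughly this pattern (a paraphrase, not the exact PySpark source): each task deserializes its own copy of `zeroValue`, and the driver-side reduce starts from the driver's original, unmutated object.

```
from functools import reduce  # built in on Python 2; explicit import keeps this Python 3 friendly

def fold_like(rdd, zeroValue, op):
    # Each partition folds into an accumulator seeded from that task's own
    # deserialized copy of zeroValue.
    def func(iterator):
        acc = zeroValue
        for obj in iterator:
            acc = op(acc, obj)
        yield acc
    # Only the driver's original zeroValue seeds the final reduce over the
    # collected per-partition results.
    vals = rdd.mapPartitions(func).collect()
    return reduce(op, vals, zeroValue)
```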

@JoshRosen
Contributor

Ah, cool. I suspected that you might have looked into the case on line 904, but just wanted to confirm. Wouldn't hurt to add a line comment there to explain this :)

@shanghaiclown shanghaiclown changed the title Having pyspark's RDD.aggregate() make a deepcopy of zeroValue for each partition [SPARK-9021][PySpark] Having RDD.aggregate() make a deepcopy of zeroValue for each partition Jul 14, 2015
…DD.aggregate(). Added regression tests for aggregate(), fold(), aggregateByKey(), foldByKey(), and treeAggregate(), all with both 1 and 2 partition RDDs. Confirmed that aggregate() is the only problematic implementation as of commit 257236c.
…DD.aggregate(). Added regression tests for aggregate(), fold(), aggregateByKey(), foldByKey(), and treeAggregate(), all with both 1 and 2 partition RDDs. Confirmed that aggregate() is the only problematic implementation as of commit 257236c. Also replaced some parallelizations of ranges with xranges, per the documentation's recommendations of preferring xrange over range.
@shanghaiclown
Contributor Author

Okay, I think that does it; let me know if the tests seem sufficient (or overkill). I should mention I took the liberty of replacing a few calls to `range()` with `xrange()` in some other tests, per the documentation for `parallelize()`. I confirmed that `RDD.aggregate()` appears to be the only problematic handling of zero values, and that this PR fixes the issue.

@JoshRosen
Contributor

Jenkins this is ok to test

@SparkQA

SparkQA commented Jul 15, 2015

Test build #37336 has finished for PR 7378 at commit 2fa4e4b.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@shanghaiclown
Contributor Author

I didn't realize `collections.Counter()` wasn't introduced until Python 2.7, and that the tests run against Python 2.6... I'll change those tests to use a different data structure later today.
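
For illustration, a Python 2.6-friendly regression test along those lines might look roughly like this (a sketch only; the method name and the `self.sc` SparkContext fixture are assumptions about the test harness in python/pyspark/tests.py):

```
from collections import defaultdict

def test_aggregate_mutable_zero_value(self):
    # SPARK-9021: a mutable zeroValue must not be shared between the
    # per-partition aggregation and the final combine step.
    def seqOp(x, y):
        x[y] += 1
        return x

    def combOp(x, y):
        for k, v in y.items():
            x[k] += v
        return x

    for num_partitions in (1, 2):
        counts = self.sc.parallelize(range(10), num_partitions) \
                        .aggregate(defaultdict(int), seqOp, combOp)
        self.assertEqual(sorted(counts.items()), [(i, 1) for i in range(10)])
```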

…patibility; used defaultdict(int) instead. Merged treeAggregate test with mutable zero value into aggregate test to reduce code duplication.
@SparkQA

SparkQA commented Jul 15, 2015

Test build #37359 has finished for PR 7378 at commit 391de4a.

  • This patch fails Python style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jul 15, 2015

Test build #37364 has finished for PR 7378 at commit 56eb2ab.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jul 15, 2015

Test build #37369 has finished for PR 7378 at commit 8d8d694.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@shanghaiclown
Contributor Author

Hooray. Sorry for the churn, guess I should have a Python 2.6 environment to do backwards compatibility tests :).

@JoshRosen
Contributor

I'm actually going to kick off another build now since we just merged a change to the Python linter and I want to avoid a committing race that leads to a build break.

@SparkQA

SparkQA commented Jul 15, 2015

Test build #1069 has finished for PR 7378 at commit 8d8d694.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class Least(children: Expression*) extends Expression
    • case class Greatest(children: Expression*) extends Expression

@JoshRosen
Contributor

Jenkins, retest this please.

@SparkQA

SparkQA commented Jul 15, 2015

Test build #37373 has finished for PR 7378 at commit 8d8d694.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@JoshRosen
Contributor

@davies, this would be another good PR for you to review, esp. now that it's passing tests.

@davies
Contributor

davies commented Jul 15, 2015

@njhwang @JoshRosen The reason we need the deepcopy here is that the two `mapPartitions()` calls get combined into a single task. When the closure is serialized by pickle, `zeroValue` is memoized by the Pickler and deserialized as a single object shared by both `mapPartitions()` calls. It's also weird to call `fold()` inside `aggregate()`, because it does not need to run `combOp` on each partition; it should be changed to:

    return reduce(combOp, self.mapPartitions(func).collect(), zeroValue)

Then we don't need the deep copy (deepcopy may fail in some cases, for example on objects from C extensions).
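
Putting that suggestion together, `aggregate()` would end up looking roughly like this (a sketch of the approach, not the exact merged code):

```
from functools import reduce

def aggregate_like(rdd, zeroValue, seqOp, combOp):
    # Partially aggregate each partition with seqOp; each task works on its
    # own deserialized copy of zeroValue.
    def func(iterator):
        acc = zeroValue
        for obj in iterator:
            acc = seqOp(acc, obj)
        yield acc
    # Combine the per-partition results on the driver, starting from the
    # driver's own zeroValue, so no per-partition combOp touches zeroValue.
    vals = rdd.mapPartitions(func).collect()
    return reduce(combOp, vals, zeroValue)
```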

@shanghaiclown
Contributor Author

@davies That makes sense. I just confirmed this works (and it's exactly what you do in `RDD.fold()` anyway). I'll do another build/test sequence locally later today before pushing and having Mr. Jenkins put it through the wringer again.

…artitions results, similar to how fold currently is implemented. This prevents an initial combOp being performed on each partition with zeroValue (which leads to unexpected behavior if zeroValue is a mutable object) before being combOp'ed with other partition results.
@SparkQA

SparkQA commented Jul 15, 2015

Test build #37424 has finished for PR 7378 at commit 659bb27.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@davies
Contributor

davies commented Jul 19, 2015

LGTM, could you update the title and description?

@shanghaiclown shanghaiclown changed the title [SPARK-9021][PySpark] Having RDD.aggregate() make a deepcopy of zeroValue for each partition [SPARK-9021][PySpark] Change RDD.aggregate() to do reduce(mapPartitions()) instead of mapPartitions.fold() Jul 19, 2015
@shanghaiclown
Contributor Author

Done!

@davies
Contributor

davies commented Jul 19, 2015

Thanks, merging into master and branch 1.4!

@asfgit asfgit closed this in a803ac3 Jul 19, 2015
asfgit pushed a commit that referenced this pull request Jul 19, 2015
…ons()) instead of mapPartitions.fold()

Author: Nicholas Hwang <[email protected]>

Closes #7378 from njhwang/master and squashes the following commits:

659bb27 [Nicholas Hwang] Fixed RDD.aggregate() to perform a reduce operation on collected mapPartitions results, similar to how fold currently is implemented. This prevents an initial combOp being performed on each partition with zeroValue (which leads to unexpected behavior if zeroValue is a mutable object) before being combOp'ed with other partition results.
8d8d694 [Nicholas Hwang] Changed dict construction to be compatible with Python 2.6 (cannot use list comprehensions to make dicts)
56eb2ab [Nicholas Hwang] Fixed whitespace after colon to conform with PEP8
391de4a [Nicholas Hwang] Removed used of collections.Counter from RDD tests for Python 2.6 compatibility; used defaultdict(int) instead. Merged treeAggregate test with mutable zero value into aggregate test to reduce code duplication.
2fa4e4b [Nicholas Hwang] Merge branch 'master' of https://github.com/njhwang/spark
ba528bd [Nicholas Hwang] Updated comments regarding protection of zeroValue from mutation in RDD.aggregate(). Added regression tests for aggregate(), fold(), aggregateByKey(), foldByKey(), and treeAggregate(), all with both 1 and 2 partition RDDs. Confirmed that aggregate() is the only problematic implementation as of commit 257236c. Also replaced some parallelizations of ranges with xranges, per the documentation's recommendations of preferring xrange over range.
7820391 [Nicholas Hwang] Updated comments regarding protection of zeroValue from mutation in RDD.aggregate(). Added regression tests for aggregate(), fold(), aggregateByKey(), foldByKey(), and treeAggregate(), all with both 1 and 2 partition RDDs. Confirmed that aggregate() is the only problematic implementation as of commit 257236c.
90d1544 [Nicholas Hwang] Made sure RDD.aggregate() makes a deepcopy of zeroValue for all partitions; this ensures that the mapPartitions call works with unique copies of zeroValue in each partition, and prevents a single reference to zeroValue being used for both map and fold calls on each partition (resulting in possibly unexpected behavior).

(cherry picked from commit a803ac3)
Signed-off-by: Davies Liu <[email protected]>