From f34f3d71f6551da5e96b0de99c0f61fa981967f6 Mon Sep 17 00:00:00 2001 From: tien-dungle Date: Fri, 17 Jul 2015 12:11:32 -0700 Subject: [PATCH 1/4] [SPARK-9109] [GRAPHX] Keep the cached edge in the graph The change here is to keep the cached RDDs in the graph object so that when the graph.unpersist() is called these RDDs are correctly unpersisted. ```java import org.apache.spark.graphx._ import org.apache.spark.rdd.RDD import org.slf4j.LoggerFactory import org.apache.spark.graphx.util.GraphGenerators // Create an RDD for the vertices val users: RDD[(VertexId, (String, String))] = sc.parallelize(Array((3L, ("rxin", "student")), (7L, ("jgonzal", "postdoc")), (5L, ("franklin", "prof")), (2L, ("istoica", "prof")))) // Create an RDD for edges val relationships: RDD[Edge[String]] = sc.parallelize(Array(Edge(3L, 7L, "collab"), Edge(5L, 3L, "advisor"), Edge(2L, 5L, "colleague"), Edge(5L, 7L, "pi"))) // Define a default user in case there are relationship with missing user val defaultUser = ("John Doe", "Missing") // Build the initial Graph val graph = Graph(users, relationships, defaultUser) graph.cache().numEdges graph.unpersist() sc.getPersistentRDDs.foreach( r => println( r._2.toString)) ``` Author: tien-dungle Closes #7469 from tien-dungle/SPARK-9109_Graphx-unpersist and squashes the following commits: 8d87997 [tien-dungle] Keep the cached edge in the graph (cherry picked from commit 587c315b204f1439f696620543c38166d95f8a3d) Signed-off-by: Ankur Dave --- .../scala/org/apache/spark/graphx/impl/GraphImpl.scala | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala index 90a74d23a26cc..da95314440d86 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala @@ -332,9 +332,9 @@ object GraphImpl { edgeStorageLevel: StorageLevel, vertexStorageLevel: StorageLevel): GraphImpl[VD, ED] = { val edgeRDD = EdgeRDD.fromEdges(edges)(classTag[ED], classTag[VD]) - .withTargetStorageLevel(edgeStorageLevel).cache() + .withTargetStorageLevel(edgeStorageLevel) val vertexRDD = VertexRDD(vertices, edgeRDD, defaultVertexAttr) - .withTargetStorageLevel(vertexStorageLevel).cache() + .withTargetStorageLevel(vertexStorageLevel) GraphImpl(vertexRDD, edgeRDD) } @@ -346,9 +346,14 @@ object GraphImpl { def apply[VD: ClassTag, ED: ClassTag]( vertices: VertexRDD[VD], edges: EdgeRDD[ED]): GraphImpl[VD, ED] = { + + vertices.cache() + // Convert the vertex partitions in edges to the correct type val newEdges = edges.asInstanceOf[EdgeRDDImpl[ED, _]] .mapEdgePartitions((pid, part) => part.withoutVertexAttributes[VD]) + .cache() + GraphImpl.fromExistingRDDs(vertices, newEdges) } From 59838280f96accc62f96d77579660e211ed32401 Mon Sep 17 00:00:00 2001 From: Joshi Date: Fri, 17 Jul 2015 22:47:28 +0100 Subject: [PATCH 2/4] [SPARK-8593] [CORE] Sort app attempts by start time. This makes sure attempts are listed in the order they were executed, and that the app's state matches the state of the most current attempt. Author: Joshi Author: Rekha Joshi Closes #7253 from rekhajoshm/SPARK-8593 and squashes the following commits: 874dd80 [Joshi] History Server: updated order for multiple attempts(logcleaner) 716e0b1 [Joshi] History Server: updated order for multiple attempts(descending start time works everytime) 548c753 [Joshi] History Server: updated order for multiple attempts(descending start time works everytime) 83306a8 [Joshi] History Server: updated order for multiple attempts(descending start time) b0fc922 [Joshi] History Server: updated order for multiple attempts(updated comment) cc0fda7 [Joshi] History Server: updated order for multiple attempts(updated test) 304cb0b [Joshi] History Server: updated order for multiple attempts(reverted HistoryPage) 85024e8 [Joshi] History Server: updated order for multiple attempts a41ac4b [Joshi] History Server: updated order for multiple attempts ab65fa1 [Joshi] History Server: some attempt completed to work with showIncomplete 0be142d [Rekha Joshi] Merge pull request #3 from apache/master 106fd8e [Rekha Joshi] Merge pull request #2 from apache/master e3677c9 [Rekha Joshi] Merge pull request #1 from apache/master (cherry picked from commit 42d8a012f6652df1fa3f560f87c53731ea070640) Signed-off-by: Sean Owen --- .../deploy/history/FsHistoryProvider.scala | 10 +++----- .../history/FsHistoryProviderSuite.scala | 24 +++++++++---------- 2 files changed, 14 insertions(+), 20 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala index 3daa653589adf..b1e43fc2c862b 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala @@ -352,8 +352,8 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) /** * Comparison function that defines the sort order for application attempts within the same - * application. Order is: running attempts before complete attempts, running attempts sorted - * by start time, completed attempts sorted by end time. + * application. Order is: attempts are sorted by descending start time. + * Most recent attempt state matches with current state of the app. * * Normally applications should have a single running attempt; but failure to call sc.stop() * may cause multiple running attempts to show up. @@ -363,11 +363,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) private def compareAttemptInfo( a1: FsApplicationAttemptInfo, a2: FsApplicationAttemptInfo): Boolean = { - if (a1.completed == a2.completed) { - if (a1.completed) a1.endTime >= a2.endTime else a1.startTime >= a2.startTime - } else { - !a1.completed - } + a1.startTime >= a2.startTime } /** diff --git a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala index afa4958172af2..360c7c3d98902 100644 --- a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala @@ -239,13 +239,12 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc appListAfterRename.size should be (1) } - test("apps with multiple attempts") { + test("apps with multiple attempts with order") { val provider = new FsHistoryProvider(createTestConf()) - val attempt1 = newLogFile("app1", Some("attempt1"), inProgress = false) + val attempt1 = newLogFile("app1", Some("attempt1"), inProgress = true) writeFile(attempt1, true, None, - SparkListenerApplicationStart("app1", Some("app1"), 1L, "test", Some("attempt1")), - SparkListenerApplicationEnd(2L) + SparkListenerApplicationStart("app1", Some("app1"), 1L, "test", Some("attempt1")) ) updateAndCheck(provider) { list => @@ -255,7 +254,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc val attempt2 = newLogFile("app1", Some("attempt2"), inProgress = true) writeFile(attempt2, true, None, - SparkListenerApplicationStart("app1", Some("app1"), 3L, "test", Some("attempt2")) + SparkListenerApplicationStart("app1", Some("app1"), 2L, "test", Some("attempt2")) ) updateAndCheck(provider) { list => @@ -264,22 +263,21 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc list.head.attempts.head.attemptId should be (Some("attempt2")) } - val completedAttempt2 = newLogFile("app1", Some("attempt2"), inProgress = false) - attempt2.delete() - writeFile(attempt2, true, None, - SparkListenerApplicationStart("app1", Some("app1"), 3L, "test", Some("attempt2")), + val attempt3 = newLogFile("app1", Some("attempt3"), inProgress = false) + writeFile(attempt3, true, None, + SparkListenerApplicationStart("app1", Some("app1"), 3L, "test", Some("attempt3")), SparkListenerApplicationEnd(4L) ) updateAndCheck(provider) { list => list should not be (null) list.size should be (1) - list.head.attempts.size should be (2) - list.head.attempts.head.attemptId should be (Some("attempt2")) + list.head.attempts.size should be (3) + list.head.attempts.head.attemptId should be (Some("attempt3")) } val app2Attempt1 = newLogFile("app2", Some("attempt1"), inProgress = false) - writeFile(attempt2, true, None, + writeFile(attempt1, true, None, SparkListenerApplicationStart("app2", Some("app2"), 5L, "test", Some("attempt1")), SparkListenerApplicationEnd(6L) ) @@ -287,7 +285,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc updateAndCheck(provider) { list => list.size should be (2) list.head.attempts.size should be (1) - list.last.attempts.size should be (2) + list.last.attempts.size should be (3) list.head.attempts.head.attemptId should be (Some("attempt1")) list.foreach { case app => From 6834d96a6626ea70f10f9c02b2ce0e28e0607c92 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Carl=20Anders=20D=C3=BCvel?= Date: Sun, 19 Jul 2015 09:14:55 +0100 Subject: [PATCH 3/4] [SPARK-9094] [PARENT] Increased io.dropwizard.metrics from 3.1.0 to 3.1.2 MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit We are running Spark 1.4.0 in production and ran into problems because after a network hiccup (which happens often in our current environment) no more metrics were reported to graphite leaving us blindfolded about the current state of our spark applications. [This problem](https://github.com/dropwizard/metrics/commit/70559816f1fc3a0a0122b5263d5478ff07396991) was fixed in the current version of the metrics library. We run spark with this change in production now and have seen no problems. We also had a look at the commit history since 3.1.0 and did not detect any potentially incompatible changes but many fixes which could potentially help other users as well. Author: Carl Anders Düvel Closes #7493 from hackbert/bump-metrics-lib-version and squashes the following commits: 6677565 [Carl Anders Düvel] [SPARK-9094] [PARENT] Increased io.dropwizard.metrics from 3.1.0 to 3.1.2 in order to get this fix https://github.com/dropwizard/metrics/commit/70559816f1fc3a0a0122b5263d5478ff07396991 (cherry picked from commit 344d1567e5ac28b3ab8f83f18d2fa9d98acef152) Signed-off-by: Sean Owen --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index 512bc87f2f907..53e938885c083 100644 --- a/pom.xml +++ b/pom.xml @@ -144,7 +144,7 @@ 0.5.0 2.4.0 2.0.8 - 3.1.0 + 3.1.2 1.7.7 hadoop2 0.7.1 From a3c853c3fed721fd0fbf29547abe8437021cd692 Mon Sep 17 00:00:00 2001 From: Nicholas Hwang Date: Sun, 19 Jul 2015 10:30:28 -0700 Subject: [PATCH 4/4] [SPARK-9021] [PYSPARK] Change RDD.aggregate() to do reduce(mapPartitions()) instead of mapPartitions.fold() I'm relatively new to Spark and functional programming, so forgive me if this pull request is just a result of my misunderstanding of how Spark should be used. Currently, if one happens to use a mutable object as `zeroValue` for `RDD.aggregate()`, possibly unexpected behavior can occur. This is because pyspark's current implementation of `RDD.aggregate()` does not serialize or make a copy of `zeroValue` before handing it off to `RDD.mapPartitions(...).fold(...)`. This results in a single reference to `zeroValue` being used for both `RDD.mapPartitions()` and `RDD.fold()` on each partition. This can result in strange accumulator values being fed into each partition's call to `RDD.fold()`, as the `zeroValue` may have been changed in-place during the `RDD.mapPartitions()` call. As an illustrative example, submit the following to `spark-submit`: ``` from pyspark import SparkConf, SparkContext import collections def updateCounter(acc, val): print 'update acc:', acc print 'update val:', val acc[val] += 1 return acc def comboCounter(acc1, acc2): print 'combo acc1:', acc1 print 'combo acc2:', acc2 acc1.update(acc2) return acc1 def main(): conf = SparkConf().setMaster("local").setAppName("Aggregate with Counter") sc = SparkContext(conf = conf) print '======= AGGREGATING with ONE PARTITION =======' print sc.parallelize(range(1,10), 1).aggregate(collections.Counter(), updateCounter, comboCounter) print '======= AGGREGATING with TWO PARTITIONS =======' print sc.parallelize(range(1,10), 2).aggregate(collections.Counter(), updateCounter, comboCounter) if __name__ == "__main__": main() ``` One probably expects this to output the following: ``` Counter({1: 1, 2: 1, 3: 1, 4: 1, 5: 1, 6: 1, 7: 1, 8: 1, 9: 1}) ``` But it instead outputs this (regardless of the number of partitions): ``` Counter({1: 2, 2: 2, 3: 2, 4: 2, 5: 2, 6: 2, 7: 2, 8: 2, 9: 2}) ``` This is because (I believe) `zeroValue` gets passed correctly to each partition, but after `RDD.mapPartitions()` completes, the `zeroValue` object has been updated and is then passed to `RDD.fold()`, which results in all items being double-counted within each partition before being finally reduced at the calling node. I realize that this type of calculation is typically done by `RDD.mapPartitions(...).reduceByKey(...)`, but hopefully this illustrates some potentially confusing behavior. I also noticed that other `RDD` methods use this `deepcopy` approach to creating unique copies of `zeroValue` (i.e., `RDD.aggregateByKey()` and `RDD.foldByKey()`), and that the Scala implementations do seem to serialize the `zeroValue` object appropriately to prevent this type of behavior. Author: Nicholas Hwang Closes #7378 from njhwang/master and squashes the following commits: 659bb27 [Nicholas Hwang] Fixed RDD.aggregate() to perform a reduce operation on collected mapPartitions results, similar to how fold currently is implemented. This prevents an initial combOp being performed on each partition with zeroValue (which leads to unexpected behavior if zeroValue is a mutable object) before being combOp'ed with other partition results. 8d8d694 [Nicholas Hwang] Changed dict construction to be compatible with Python 2.6 (cannot use list comprehensions to make dicts) 56eb2ab [Nicholas Hwang] Fixed whitespace after colon to conform with PEP8 391de4a [Nicholas Hwang] Removed used of collections.Counter from RDD tests for Python 2.6 compatibility; used defaultdict(int) instead. Merged treeAggregate test with mutable zero value into aggregate test to reduce code duplication. 2fa4e4b [Nicholas Hwang] Merge branch 'master' of https://github.com/njhwang/spark ba528bd [Nicholas Hwang] Updated comments regarding protection of zeroValue from mutation in RDD.aggregate(). Added regression tests for aggregate(), fold(), aggregateByKey(), foldByKey(), and treeAggregate(), all with both 1 and 2 partition RDDs. Confirmed that aggregate() is the only problematic implementation as of commit 257236c3e17906098f801cbc2059e7a9054e8cab. Also replaced some parallelizations of ranges with xranges, per the documentation's recommendations of preferring xrange over range. 7820391 [Nicholas Hwang] Updated comments regarding protection of zeroValue from mutation in RDD.aggregate(). Added regression tests for aggregate(), fold(), aggregateByKey(), foldByKey(), and treeAggregate(), all with both 1 and 2 partition RDDs. Confirmed that aggregate() is the only problematic implementation as of commit 257236c3e17906098f801cbc2059e7a9054e8cab. 90d1544 [Nicholas Hwang] Made sure RDD.aggregate() makes a deepcopy of zeroValue for all partitions; this ensures that the mapPartitions call works with unique copies of zeroValue in each partition, and prevents a single reference to zeroValue being used for both map and fold calls on each partition (resulting in possibly unexpected behavior). (cherry picked from commit a803ac3e060d181c7b34d9501c9350e5f215ba85) Signed-off-by: Davies Liu --- python/pyspark/rdd.py | 10 ++- python/pyspark/tests.py | 141 ++++++++++++++++++++++++++++++++++++---- 2 files changed, 137 insertions(+), 14 deletions(-) diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py index cb20bc8b54027..0346fc45e48ba 100644 --- a/python/pyspark/rdd.py +++ b/python/pyspark/rdd.py @@ -849,6 +849,9 @@ def func(iterator): for obj in iterator: acc = op(obj, acc) yield acc + # collecting result of mapPartitions here ensures that the copy of + # zeroValue provided to each partition is unique from the one provided + # to the final reduce call vals = self.mapPartitions(func).collect() return reduce(op, vals, zeroValue) @@ -878,8 +881,11 @@ def func(iterator): for obj in iterator: acc = seqOp(acc, obj) yield acc - - return self.mapPartitions(func).fold(zeroValue, combOp) + # collecting result of mapPartitions here ensures that the copy of + # zeroValue provided to each partition is unique from the one provided + # to the final reduce call + vals = self.mapPartitions(func).collect() + return reduce(combOp, vals, zeroValue) def treeAggregate(self, zeroValue, seqOp, combOp, depth=2): """ diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py index 78265423682b0..d26866faca447 100644 --- a/python/pyspark/tests.py +++ b/python/pyspark/tests.py @@ -529,10 +529,127 @@ def test_deleting_input_files(self): def test_sampling_default_seed(self): # Test for SPARK-3995 (default seed setting) - data = self.sc.parallelize(range(1000), 1) + data = self.sc.parallelize(xrange(1000), 1) subset = data.takeSample(False, 10) self.assertEqual(len(subset), 10) + def test_aggregate_mutable_zero_value(self): + # Test for SPARK-9021; uses aggregate and treeAggregate to build dict + # representing a counter of ints + # NOTE: dict is used instead of collections.Counter for Python 2.6 + # compatibility + from collections import defaultdict + + # Show that single or multiple partitions work + data1 = self.sc.range(10, numSlices=1) + data2 = self.sc.range(10, numSlices=2) + + def seqOp(x, y): + x[y] += 1 + return x + + def comboOp(x, y): + for key, val in y.items(): + x[key] += val + return x + + counts1 = data1.aggregate(defaultdict(int), seqOp, comboOp) + counts2 = data2.aggregate(defaultdict(int), seqOp, comboOp) + counts3 = data1.treeAggregate(defaultdict(int), seqOp, comboOp, 2) + counts4 = data2.treeAggregate(defaultdict(int), seqOp, comboOp, 2) + + ground_truth = defaultdict(int, dict((i, 1) for i in range(10))) + self.assertEqual(counts1, ground_truth) + self.assertEqual(counts2, ground_truth) + self.assertEqual(counts3, ground_truth) + self.assertEqual(counts4, ground_truth) + + def test_aggregate_by_key_mutable_zero_value(self): + # Test for SPARK-9021; uses aggregateByKey to make a pair RDD that + # contains lists of all values for each key in the original RDD + + # list(range(...)) for Python 3.x compatibility (can't use * operator + # on a range object) + # list(zip(...)) for Python 3.x compatibility (want to parallelize a + # collection, not a zip object) + tuples = list(zip(list(range(10))*2, [1]*20)) + # Show that single or multiple partitions work + data1 = self.sc.parallelize(tuples, 1) + data2 = self.sc.parallelize(tuples, 2) + + def seqOp(x, y): + x.append(y) + return x + + def comboOp(x, y): + x.extend(y) + return x + + values1 = data1.aggregateByKey([], seqOp, comboOp).collect() + values2 = data2.aggregateByKey([], seqOp, comboOp).collect() + # Sort lists to ensure clean comparison with ground_truth + values1.sort() + values2.sort() + + ground_truth = [(i, [1]*2) for i in range(10)] + self.assertEqual(values1, ground_truth) + self.assertEqual(values2, ground_truth) + + def test_fold_mutable_zero_value(self): + # Test for SPARK-9021; uses fold to merge an RDD of dict counters into + # a single dict + # NOTE: dict is used instead of collections.Counter for Python 2.6 + # compatibility + from collections import defaultdict + + counts1 = defaultdict(int, dict((i, 1) for i in range(10))) + counts2 = defaultdict(int, dict((i, 1) for i in range(3, 8))) + counts3 = defaultdict(int, dict((i, 1) for i in range(4, 7))) + counts4 = defaultdict(int, dict((i, 1) for i in range(5, 6))) + all_counts = [counts1, counts2, counts3, counts4] + # Show that single or multiple partitions work + data1 = self.sc.parallelize(all_counts, 1) + data2 = self.sc.parallelize(all_counts, 2) + + def comboOp(x, y): + for key, val in y.items(): + x[key] += val + return x + + fold1 = data1.fold(defaultdict(int), comboOp) + fold2 = data2.fold(defaultdict(int), comboOp) + + ground_truth = defaultdict(int) + for counts in all_counts: + for key, val in counts.items(): + ground_truth[key] += val + self.assertEqual(fold1, ground_truth) + self.assertEqual(fold2, ground_truth) + + def test_fold_by_key_mutable_zero_value(self): + # Test for SPARK-9021; uses foldByKey to make a pair RDD that contains + # lists of all values for each key in the original RDD + + tuples = [(i, range(i)) for i in range(10)]*2 + # Show that single or multiple partitions work + data1 = self.sc.parallelize(tuples, 1) + data2 = self.sc.parallelize(tuples, 2) + + def comboOp(x, y): + x.extend(y) + return x + + values1 = data1.foldByKey([], comboOp).collect() + values2 = data2.foldByKey([], comboOp).collect() + # Sort lists to ensure clean comparison with ground_truth + values1.sort() + values2.sort() + + # list(range(...)) for Python 3.x compatibility + ground_truth = [(i, list(range(i))*2) for i in range(10)] + self.assertEqual(values1, ground_truth) + self.assertEqual(values2, ground_truth) + def test_aggregate_by_key(self): data = self.sc.parallelize([(1, 1), (1, 1), (3, 2), (5, 1), (5, 3)], 2) @@ -624,8 +741,8 @@ def test_zip_with_different_serializers(self): def test_zip_with_different_object_sizes(self): # regress test for SPARK-5973 - a = self.sc.parallelize(range(10000)).map(lambda i: '*' * i) - b = self.sc.parallelize(range(10000, 20000)).map(lambda i: '*' * i) + a = self.sc.parallelize(xrange(10000)).map(lambda i: '*' * i) + b = self.sc.parallelize(xrange(10000, 20000)).map(lambda i: '*' * i) self.assertEqual(10000, a.zip(b).count()) def test_zip_with_different_number_of_items(self): @@ -647,7 +764,7 @@ def test_zip_with_different_number_of_items(self): self.assertRaises(Exception, lambda: a.zip(b).count()) def test_count_approx_distinct(self): - rdd = self.sc.parallelize(range(1000)) + rdd = self.sc.parallelize(xrange(1000)) self.assertTrue(950 < rdd.countApproxDistinct(0.03) < 1050) self.assertTrue(950 < rdd.map(float).countApproxDistinct(0.03) < 1050) self.assertTrue(950 < rdd.map(str).countApproxDistinct(0.03) < 1050) @@ -777,7 +894,7 @@ def test_distinct(self): def test_external_group_by_key(self): self.sc._conf.set("spark.python.worker.memory", "1m") N = 200001 - kv = self.sc.parallelize(range(N)).map(lambda x: (x % 3, x)) + kv = self.sc.parallelize(xrange(N)).map(lambda x: (x % 3, x)) gkv = kv.groupByKey().cache() self.assertEqual(3, gkv.count()) filtered = gkv.filter(lambda kv: kv[0] == 1) @@ -871,7 +988,7 @@ def test_narrow_dependency_in_join(self): # Regression test for SPARK-6294 def test_take_on_jrdd(self): - rdd = self.sc.parallelize(range(1 << 20)).map(lambda x: str(x)) + rdd = self.sc.parallelize(xrange(1 << 20)).map(lambda x: str(x)) rdd._jrdd.first() def test_sortByKey_uses_all_partitions_not_only_first_and_last(self): @@ -1503,13 +1620,13 @@ def run(): self.fail("daemon had been killed") # run a normal job - rdd = self.sc.parallelize(range(100), 1) + rdd = self.sc.parallelize(xrange(100), 1) self.assertEqual(100, rdd.map(str).count()) def test_after_exception(self): def raise_exception(_): raise Exception() - rdd = self.sc.parallelize(range(100), 1) + rdd = self.sc.parallelize(xrange(100), 1) with QuietTest(self.sc): self.assertRaises(Exception, lambda: rdd.foreach(raise_exception)) self.assertEqual(100, rdd.map(str).count()) @@ -1525,22 +1642,22 @@ def test_after_jvm_exception(self): with QuietTest(self.sc): self.assertRaises(Exception, lambda: filtered_data.count()) - rdd = self.sc.parallelize(range(100), 1) + rdd = self.sc.parallelize(xrange(100), 1) self.assertEqual(100, rdd.map(str).count()) def test_accumulator_when_reuse_worker(self): from pyspark.accumulators import INT_ACCUMULATOR_PARAM acc1 = self.sc.accumulator(0, INT_ACCUMULATOR_PARAM) - self.sc.parallelize(range(100), 20).foreach(lambda x: acc1.add(x)) + self.sc.parallelize(xrange(100), 20).foreach(lambda x: acc1.add(x)) self.assertEqual(sum(range(100)), acc1.value) acc2 = self.sc.accumulator(0, INT_ACCUMULATOR_PARAM) - self.sc.parallelize(range(100), 20).foreach(lambda x: acc2.add(x)) + self.sc.parallelize(xrange(100), 20).foreach(lambda x: acc2.add(x)) self.assertEqual(sum(range(100)), acc2.value) self.assertEqual(sum(range(100)), acc1.value) def test_reuse_worker_after_take(self): - rdd = self.sc.parallelize(range(100000), 1) + rdd = self.sc.parallelize(xrange(100000), 1) self.assertEqual(0, rdd.first()) def count():