diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
index 3daa653589adf..b1e43fc2c862b 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -352,8 +352,8 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
/**
* Comparison function that defines the sort order for application attempts within the same
- * application. Order is: running attempts before complete attempts, running attempts sorted
- * by start time, completed attempts sorted by end time.
+ * application. Attempts are sorted by descending start time, so the most
+ * recent attempt's state reflects the current state of the application.
*
* Normally applications should have a single running attempt; but failure to call sc.stop()
* may cause multiple running attempts to show up.
@@ -363,11 +363,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
private def compareAttemptInfo(
a1: FsApplicationAttemptInfo,
a2: FsApplicationAttemptInfo): Boolean = {
- if (a1.completed == a2.completed) {
- if (a1.completed) a1.endTime >= a2.endTime else a1.startTime >= a2.startTime
- } else {
- !a1.completed
- }
+ a1.startTime >= a2.startTime
}
/**
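For reference, a minimal standalone sketch (plain Python, not part of the patch) of the ordering the simplified compareAttemptInfo implements: attempts are compared only by descending start time, so the head of the list is always the most recently started attempt and its completion flag reflects the application's current state. The dict fields below are illustrative, not Spark's attempt data model.

    # hypothetical attempt records; only startTime drives the ordering
    attempts = [
        {"attemptId": "attempt1", "startTime": 1, "completed": True},
        {"attemptId": "attempt2", "startTime": 2, "completed": False},
        {"attemptId": "attempt3", "startTime": 3, "completed": True},
    ]
    ordered = sorted(attempts, key=lambda a: a["startTime"], reverse=True)
    assert ordered[0]["attemptId"] == "attempt3"  # latest start wins, completed or not
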
diff --git a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
index afa4958172af2..360c7c3d98902 100644
--- a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
@@ -239,13 +239,12 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
appListAfterRename.size should be (1)
}
- test("apps with multiple attempts") {
+ test("apps with multiple attempts with order") {
val provider = new FsHistoryProvider(createTestConf())
- val attempt1 = newLogFile("app1", Some("attempt1"), inProgress = false)
+ val attempt1 = newLogFile("app1", Some("attempt1"), inProgress = true)
writeFile(attempt1, true, None,
- SparkListenerApplicationStart("app1", Some("app1"), 1L, "test", Some("attempt1")),
- SparkListenerApplicationEnd(2L)
+ SparkListenerApplicationStart("app1", Some("app1"), 1L, "test", Some("attempt1"))
)
updateAndCheck(provider) { list =>
@@ -255,7 +254,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
val attempt2 = newLogFile("app1", Some("attempt2"), inProgress = true)
writeFile(attempt2, true, None,
- SparkListenerApplicationStart("app1", Some("app1"), 3L, "test", Some("attempt2"))
+ SparkListenerApplicationStart("app1", Some("app1"), 2L, "test", Some("attempt2"))
)
updateAndCheck(provider) { list =>
@@ -264,22 +263,21 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
list.head.attempts.head.attemptId should be (Some("attempt2"))
}
- val completedAttempt2 = newLogFile("app1", Some("attempt2"), inProgress = false)
- attempt2.delete()
- writeFile(attempt2, true, None,
- SparkListenerApplicationStart("app1", Some("app1"), 3L, "test", Some("attempt2")),
+ val attempt3 = newLogFile("app1", Some("attempt3"), inProgress = false)
+ writeFile(attempt3, true, None,
+ SparkListenerApplicationStart("app1", Some("app1"), 3L, "test", Some("attempt3")),
SparkListenerApplicationEnd(4L)
)
updateAndCheck(provider) { list =>
list should not be (null)
list.size should be (1)
- list.head.attempts.size should be (2)
- list.head.attempts.head.attemptId should be (Some("attempt2"))
+ list.head.attempts.size should be (3)
+ list.head.attempts.head.attemptId should be (Some("attempt3"))
}
val app2Attempt1 = newLogFile("app2", Some("attempt1"), inProgress = false)
- writeFile(attempt2, true, None,
+ writeFile(attempt1, true, None,
SparkListenerApplicationStart("app2", Some("app2"), 5L, "test", Some("attempt1")),
SparkListenerApplicationEnd(6L)
)
@@ -287,7 +285,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
updateAndCheck(provider) { list =>
list.size should be (2)
list.head.attempts.size should be (1)
- list.last.attempts.size should be (2)
+ list.last.attempts.size should be (3)
list.head.attempts.head.attemptId should be (Some("attempt1"))
list.foreach { case app =>
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala
index 90a74d23a26cc..da95314440d86 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala
@@ -332,9 +332,9 @@ object GraphImpl {
edgeStorageLevel: StorageLevel,
vertexStorageLevel: StorageLevel): GraphImpl[VD, ED] = {
val edgeRDD = EdgeRDD.fromEdges(edges)(classTag[ED], classTag[VD])
- .withTargetStorageLevel(edgeStorageLevel).cache()
+ .withTargetStorageLevel(edgeStorageLevel)
val vertexRDD = VertexRDD(vertices, edgeRDD, defaultVertexAttr)
- .withTargetStorageLevel(vertexStorageLevel).cache()
+ .withTargetStorageLevel(vertexStorageLevel)
GraphImpl(vertexRDD, edgeRDD)
}
@@ -346,9 +346,14 @@ object GraphImpl {
def apply[VD: ClassTag, ED: ClassTag](
vertices: VertexRDD[VD],
edges: EdgeRDD[ED]): GraphImpl[VD, ED] = {
+
+ vertices.cache()
+
// Convert the vertex partitions in edges to the correct type
val newEdges = edges.asInstanceOf[EdgeRDDImpl[ED, _]]
.mapEdgePartitions((pid, part) => part.withoutVertexAttributes[VD])
+ .cache()
+
GraphImpl.fromExistingRDDs(vertices, newEdges)
}
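The GraphImpl change above persists the vertex RDD and the re-mapped edge RDD at the point where they are about to be reused. As a rough PySpark illustration of the same general pattern (not GraphX code; the app name and data are made up), persisting an RDD before it feeds more than one downstream computation avoids recomputing its lineage:

    from pyspark import SparkContext

    sc = SparkContext("local[2]", "cache-sketch")  # hypothetical app name

    # cache() marks the RDD for reuse; both actions below then read the
    # persisted partitions instead of re-running the map
    squares = sc.parallelize(range(1000)).map(lambda x: x * x).cache()
    print(squares.sum())
    print(squares.count())

    sc.stop()
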
diff --git a/pom.xml b/pom.xml
index 29c0e5640b7a0..dac20261ff216 100644
--- a/pom.xml
+++ b/pom.xml
@@ -146,7 +146,7 @@
0.5.0
2.4.0
2.0.8
- 3.1.0
+ 3.1.2
1.7.7
hadoop2
0.7.1
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index cb20bc8b54027..0346fc45e48ba 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -849,6 +849,9 @@ def func(iterator):
for obj in iterator:
acc = op(obj, acc)
yield acc
+ # collecting the result of mapPartitions here ensures that the copy of
+ # zeroValue provided to each partition is distinct from the one provided
+ # to the final reduce call
vals = self.mapPartitions(func).collect()
return reduce(op, vals, zeroValue)
@@ -878,8 +881,11 @@ def func(iterator):
for obj in iterator:
acc = seqOp(acc, obj)
yield acc
-
- return self.mapPartitions(func).fold(zeroValue, combOp)
+ # collecting the result of mapPartitions here ensures that the copy of
+ # zeroValue provided to each partition is distinct from the one provided
+ # to the final reduce call
+ vals = self.mapPartitions(func).collect()
+ return reduce(combOp, vals, zeroValue)
def treeAggregate(self, zeroValue, seqOp, combOp, depth=2):
"""
diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py
index 78265423682b0..d26866faca447 100644
--- a/python/pyspark/tests.py
+++ b/python/pyspark/tests.py
@@ -529,10 +529,127 @@ def test_deleting_input_files(self):
def test_sampling_default_seed(self):
# Test for SPARK-3995 (default seed setting)
- data = self.sc.parallelize(range(1000), 1)
+ data = self.sc.parallelize(xrange(1000), 1)
subset = data.takeSample(False, 10)
self.assertEqual(len(subset), 10)
+ def test_aggregate_mutable_zero_value(self):
+ # Test for SPARK-9021; uses aggregate and treeAggregate to build dict
+ # representing a counter of ints
+ # NOTE: dict is used instead of collections.Counter for Python 2.6
+ # compatibility
+ from collections import defaultdict
+
+ # Show that single or multiple partitions work
+ data1 = self.sc.range(10, numSlices=1)
+ data2 = self.sc.range(10, numSlices=2)
+
+ def seqOp(x, y):
+ x[y] += 1
+ return x
+
+ def comboOp(x, y):
+ for key, val in y.items():
+ x[key] += val
+ return x
+
+ counts1 = data1.aggregate(defaultdict(int), seqOp, comboOp)
+ counts2 = data2.aggregate(defaultdict(int), seqOp, comboOp)
+ counts3 = data1.treeAggregate(defaultdict(int), seqOp, comboOp, 2)
+ counts4 = data2.treeAggregate(defaultdict(int), seqOp, comboOp, 2)
+
+ ground_truth = defaultdict(int, dict((i, 1) for i in range(10)))
+ self.assertEqual(counts1, ground_truth)
+ self.assertEqual(counts2, ground_truth)
+ self.assertEqual(counts3, ground_truth)
+ self.assertEqual(counts4, ground_truth)
+
+ def test_aggregate_by_key_mutable_zero_value(self):
+ # Test for SPARK-9021; uses aggregateByKey to make a pair RDD that
+ # contains lists of all values for each key in the original RDD
+
+ # list(range(...)) for Python 3.x compatibility (can't use * operator
+ # on a range object)
+ # list(zip(...)) for Python 3.x compatibility (want to parallelize a
+ # collection, not a zip object)
+ tuples = list(zip(list(range(10))*2, [1]*20))
+ # Show that single or multiple partitions work
+ data1 = self.sc.parallelize(tuples, 1)
+ data2 = self.sc.parallelize(tuples, 2)
+
+ def seqOp(x, y):
+ x.append(y)
+ return x
+
+ def comboOp(x, y):
+ x.extend(y)
+ return x
+
+ values1 = data1.aggregateByKey([], seqOp, comboOp).collect()
+ values2 = data2.aggregateByKey([], seqOp, comboOp).collect()
+ # Sort lists to ensure clean comparison with ground_truth
+ values1.sort()
+ values2.sort()
+
+ ground_truth = [(i, [1]*2) for i in range(10)]
+ self.assertEqual(values1, ground_truth)
+ self.assertEqual(values2, ground_truth)
+
+ def test_fold_mutable_zero_value(self):
+ # Test for SPARK-9021; uses fold to merge an RDD of dict counters into
+ # a single dict
+ # NOTE: dict is used instead of collections.Counter for Python 2.6
+ # compatibility
+ from collections import defaultdict
+
+ counts1 = defaultdict(int, dict((i, 1) for i in range(10)))
+ counts2 = defaultdict(int, dict((i, 1) for i in range(3, 8)))
+ counts3 = defaultdict(int, dict((i, 1) for i in range(4, 7)))
+ counts4 = defaultdict(int, dict((i, 1) for i in range(5, 6)))
+ all_counts = [counts1, counts2, counts3, counts4]
+ # Show that single or multiple partitions work
+ data1 = self.sc.parallelize(all_counts, 1)
+ data2 = self.sc.parallelize(all_counts, 2)
+
+ def comboOp(x, y):
+ for key, val in y.items():
+ x[key] += val
+ return x
+
+ fold1 = data1.fold(defaultdict(int), comboOp)
+ fold2 = data2.fold(defaultdict(int), comboOp)
+
+ ground_truth = defaultdict(int)
+ for counts in all_counts:
+ for key, val in counts.items():
+ ground_truth[key] += val
+ self.assertEqual(fold1, ground_truth)
+ self.assertEqual(fold2, ground_truth)
+
+ def test_fold_by_key_mutable_zero_value(self):
+ # Test for SPARK-9021; uses foldByKey to make a pair RDD that contains
+ # lists of all values for each key in the original RDD
+
+ tuples = [(i, range(i)) for i in range(10)]*2
+ # Show that single or multiple partitions work
+ data1 = self.sc.parallelize(tuples, 1)
+ data2 = self.sc.parallelize(tuples, 2)
+
+ def comboOp(x, y):
+ x.extend(y)
+ return x
+
+ values1 = data1.foldByKey([], comboOp).collect()
+ values2 = data2.foldByKey([], comboOp).collect()
+ # Sort lists to ensure clean comparison with ground_truth
+ values1.sort()
+ values2.sort()
+
+ # list(range(...)) for Python 3.x compatibility
+ ground_truth = [(i, list(range(i))*2) for i in range(10)]
+ self.assertEqual(values1, ground_truth)
+ self.assertEqual(values2, ground_truth)
+
def test_aggregate_by_key(self):
data = self.sc.parallelize([(1, 1), (1, 1), (3, 2), (5, 1), (5, 3)], 2)
@@ -624,8 +741,8 @@ def test_zip_with_different_serializers(self):
def test_zip_with_different_object_sizes(self):
# regress test for SPARK-5973
- a = self.sc.parallelize(range(10000)).map(lambda i: '*' * i)
- b = self.sc.parallelize(range(10000, 20000)).map(lambda i: '*' * i)
+ a = self.sc.parallelize(xrange(10000)).map(lambda i: '*' * i)
+ b = self.sc.parallelize(xrange(10000, 20000)).map(lambda i: '*' * i)
self.assertEqual(10000, a.zip(b).count())
def test_zip_with_different_number_of_items(self):
@@ -647,7 +764,7 @@ def test_zip_with_different_number_of_items(self):
self.assertRaises(Exception, lambda: a.zip(b).count())
def test_count_approx_distinct(self):
- rdd = self.sc.parallelize(range(1000))
+ rdd = self.sc.parallelize(xrange(1000))
self.assertTrue(950 < rdd.countApproxDistinct(0.03) < 1050)
self.assertTrue(950 < rdd.map(float).countApproxDistinct(0.03) < 1050)
self.assertTrue(950 < rdd.map(str).countApproxDistinct(0.03) < 1050)
@@ -777,7 +894,7 @@ def test_distinct(self):
def test_external_group_by_key(self):
self.sc._conf.set("spark.python.worker.memory", "1m")
N = 200001
- kv = self.sc.parallelize(range(N)).map(lambda x: (x % 3, x))
+ kv = self.sc.parallelize(xrange(N)).map(lambda x: (x % 3, x))
gkv = kv.groupByKey().cache()
self.assertEqual(3, gkv.count())
filtered = gkv.filter(lambda kv: kv[0] == 1)
@@ -871,7 +988,7 @@ def test_narrow_dependency_in_join(self):
# Regression test for SPARK-6294
def test_take_on_jrdd(self):
- rdd = self.sc.parallelize(range(1 << 20)).map(lambda x: str(x))
+ rdd = self.sc.parallelize(xrange(1 << 20)).map(lambda x: str(x))
rdd._jrdd.first()
def test_sortByKey_uses_all_partitions_not_only_first_and_last(self):
@@ -1503,13 +1620,13 @@ def run():
self.fail("daemon had been killed")
# run a normal job
- rdd = self.sc.parallelize(range(100), 1)
+ rdd = self.sc.parallelize(xrange(100), 1)
self.assertEqual(100, rdd.map(str).count())
def test_after_exception(self):
def raise_exception(_):
raise Exception()
- rdd = self.sc.parallelize(range(100), 1)
+ rdd = self.sc.parallelize(xrange(100), 1)
with QuietTest(self.sc):
self.assertRaises(Exception, lambda: rdd.foreach(raise_exception))
self.assertEqual(100, rdd.map(str).count())
@@ -1525,22 +1642,22 @@ def test_after_jvm_exception(self):
with QuietTest(self.sc):
self.assertRaises(Exception, lambda: filtered_data.count())
- rdd = self.sc.parallelize(range(100), 1)
+ rdd = self.sc.parallelize(xrange(100), 1)
self.assertEqual(100, rdd.map(str).count())
def test_accumulator_when_reuse_worker(self):
from pyspark.accumulators import INT_ACCUMULATOR_PARAM
acc1 = self.sc.accumulator(0, INT_ACCUMULATOR_PARAM)
- self.sc.parallelize(range(100), 20).foreach(lambda x: acc1.add(x))
+ self.sc.parallelize(xrange(100), 20).foreach(lambda x: acc1.add(x))
self.assertEqual(sum(range(100)), acc1.value)
acc2 = self.sc.accumulator(0, INT_ACCUMULATOR_PARAM)
- self.sc.parallelize(range(100), 20).foreach(lambda x: acc2.add(x))
+ self.sc.parallelize(xrange(100), 20).foreach(lambda x: acc2.add(x))
self.assertEqual(sum(range(100)), acc2.value)
self.assertEqual(sum(range(100)), acc1.value)
def test_reuse_worker_after_take(self):
- rdd = self.sc.parallelize(range(100000), 1)
+ rdd = self.sc.parallelize(xrange(100000), 1)
self.assertEqual(0, rdd.first())
def count():