From 9fd82dbbcb8b10debbe95f1acab53ae8b340f38e Mon Sep 17 00:00:00 2001 From: Davies Liu Date: Mon, 4 Aug 2014 15:54:52 -0700 Subject: [PATCH 1/2] [SPARK-1687] [PySpark] fix unit tests related to pickable namedtuple serializer is imported multiple times during doctests, so it's better to make _hijack_namedtuple() safe to be called multiple times. Author: Davies Liu Closes #1771 from davies/fix and squashes the following commits: 1a9e336 [Davies Liu] fix unit tests --- python/pyspark/serializers.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/python/pyspark/serializers.py b/python/pyspark/serializers.py index 1b52c144df087..a10f85b55ad30 100644 --- a/python/pyspark/serializers.py +++ b/python/pyspark/serializers.py @@ -297,8 +297,11 @@ def __reduce__(self): def _hijack_namedtuple(): """ Hack namedtuple() to make it picklable """ - global _old_namedtuple # or it will put in closure + # hijack only one time + if hasattr(collections.namedtuple, "__hijack"): + return + global _old_namedtuple # or it will put in closure def _copy_func(f): return types.FunctionType(f.func_code, f.func_globals, f.func_name, f.func_defaults, f.func_closure) @@ -313,6 +316,7 @@ def namedtuple(name, fields, verbose=False, rename=False): collections.namedtuple.func_globals["_old_namedtuple"] = _old_namedtuple collections.namedtuple.func_globals["_hack_namedtuple"] = _hack_namedtuple collections.namedtuple.func_code = namedtuple.func_code + collections.namedtuple.__hijack = 1 # hack the cls already generated by namedtuple # those created in other module can be pickled as normal, From 05bf4e4aff0d052a53d3e64c43688f07e27fec50 Mon Sep 17 00:00:00 2001 From: Reynold Xin Date: Mon, 4 Aug 2014 20:39:18 -0700 Subject: [PATCH 2/2] [SPARK-2323] Exception in accumulator update should not crash DAGScheduler & SparkContext Author: Reynold Xin Closes #1772 from rxin/accumulator-dagscheduler and squashes the following commits: 6a58520 [Reynold Xin] [SPARK-2323] Exception in accumulator update should not crash DAGScheduler & SparkContext. --- .../org/apache/spark/scheduler/DAGScheduler.scala | 9 +++++++-- .../apache/spark/scheduler/DAGSchedulerSuite.scala | 11 +++-------- 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index d87c3048985fc..9fa3a4e9c71ae 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -904,8 +904,13 @@ class DAGScheduler( event.reason match { case Success => if (event.accumUpdates != null) { - // TODO: fail the stage if the accumulator update fails... - Accumulators.add(event.accumUpdates) // TODO: do this only if task wasn't resubmitted + try { + Accumulators.add(event.accumUpdates) + } catch { + // If we see an exception during accumulator update, just log the error and move on. + case e: Exception => + logError(s"Failed to update accumulators for $task", e) + } } stage.pendingTasks -= task task match { diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index 36e238b4c9434..8c1b0fed11f72 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -622,8 +622,7 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F assertDataStructuresEmpty } - // TODO: Fix this and un-ignore the test. - ignore("misbehaved accumulator should not crash DAGScheduler and SparkContext") { + test("misbehaved accumulator should not crash DAGScheduler and SparkContext") { val acc = new Accumulator[Int](0, new AccumulatorParam[Int] { override def addAccumulator(t1: Int, t2: Int): Int = t1 + t2 override def zero(initialValue: Int): Int = 0 @@ -633,14 +632,10 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F }) // Run this on executors - intercept[SparkDriverExecutionException] { - sc.parallelize(1 to 10, 2).foreach { item => acc.add(1) } - } + sc.parallelize(1 to 10, 2).foreach { item => acc.add(1) } // Run this within a local thread - intercept[SparkDriverExecutionException] { - sc.parallelize(1 to 10, 2).map { item => acc.add(1) }.take(1) - } + sc.parallelize(1 to 10, 2).map { item => acc.add(1) }.take(1) // Make sure we can still run local commands as well as cluster commands. assert(sc.parallelize(1 to 10, 2).count() === 10)