Merge branch 'master' of github.com:apache/spark into configure-ports
andrewor14 committed Aug 5, 2014
2 parents 9868358 + 05bf4e4 commit 151327a
Showing 3 changed files with 15 additions and 11 deletions.

9 changes: 7 additions & 2 deletions core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala

@@ -904,8 +904,13 @@ class DAGScheduler(
     event.reason match {
       case Success =>
         if (event.accumUpdates != null) {
-          // TODO: fail the stage if the accumulator update fails...
-          Accumulators.add(event.accumUpdates) // TODO: do this only if task wasn't resubmitted
+          try {
+            Accumulators.add(event.accumUpdates)
+          } catch {
+            // If we see an exception during accumulator update, just log the error and move on.
+            case e: Exception =>
+              logError(s"Failed to update accumulators for $task", e)
+          }
         }
         stage.pendingTasks -= task
         task match {

11 changes: 3 additions & 8 deletions core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala

@@ -622,8 +622,7 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F
     assertDataStructuresEmpty
   }
 
-  // TODO: Fix this and un-ignore the test.
-  ignore("misbehaved accumulator should not crash DAGScheduler and SparkContext") {
+  test("misbehaved accumulator should not crash DAGScheduler and SparkContext") {
     val acc = new Accumulator[Int](0, new AccumulatorParam[Int] {
       override def addAccumulator(t1: Int, t2: Int): Int = t1 + t2
       override def zero(initialValue: Int): Int = 0
@@ -633,14 +632,10 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F
     })
 
     // Run this on executors
-    intercept[SparkDriverExecutionException] {
-      sc.parallelize(1 to 10, 2).foreach { item => acc.add(1) }
-    }
+    sc.parallelize(1 to 10, 2).foreach { item => acc.add(1) }
 
     // Run this within a local thread
-    intercept[SparkDriverExecutionException] {
-      sc.parallelize(1 to 10, 2).map { item => acc.add(1) }.take(1)
-    }
+    sc.parallelize(1 to 10, 2).map { item => acc.add(1) }.take(1)
 
     // Make sure we can still run local commands as well as cluster commands.
     assert(sc.parallelize(1 to 10, 2).count() === 10)
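
The un-ignored test above drives a misbehaving accumulator from both an executor job and a local take(), and the DAGScheduler change makes the driver log a failed accumulator update and keep scheduling instead of crashing. Below is a minimal sketch of that log-and-continue pattern, written in Python for consistency with the pyspark file in this commit rather than as Spark code; ToyAccumulator, handle_task_success and bad_merge are illustrative names, not Spark APIs.

    import logging

    logging.basicConfig(level=logging.ERROR)
    logger = logging.getLogger("toy_scheduler")


    class ToyAccumulator(object):
        """Holds a value plus a user-supplied merge function that may raise."""

        def __init__(self, value, merge):
            self.value = value
            self._merge = merge

        def add(self, update):
            self.value = self._merge(self.value, update)


    def handle_task_success(acc, update, task_id):
        # Mirror the DAGScheduler change: a failed accumulator update is logged
        # and swallowed, so it cannot take down the scheduling loop.
        try:
            acc.add(update)
        except Exception:
            logger.exception("Failed to update accumulators for task %s", task_id)


    def bad_merge(current, update):
        raise RuntimeError("misbehaved accumulator")


    acc = ToyAccumulator(0, bad_merge)
    handle_task_success(acc, 1, task_id=0)   # logs the error instead of raising
    print("still running, accumulator value = %d" % acc.value)
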
6 changes: 5 additions & 1 deletion python/pyspark/serializers.py

@@ -297,8 +297,11 @@ def __reduce__(self):
 
 def _hijack_namedtuple():
     """ Hack namedtuple() to make it picklable """
-    global _old_namedtuple # or it will put in closure
+    # hijack only one time
+    if hasattr(collections.namedtuple, "__hijack"):
+        return
 
+    global _old_namedtuple # or it will put in closure
     def _copy_func(f):
         return types.FunctionType(f.func_code, f.func_globals, f.func_name,
                 f.func_defaults, f.func_closure)
@@ -313,6 +316,7 @@ def namedtuple(name, fields, verbose=False, rename=False):
     collections.namedtuple.func_globals["_old_namedtuple"] = _old_namedtuple
     collections.namedtuple.func_globals["_hack_namedtuple"] = _hack_namedtuple
     collections.namedtuple.func_code = namedtuple.func_code
+    collections.namedtuple.__hijack = 1
 
     # hack the cls already generated by namedtuple
     # those created in other module can be pickled as normal,
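
The marker added above (collections.namedtuple.__hijack = 1) together with the hasattr check makes the hijack idempotent: running the setup a second time returns early instead of wrapping namedtuple around itself. Here is a minimal sketch of that patch-only-once pattern applied to a toy namespace instead of the real collections module; patch_once, loud and greet are made-up names for illustration, not pyspark code.

    def patch_once(namespace, name, wrapper):
        """Replace namespace[name] with wrapper(original), but only the first time."""
        target = namespace[name]
        if getattr(target, "__hijack", False):
            return                      # already patched; calling again is a no-op
        patched = wrapper(target)
        patched.__hijack = True         # marker checked by the guard above
        namespace[name] = patched


    def loud(f):
        def inner(*args, **kwargs):
            print("calling %s" % f.__name__)
            return f(*args, **kwargs)
        return inner


    def greet(who):
        return "hello " + who


    ns = {"greet": greet}
    patch_once(ns, "greet", loud)
    patch_once(ns, "greet", loud)       # guard fires, greet is not double-wrapped
    print(ns["greet"]("spark"))         # one "calling greet", then "hello spark"
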
