Skip to content

Commit

Permalink
SPARK-1772 Stop catching Throwable, let Executors die
Browse files Browse the repository at this point in the history
The main issue this patch fixes is [SPARK-1772](https://issues.apache.org/jira/browse/SPARK-1772), in which Executors may not die when fatal exceptions (e.g., OOM) are thrown. This patch causes Executors to delegate to the ExecutorUncaughtExceptionHandler when a fatal exception is thrown.

This patch also continues the fight in the neverending war against `case t: Throwable =>`, by only catching Exceptions in many places, and adding a wrapper for Threads and Runnables to make sure any uncaught exceptions are at least printed to the logs.

It also turns out that it is unlikely that the IndestructibleActorSystem actually works, given testing ([here](https://gist.github.com/aarondav/ca1f0cdcd50727f89c0d)). The uncaughtExceptionHandler is not called from the places that we expected it would be.
[SPARK-1620](https://issues.apache.org/jira/browse/SPARK-1620) deals with part of this issue, but refactoring our Actor Systems to ensure that exceptions are dealt with properly is a much bigger change, outside the scope of this PR.

Author: Aaron Davidson <[email protected]>

Closes apache#715 from aarondav/throwable and squashes the following commits:

f9b9bfe [Aaron Davidson] Remove other redundant 'throw e'
e937a0a [Aaron Davidson] Address Prashant and Matei's comments
1867867 [Aaron Davidson] [RFC] SPARK-1772 Stop catching Throwable, let Executors die
(cherry picked from commit 3af1f38)

Signed-off-by: Patrick Wendell <[email protected]>

Conflicts:
	core/src/main/scala/org/apache/spark/ContextCleaner.scala
	core/src/main/scala/org/apache/spark/SparkContext.scala
	core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
	core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala
	core/src/main/scala/org/apache/spark/deploy/Client.scala
	core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
	core/src/main/scala/org/apache/spark/deploy/master/Master.scala
	core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala
	core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
	core/src/main/scala/org/apache/spark/executor/Executor.scala
	core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
	core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala
	core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
	core/src/main/scala/org/apache/spark/storage/TachyonBlockManager.scala
	core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
	core/src/main/scala/org/apache/spark/util/Utils.scala
  • Loading branch information
aarondav authored and markhamstra committed May 14, 2014
1 parent 49621ec commit be93773
Show file tree
Hide file tree
Showing 8 changed files with 96 additions and 36 deletions.
1 change: 1 addition & 0 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1145,6 +1145,7 @@ object SparkContext {
/** Find the JAR that contains the class of a particular object */
def jarOfObject(obj: AnyRef): Seq[String] = jarOfClass(obj.getClass)

<<<<<<< HEAD
/** Get the amount of memory per executor requested through system properties or SPARK_MEM */
private[spark] val executorMemoryRequested = {
// TODO: Might need to add some extra memory for the non-heap parts of the JVM
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,6 @@ private[spark] object PythonRDD {
}
} catch {
case eof: EOFException => {}
case e => throw e
}
JavaRDD.fromRDD(sc.sc.parallelize(objs, parallelism))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,13 +58,11 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String
try {
new Socket(daemonHost, daemonPort)
} catch {
case exc: SocketException => {
case exc: SocketException =>
logWarning("Python daemon unexpectedly quit, attempting to restart")
stopDaemon()
startDaemon()
new Socket(daemonHost, daemonPort)
}
case e => throw e
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ object SparkHadoopUtil {
try {
Class.forName("org.apache.spark.deploy.yarn.YarnSparkHadoopUtil").newInstance.asInstanceOf[SparkHadoopUtil]
} catch {
case th: Throwable => throw new SparkException("Unable to load YARN support", th)
case e: Exception => throw new SparkException("Unable to load YARN support", e)
}
} else {
new SparkHadoopUtil
Expand Down
38 changes: 11 additions & 27 deletions core/src/main/scala/org/apache/spark/executor/Executor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -79,28 +79,7 @@ private[spark] class Executor(
// Setup an uncaught exception handler for non-local mode.
// Make any thread terminations due to uncaught exceptions kill the entire
// executor process to avoid surprising stalls.
Thread.setDefaultUncaughtExceptionHandler(
new Thread.UncaughtExceptionHandler {
override def uncaughtException(thread: Thread, exception: Throwable) {
try {
logError("Uncaught exception in thread " + thread, exception)

// We may have been called from a shutdown hook. If so, we must not call System.exit().
// (If we do, we will deadlock.)
if (!Utils.inShutdown()) {
if (exception.isInstanceOf[OutOfMemoryError]) {
System.exit(ExecutorExitCode.OOM)
} else {
System.exit(ExecutorExitCode.UNCAUGHT_EXCEPTION)
}
}
} catch {
case oom: OutOfMemoryError => Runtime.getRuntime.halt(ExecutorExitCode.OOM)
case t: Throwable => Runtime.getRuntime.halt(ExecutorExitCode.UNCAUGHT_EXCEPTION_TWICE)
}
}
}
)
Thread.setDefaultUncaughtExceptionHandler(ExecutorUncaughtExceptionHandler)
}

val executorSource = new ExecutorSource(this, executorId)
Expand Down Expand Up @@ -258,6 +237,11 @@ private[spark] class Executor(
}

case t: Throwable => {
// Attempt to exit cleanly by informing the driver of our failure.
// If anything goes wrong (or this was a fatal exception), we will delegate to
// the default uncaught exception handler, which will terminate the Executor.
logError("Exception in task ID " + taskId, t)

val serviceTime = (System.currentTimeMillis() - taskStart).toInt
val metrics = attemptedTask.flatMap(t => t.metrics)
for (m <- metrics) {
Expand All @@ -267,11 +251,11 @@ private[spark] class Executor(
val reason = ExceptionFailure(t.getClass.getName, t.toString, t.getStackTrace, metrics)
execBackend.statusUpdate(taskId, TaskState.FAILED, ser.serialize(reason))

// TODO: Should we exit the whole executor here? On the one hand, the failed task may
// have left some weird state around depending on when the exception was thrown, but on
// the other hand, maybe we could detect that when future tasks fail and exit then.
logError("Exception in task ID " + taskId, t)
//System.exit(1)
// Don't forcibly exit unless the exception was inherently fatal, to avoid
// stopping other tasks unnecessarily.
if (Utils.isFatalError(t)) {
ExecutorUncaughtExceptionHandler.uncaughtException(t)
}
}
} finally {
runningTasks.remove(taskId)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.executor

import org.apache.spark.Logging
import org.apache.spark.util.Utils

/**
* The default uncaught exception handler for Executors terminates the whole process, to avoid
* getting into a bad state indefinitely. Since Executors are relatively lightweight, it's better
* to fail fast when things go wrong.
*/
private[spark] object ExecutorUncaughtExceptionHandler
extends Thread.UncaughtExceptionHandler with Logging {

override def uncaughtException(thread: Thread, exception: Throwable) {
try {
logError("Uncaught exception in thread " + thread, exception)

// We may have been called from a shutdown hook. If so, we must not call System.exit().
// (If we do, we will deadlock.)
if (!Utils.inShutdown()) {
if (exception.isInstanceOf[OutOfMemoryError]) {
System.exit(ExecutorExitCode.OOM)
} else {
System.exit(ExecutorExitCode.UNCAUGHT_EXCEPTION)
}
}
} catch {
case oom: OutOfMemoryError => Runtime.getRuntime.halt(ExecutorExitCode.OOM)
case t: Throwable => Runtime.getRuntime.halt(ExecutorExitCode.UNCAUGHT_EXCEPTION_TWICE)
}
}

def uncaughtException(exception: Throwable) {
uncaughtException(Thread.currentThread(), exception)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -125,14 +125,14 @@ private[spark] class DiskBlockManager(shuffleManager: ShuffleBlockManager, rootD
private def addShutdownHook() {
localDirs.foreach(localDir => Utils.registerShutdownDeleteDir(localDir))
Runtime.getRuntime.addShutdownHook(new Thread("delete Spark local dirs") {
override def run() {
override def run(): Unit = Utils.logUncaughtExceptions {
logDebug("Shutdown hook called")
localDirs.foreach { localDir =>
try {
if (!Utils.hasRootAsShutdownDeleteDir(localDir)) Utils.deleteRecursively(localDir)
} catch {
case t: Throwable =>
logError("Exception while deleting local spark dir: " + localDir, t)
case e: Exception =>
logError("Exception while deleting local spark dir: " + localDir, e)
}
}

Expand Down
27 changes: 26 additions & 1 deletion core/src/main/scala/org/apache/spark/util/Utils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import scala.collection.Map
import scala.collection.mutable.ArrayBuffer
import scala.collection.JavaConversions._
import scala.io.Source
import scala.util.control.{ControlThrowable, NonFatal}

import com.google.common.io.Files
import com.google.common.util.concurrent.ThreadFactoryBuilder
Expand All @@ -37,7 +38,6 @@ import org.apache.spark.deploy.SparkHadoopUtil
import java.nio.ByteBuffer
import org.apache.spark.{SparkException, Logging}


/**
* Various utility methods used by Spark.
*/
Expand Down Expand Up @@ -823,4 +823,29 @@ private[spark] object Utils extends Logging {
return System.getProperties().clone()
.asInstanceOf[java.util.Properties].toMap[String, String]
}

/**
* Executes the given block, printing and re-throwing any uncaught exceptions.
* This is particularly useful for wrapping code that runs in a thread, to ensure
* that exceptions are printed, and to avoid having to catch Throwable.
*/
def logUncaughtExceptions[T](f: => T): T = {
try {
f
} catch {
case t: Throwable =>
logError(s"Uncaught exception in thread ${Thread.currentThread().getName}", t)
throw t
}
}

/** Returns true if the given exception was fatal. See docs for scala.util.control.NonFatal. */
def isFatalError(e: Throwable): Boolean = {
e match {
case NonFatal(_) | _: InterruptedException | _: NotImplementedError | _: ControlThrowable =>
false
case _ =>
true
}
}
}

0 comments on commit be93773

Please sign in to comment.