Skip to content

Commit

Permalink
Make functions run by the Akka scheduler use Executor's UncaughtExceptionHandler
Browse files Browse the repository at this point in the history
  • Loading branch information
markhamstra committed May 12, 2014
1 parent d9c97ba commit 8fc0439
Show file tree
Hide file tree
Showing 6 changed files with 50 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import org.apache.spark.{Logging, SparkConf, SparkException}
import org.apache.spark.deploy.{ApplicationDescription, ExecutorState}
import org.apache.spark.deploy.DeployMessages._
import org.apache.spark.deploy.master.Master
import org.apache.spark.util.AkkaUtils
import org.apache.spark.util.{UncaughtExceptionHandler, AkkaUtils}

/**
* Interface allowing applications to speak with a Spark deploy cluster. Takes a master URL,
Expand Down Expand Up @@ -88,6 +88,7 @@ private[spark] class AppClient(
var retries = 0
registrationRetryTimer = Some {
context.system.scheduler.schedule(REGISTRATION_TIMEOUT, REGISTRATION_TIMEOUT) {
Thread.currentThread.setUncaughtExceptionHandler(UncaughtExceptionHandler)
retries += 1
if (registered) {
registrationRetryTimer.foreach(_.cancel())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ import org.apache.spark.deploy.DeployMessages._
import org.apache.spark.deploy.master.{DriverState, Master}
import org.apache.spark.deploy.worker.ui.WorkerWebUI
import org.apache.spark.metrics.MetricsSystem
import org.apache.spark.util.{AkkaUtils, Utils}
import org.apache.spark.util.{UncaughtExceptionHandler, AkkaUtils, Utils}

/**
* @param masterUrls Each url should look like spark://host:port.
Expand Down Expand Up @@ -166,6 +166,7 @@ private[spark] class Worker(
var retries = 0
registrationRetryTimer = Some {
context.system.scheduler.schedule(REGISTRATION_TIMEOUT, REGISTRATION_TIMEOUT) {
Thread.currentThread.setUncaughtExceptionHandler(UncaughtExceptionHandler)
retries += 1
if (registered) {
registrationRetryTimer.foreach(_.cancel())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import org.apache.spark._
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.scheduler._
import org.apache.spark.storage.{StorageLevel, TaskResultBlockId}
import org.apache.spark.util.{AkkaUtils, Utils}
import org.apache.spark.util.{UncaughtExceptionHandler, AkkaUtils, Utils}

/**
* Spark executor used with Mesos, YARN, and the standalone scheduler.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import scala.util.Random
import org.apache.spark._
import org.apache.spark.TaskState.TaskState
import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
import org.apache.spark.util.UncaughtExceptionHandler

/**
* Schedules tasks for multiple types of clusters by acting through a SchedulerBackend.
Expand Down Expand Up @@ -139,6 +140,7 @@ private[spark] class TaskSchedulerImpl(
import sc.env.actorSystem.dispatcher
sc.env.actorSystem.scheduler.schedule(SPECULATION_INTERVAL milliseconds,
SPECULATION_INTERVAL milliseconds) {
Thread.currentThread.setUncaughtExceptionHandler(UncaughtExceptionHandler)
checkSpeculatableTasks()
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,7 @@ private[spark] class BlockManager(
BlockManagerWorker.startBlockManagerWorker(this)
if (!BlockManager.getDisableHeartBeatsForTesting(conf)) {
heartBeatTask = actorSystem.scheduler.schedule(0.seconds, heartBeatFrequency.milliseconds) {
Thread.currentThread.setUncaughtExceptionHandler(UncaughtExceptionHandler)
heartBeat()
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.util

import org.apache.spark.executor.ExecutorExitCode
import org.apache.spark.Logging

/**
 * A `Thread.UncaughtExceptionHandler` that logs the uncaught exception and then terminates the
 * JVM with an exit code taken from `ExecutorExitCode`.
 *
 * The exit code is `ExecutorExitCode.OOM` when the uncaught exception is an `OutOfMemoryError`
 * and `ExecutorExitCode.UNCAUGHT_EXCEPTION` otherwise. If handling the exception itself throws,
 * the JVM is halted via `Runtime.halt` instead.
 */
object UncaughtExceptionHandler extends Thread.UncaughtExceptionHandler with Logging {

  /**
   * Logs `exception` and exits the JVM, unless `Utils.inShutdown()` reports true.
   *
   * @param thread the thread in which the exception went uncaught
   * @param exception the uncaught exception
   */
  override def uncaughtException(thread: Thread, exception: Throwable): Unit = {
    try {
      logError("Uncaught exception in thread " + thread, exception)

      // This handler may be running inside a shutdown hook. Calling System.exit() from there
      // would deadlock, so only exit when shutdown is not already in progress.
      if (!Utils.inShutdown()) {
        val exitCode = exception match {
          case _: OutOfMemoryError => ExecutorExitCode.OOM
          case _ => ExecutorExitCode.UNCAUGHT_EXCEPTION
        }
        System.exit(exitCode)
      }
    } catch {
      // Something failed while handling the original exception: halt instead of exiting.
      case _: OutOfMemoryError => Runtime.getRuntime.halt(ExecutorExitCode.OOM)
      case _: Throwable => Runtime.getRuntime.halt(ExecutorExitCode.UNCAUGHT_EXCEPTION_TWICE)
    }
  }
}

0 comments on commit 8fc0439

Please sign in to comment.