diff --git a/core/src/main/scala/org/apache/spark/network/nio/Connection.scala b/core/src/main/scala/org/apache/spark/network/nio/Connection.scala index 60de4a305232d..4f6f5e235811d 100644 --- a/core/src/main/scala/org/apache/spark/network/nio/Connection.scala +++ b/core/src/main/scala/org/apache/spark/network/nio/Connection.scala @@ -141,14 +141,14 @@ abstract class Connection(val channel: SocketChannel, val selector: Selector, onKeyInterestChangeCallback = callback } - def callOnExceptionCallback(e: Throwable) { + def callOnExceptionCallbacks(e: Throwable) { onExceptionCallbacks foreach { callback => try { callback(this, e) } catch { case NonFatal(e) => { - logWarning("Ignore error", e) + logWarning("Ignored error in onExceptionCallback", e) } } } @@ -330,7 +330,7 @@ class SendingConnection(val address: InetSocketAddress, selector_ : Selector, } catch { case e: Exception => { logError("Error connecting to " + address, e) - callOnExceptionCallback(e) + callOnExceptionCallbacks(e) } } } @@ -355,7 +355,7 @@ class SendingConnection(val address: InetSocketAddress, selector_ : Selector, } catch { case e: Exception => { logWarning("Error finishing connection to " + address, e) - callOnExceptionCallback(e) + callOnExceptionCallbacks(e) } } true @@ -400,7 +400,7 @@ class SendingConnection(val address: InetSocketAddress, selector_ : Selector, } catch { case e: Exception => { logWarning("Error writing in connection to " + getRemoteConnectionManagerId(), e) - callOnExceptionCallback(e) + callOnExceptionCallbacks(e) close() return false } @@ -427,7 +427,7 @@ class SendingConnection(val address: InetSocketAddress, selector_ : Selector, case e: Exception => logError("Exception while reading SendingConnection to " + getRemoteConnectionManagerId(), e) - callOnExceptionCallback(e) + callOnExceptionCallbacks(e) close() } @@ -584,7 +584,7 @@ private[spark] class ReceivingConnection( } catch { case e: Exception => { logWarning("Error reading from connection to " + getRemoteConnectionManagerId(), e) - callOnExceptionCallback(e) + callOnExceptionCallbacks(e) close() return false } diff --git a/core/src/main/scala/org/apache/spark/network/nio/ConnectionManager.scala b/core/src/main/scala/org/apache/spark/network/nio/ConnectionManager.scala index 85763dfbf7783..6b00190c5eccc 100644 --- a/core/src/main/scala/org/apache/spark/network/nio/ConnectionManager.scala +++ b/core/src/main/scala/org/apache/spark/network/nio/ConnectionManager.scala @@ -64,7 +64,7 @@ private[nio] class ConnectionManager( } } - def receiveNonAck() { + def failWithoutAck() { completionHandler(scala.util.Failure(new IOException("Failed without being ACK'd"))) } @@ -87,7 +87,7 @@ private[nio] class ConnectionManager( override def afterExecute(r: Runnable, t: Throwable): Unit = { super.afterExecute(r, t) - if (t != null) { + if (t != null && NonFatal(t)) { logError("Error in handleMessageExecutor is not handled properly", t) } } @@ -103,7 +103,7 @@ private[nio] class ConnectionManager( override def afterExecute(r: Runnable, t: Throwable): Unit = { super.afterExecute(r, t) - if (t != null) { + if (t != null && NonFatal(t)) { logError("Error in handleReadWriteExecutor is not handled properly", t) } } @@ -198,7 +198,7 @@ private[nio] class ConnectionManager( } catch { case NonFatal(e) => { logError("Error when writing to " + conn.getRemoteConnectionManagerId(), e) - conn.callOnExceptionCallback(e) + conn.callOnExceptionCallbacks(e) } } } @@ -238,7 +238,7 @@ private[nio] class ConnectionManager( } catch { case NonFatal(e) => { logError("Error when reading from " + conn.getRemoteConnectionManagerId(), e) - conn.callOnExceptionCallback(e) + conn.callOnExceptionCallbacks(e) } } } @@ -272,7 +272,7 @@ private[nio] class ConnectionManager( } catch { case NonFatal(e) => { logError("Error when finishConnect for " + conn.getRemoteConnectionManagerId(), e) - conn.callOnExceptionCallback(e) + conn.callOnExceptionCallbacks(e) } } } @@ -295,7 +295,7 @@ private[nio] class ConnectionManager( handleConnectExecutor.execute(new Runnable { override def run() { try { - conn.callOnExceptionCallback(e) + conn.callOnExceptionCallbacks(e) } catch { // ignore exceptions case NonFatal(e) => logDebug("Ignoring exception", e) @@ -497,7 +497,7 @@ private[nio] class ConnectionManager( messageStatuses.values.filter(_.connectionManagerId == sendingConnectionManagerId) .foreach(status => { logInfo("Notifying " + status) - status.receiveNonAck() + status.failWithoutAck() }) messageStatuses.retain((i, status) => { @@ -526,7 +526,7 @@ private[nio] class ConnectionManager( for (s <- messageStatuses.values if s.connectionManagerId == sendingConnectionManagerId) { logInfo("Notifying " + s) - s.receiveNonAck() + s.failWithoutAck() } messageStatuses.retain((i, status) => { @@ -567,7 +567,7 @@ private[nio] class ConnectionManager( case NonFatal(e) => { logError("Error when handling messages from " + connection.getRemoteConnectionManagerId(), e) - connection.callOnExceptionCallback(e) + connection.callOnExceptionCallbacks(e) } } } @@ -848,7 +848,7 @@ private[nio] class ConnectionManager( try { checkSendAuthFirst(connectionManagerId, connection) } catch { - case e: Exception => { + case NonFatal(e) => { reportSendingMessageFailure(message.id, e) } }