Skip to content

Commit

Permalink
[SPARK-2677] BasicBlockFetchIterator#next can wait forever
Browse files Browse the repository at this point in the history
Author: Kousuke Saruta <[email protected]>

Closes apache#1632 from sarutak/SPARK-2677 and squashes the following commits:

cddbc7b [Kousuke Saruta] Removed Exception throwing when ConnectionManager#handleMessage receives ack for non-referenced message
d3bd2a8 [Kousuke Saruta] Modified configuration.md for spark.core.connection.ack.timeout
e85f88b [Kousuke Saruta] Removed useless synchronized blocks
7ed48be [Kousuke Saruta] Modified ConnectionManager to use ackTimeoutMonitor ConnectionManager-wide
9b620a6 [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark into SPARK-2677
0dd9ad3 [Kousuke Saruta] Modified typo in ConnectionManagerSuite.scala
7cbb8ca [Kousuke Saruta] Modified to match with scalastyle
8a73974 [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark into SPARK-2677
ade279a [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark into SPARK-2677
0174d6a [Kousuke Saruta] Modified ConnectionManager.scala to handle the case remote Executor cannot ack
a454239 [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark into SPARK-2677
9b7b7c1 [Kousuke Saruta] (WIP) Modifying ConnectionManager.scala
  • Loading branch information
sarutak authored and conviva-zz committed Sep 4, 2014
1 parent bdd2904 commit 0dcec89
Show file tree
Hide file tree
Showing 3 changed files with 87 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import java.nio._
import java.nio.channels._
import java.nio.channels.spi._
import java.net._
import java.util.{Timer, TimerTask}
import java.util.concurrent.atomic.AtomicInteger

import java.util.concurrent.{LinkedBlockingDeque, TimeUnit, ThreadPoolExecutor}
Expand Down Expand Up @@ -61,17 +62,17 @@ private[spark] class ConnectionManager(
var ackMessage: Option[Message] = None

def markDone(ackMessage: Option[Message]) {
this.synchronized {
this.ackMessage = ackMessage
completionHandler(this)
}
this.ackMessage = ackMessage
completionHandler(this)
}
}

private val selector = SelectorProvider.provider.openSelector()
private val ackTimeoutMonitor = new Timer("AckTimeoutMonitor", true)

// default to 30 second timeout waiting for authentication
private val authTimeout = conf.getInt("spark.core.connection.auth.wait.timeout", 30)
private val ackTimeout = conf.getInt("spark.core.connection.ack.wait.timeout", 60)

private val handleMessageExecutor = new ThreadPoolExecutor(
conf.getInt("spark.core.connection.handler.threads.min", 20),
Expand Down Expand Up @@ -652,19 +653,27 @@ private[spark] class ConnectionManager(
}
}
if (bufferMessage.hasAckId()) {
val sentMessageStatus = messageStatuses.synchronized {
messageStatuses.synchronized {
messageStatuses.get(bufferMessage.ackId) match {
case Some(status) => {
messageStatuses -= bufferMessage.ackId
status
status.markDone(Some(message))
}
case None => {
throw new Exception("Could not find reference for received ack message " +
message.id)
/**
* We can reach this branch in the following 2 cases:
*
* (1) An invalid ack was sent due to buggy code.
*
* (2) A late-arriving ack for a MessageStatus that was already removed.
* To avoid unwanted late-arriving acks
* caused by long pauses such as GC, you can set
* spark.core.connection.ack.wait.timeout to a value larger than the default.
*/
logWarning(s"Could not find reference for received ack Message ${message.id}")
}
}
}
sentMessageStatus.markDone(Some(message))
} else {
var ackMessage : Option[Message] = None
try {
Expand Down Expand Up @@ -836,9 +845,23 @@ private[spark] class ConnectionManager(
def sendMessageReliably(connectionManagerId: ConnectionManagerId, message: Message)
: Future[Message] = {
val promise = Promise[Message]()

// Fails `promise` with an IOException if the status for `message` is still
// registered in `messageStatuses` when this task fires.
val timeoutTask = new TimerTask {
  override def run(): Unit = {
    messageStatuses.synchronized {
      // remove() yields a value only when the status is still registered, so the
      // promise is failed here only for a message that is still pending.
      messageStatuses.remove(message.id).foreach { _ =>
        promise.failure(
          // The `s` interpolator must be on the literal that contains the
          // placeholder; otherwise "${ackTimeout}" is emitted verbatim.
          new IOException("sendMessageReliably failed because ack " +
            s"was not received within $ackTimeout sec"))
      }
    }
  }
}

val status = new MessageStatus(message, connectionManagerId, s => {
timeoutTask.cancel()
s.ackMessage match {
case None => // Indicates a failure where we either never sent or never got ACK'd
case None => // Indicates a failure where we either never sent or never got ACK'd
promise.failure(new IOException("sendMessageReliably failed without being ACK'd"))
case Some(ackMessage) =>
if (ackMessage.hasError) {
Expand All @@ -852,6 +875,8 @@ private[spark] class ConnectionManager(
messageStatuses.synchronized {
messageStatuses += ((message.id, status))
}

ackTimeoutMonitor.schedule(timeoutTask, ackTimeout * 1000)
sendMessage(connectionManagerId, message)
promise.future
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,19 @@ package org.apache.spark.network

import java.io.IOException
import java.nio._
import java.util.concurrent.TimeoutException

import org.apache.spark.{SecurityManager, SparkConf}
import org.scalatest.FunSuite

import org.mockito.Mockito._
import org.mockito.Matchers._

import scala.concurrent.TimeoutException
import scala.concurrent.{Await, TimeoutException}
import scala.concurrent.duration._
import scala.language.postfixOps
import scala.util.Try
import scala.util.{Failure, Success, Try}

/**
* Test the ConnectionManager with various security settings.
Expand Down Expand Up @@ -255,5 +260,42 @@ class ConnectionManagerSuite extends FunSuite {

}

// Verifies that sendMessageReliably fails its future with an IOException when the
// receiving side does not answer within spark.core.connection.ack.wait.timeout.
test("sendMessageReliably timeout") {
  val clientConf = new SparkConf
  clientConf.set("spark.authenticate", "false")
  val ackTimeout = 30
  clientConf.set("spark.core.connection.ack.wait.timeout", ackTimeout.toString)

  val clientSecurityManager = new SecurityManager(clientConf)
  val manager = new ConnectionManager(0, clientConf, clientSecurityManager)

  val serverConf = new SparkConf
  serverConf.set("spark.authenticate", "false")
  val serverSecurityManager = new SecurityManager(serverConf)
  val managerServer = new ConnectionManager(0, serverConf, serverSecurityManager)

  // Stop both managers even when the assertion below fails, so a failing run
  // does not leave them running.
  try {
    managerServer.onReceiveMessage((msg: Message, id: ConnectionManagerId) => {
      // Sleep for 3x the ack timeout (90 sec) to simulate a slow or hung server.
      Thread.sleep(ackTimeout * 3 * 1000)
      None
    })

    val size = 10 * 1024 * 1024
    val buffer = ByteBuffer.allocate(size).put(Array.tabulate[Byte](size)(x => x.toByte))
    buffer.flip
    val bufferMessage = Message.createBufferMessage(buffer.duplicate)

    val future = manager.sendMessageReliably(managerServer.id, bufferMessage)

    // The future should fail with an IOException after ackTimeout (30 sec).
    // Await.result waits up to twice that long; if the ack timeout never fires,
    // it throws TimeoutException instead and intercept[IOException] fails the test.
    intercept[IOException] {
      Await.result(future, (ackTimeout * 2).seconds)
    }
  } finally {
    try {
      manager.stop()
    } finally {
      managerServer.stop()
    }
  }
}

}

9 changes: 9 additions & 0 deletions docs/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -884,6 +884,15 @@ Apart from these, the following properties are also available, and may be useful
out and giving up.
</td>
</tr>
<tr>
<td><code>spark.core.connection.ack.wait.timeout</code></td>
<td>60</td>
<td>
Number of seconds for the connection to wait for an ack to occur before timing
out and giving up. To avoid unwanted timeouts caused by long pauses such as GC,
you can set a larger value.
</td>
</tr>
<tr>
<td><code>spark.ui.filters</code></td>
<td>None</td>
Expand Down

0 comments on commit 0dcec89

Please sign in to comment.