From c01c4501446bde1c5c045e2ac57c1be10c7d1b18 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Mon, 4 Aug 2014 14:07:10 -0700 Subject: [PATCH] Return Try[Message] from sendMessageReliablySync. This forces callers to consider error handling. --- .../org/apache/spark/network/ConnectionManager.scala | 5 +++-- .../scala/org/apache/spark/network/SenderTest.scala | 3 +-- .../org/apache/spark/storage/BlockManagerWorker.scala | 10 +++++----- 3 files changed, 9 insertions(+), 9 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala b/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala index ddf6b6e5404c1..9440bcbe009ba 100644 --- a/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala +++ b/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala @@ -35,6 +35,7 @@ import scala.collection.mutable.SynchronizedQueue import scala.concurrent.{Await, ExecutionContext, Future, Promise} import scala.concurrent.duration._ import scala.language.postfixOps +import scala.util.Try import org.apache.spark._ import org.apache.spark.util.{SystemClock, Utils} @@ -849,8 +850,8 @@ private[spark] class ConnectionManager(port: Int, conf: SparkConf, } def sendMessageReliablySync(connectionManagerId: ConnectionManagerId, - message: Message): Message = { - Await.result(sendMessageReliably(connectionManagerId, message), Duration.Inf) + message: Message): Try[Message] = { + Try(Await.result(sendMessageReliably(connectionManagerId, message), Duration.Inf)) } def onReceiveMessage(callback: (Message, ConnectionManagerId) => Option[Message]) { diff --git a/core/src/main/scala/org/apache/spark/network/SenderTest.scala b/core/src/main/scala/org/apache/spark/network/SenderTest.scala index f15eaef80d3e1..b8ea7c2cff9a2 100644 --- a/core/src/main/scala/org/apache/spark/network/SenderTest.scala +++ b/core/src/main/scala/org/apache/spark/network/SenderTest.scala @@ -18,7 +18,6 @@ package org.apache.spark.network import java.nio.ByteBuffer -import scala.util.Try import org.apache.spark.{SecurityManager, SparkConf} private[spark] object SenderTest { @@ -52,7 +51,7 @@ private[spark] object SenderTest { val dataMessage = Message.createBufferMessage(buffer.duplicate) val startTime = System.currentTimeMillis /* println("Started timer at " + startTime) */ - val responseStr = Try(manager.sendMessageReliablySync(targetConnectionManagerId, dataMessage)) + val responseStr = manager.sendMessageReliablySync(targetConnectionManagerId, dataMessage) .map { response => val buffer = response.asInstanceOf[BufferMessage].buffers(0) new String(buffer.array, "utf-8") diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerWorker.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerWorker.scala index 518cc5e419102..35f99d7569fe8 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerWorker.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerWorker.scala @@ -23,7 +23,7 @@ import org.apache.spark.Logging import org.apache.spark.network._ import org.apache.spark.util.Utils -import scala.util.{Failure, Success, Try} +import scala.util.{Failure, Success} /** * A network interface for BlockManager. Each slave should have one @@ -117,8 +117,8 @@ private[spark] object BlockManagerWorker extends Logging { val connectionManager = blockManager.connectionManager val blockMessage = BlockMessage.fromPutBlock(msg) val blockMessageArray = new BlockMessageArray(blockMessage) - val resultMessage = Try(connectionManager.sendMessageReliablySync( - toConnManagerId, blockMessageArray.toBufferMessage)) + val resultMessage = connectionManager.sendMessageReliablySync( + toConnManagerId, blockMessageArray.toBufferMessage) resultMessage.isSuccess } @@ -127,8 +127,8 @@ private[spark] object BlockManagerWorker extends Logging { val connectionManager = blockManager.connectionManager val blockMessage = BlockMessage.fromGetBlock(msg) val blockMessageArray = new BlockMessageArray(blockMessage) - val responseMessage = Try(connectionManager.sendMessageReliablySync( - toConnManagerId, blockMessageArray.toBufferMessage)) + val responseMessage = connectionManager.sendMessageReliablySync( + toConnManagerId, blockMessageArray.toBufferMessage) responseMessage match { case Success(message) => { val bufferMessage = message.asInstanceOf[BufferMessage]