Skip to content

Commit

Permalink
Return Try[Message] from sendMessageReliablySync.
Browse files Browse the repository at this point in the history
This forces callers to consider error handling.
  • Loading branch information
JoshRosen committed Aug 4, 2014
1 parent f1cd1bb commit c01c450
Show file tree
Hide file tree
Showing 3 changed files with 9 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import scala.collection.mutable.SynchronizedQueue
import scala.concurrent.{Await, ExecutionContext, Future, Promise}
import scala.concurrent.duration._
import scala.language.postfixOps
import scala.util.Try

import org.apache.spark._
import org.apache.spark.util.{SystemClock, Utils}
Expand Down Expand Up @@ -849,8 +850,8 @@ private[spark] class ConnectionManager(port: Int, conf: SparkConf,
}

def sendMessageReliablySync(connectionManagerId: ConnectionManagerId,
message: Message): Message = {
Await.result(sendMessageReliably(connectionManagerId, message), Duration.Inf)
message: Message): Try[Message] = {
Try(Await.result(sendMessageReliably(connectionManagerId, message), Duration.Inf))
}

def onReceiveMessage(callback: (Message, ConnectionManagerId) => Option[Message]) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
package org.apache.spark.network

import java.nio.ByteBuffer
import scala.util.Try
import org.apache.spark.{SecurityManager, SparkConf}

private[spark] object SenderTest {
Expand Down Expand Up @@ -52,7 +51,7 @@ private[spark] object SenderTest {
val dataMessage = Message.createBufferMessage(buffer.duplicate)
val startTime = System.currentTimeMillis
/* println("Started timer at " + startTime) */
val responseStr = Try(manager.sendMessageReliablySync(targetConnectionManagerId, dataMessage))
val responseStr = manager.sendMessageReliablySync(targetConnectionManagerId, dataMessage)
.map { response =>
val buffer = response.asInstanceOf[BufferMessage].buffers(0)
new String(buffer.array, "utf-8")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import org.apache.spark.Logging
import org.apache.spark.network._
import org.apache.spark.util.Utils

import scala.util.{Failure, Success, Try}
import scala.util.{Failure, Success}

/**
* A network interface for BlockManager. Each slave should have one
Expand Down Expand Up @@ -117,8 +117,8 @@ private[spark] object BlockManagerWorker extends Logging {
val connectionManager = blockManager.connectionManager
val blockMessage = BlockMessage.fromPutBlock(msg)
val blockMessageArray = new BlockMessageArray(blockMessage)
val resultMessage = Try(connectionManager.sendMessageReliablySync(
toConnManagerId, blockMessageArray.toBufferMessage))
val resultMessage = connectionManager.sendMessageReliablySync(
toConnManagerId, blockMessageArray.toBufferMessage)
resultMessage.isSuccess
}

Expand All @@ -127,8 +127,8 @@ private[spark] object BlockManagerWorker extends Logging {
val connectionManager = blockManager.connectionManager
val blockMessage = BlockMessage.fromGetBlock(msg)
val blockMessageArray = new BlockMessageArray(blockMessage)
val responseMessage = Try(connectionManager.sendMessageReliablySync(
toConnManagerId, blockMessageArray.toBufferMessage))
val responseMessage = connectionManager.sendMessageReliablySync(
toConnManagerId, blockMessageArray.toBufferMessage)
responseMessage match {
case Success(message) => {
val bufferMessage = message.asInstanceOf[BufferMessage]
Expand Down

0 comments on commit c01c450

Please sign in to comment.