Skip to content

Commit

Permalink
Address rxin's review feedback.
Browse files Browse the repository at this point in the history
  • Loading branch information
JoshRosen committed Nov 15, 2014
1 parent 2a2e92d commit afcc8d6
Showing 1 changed file with 14 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -914,23 +914,23 @@ private[nio] class ConnectionManager(
val timeoutTask: TimerTask = new TimerTask {
override def run(timeout: Timeout): Unit = {
messageStatuses.synchronized {
messageStatuses.remove(messageId).foreach ( s => {
messageStatuses.remove(messageId).foreach { s =>
val e = new IOException("sendMessageReliably failed because ack " +
s"was not received within $ackTimeout sec")
Option(promiseReference.get) match {
case Some(p) =>
// Attempt to fail the promise with a Timeout exception
if (!p.tryFailure(e)) {
// If we reach here, then someone else has already signalled success or failure
// on this promise, so log a warning:
logError("Ignore error because promise is completed", e)
}
case None =>
// The WeakReference was empty, which should never happen because
// sendMessageReliably's caller should have a strong reference to promise.future;
logError("Promise was garbage collected; this should never happen!", e)
val p = promiseReference.get
if (p != null) {
// Attempt to fail the promise with a Timeout exception
if (!p.tryFailure(e)) {
// If we reach here, then someone else has already signalled success or failure
// on this promise, so log a warning:
logError("Ignore error because promise is completed", e)
}
} else {
// The WeakReference was empty, which should never happen because
// sendMessageReliably's caller should have a strong reference to promise.future;
logError("Promise was garbage collected; this should never happen!", e)
}
})
}
}
}
}
Expand Down

0 comments on commit afcc8d6

Please sign in to comment.