-
Notifications
You must be signed in to change notification settings - Fork 28.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-2583] ConnectionManager error reporting #1758
Closed
Closed
Changes from 20 commits
Commits
Show all changes
23 commits
Select commit
Hold shift + click to select a range
e2b8c4a
Modify to propagete error using ConnectionManager
sarutak 6635467
Merge branch 'master' of git://git.apache.org/spark into SPARK-2583
sarutak 717c9c3
Merge branch 'master' of git://git.apache.org/spark into SPARK-2583
sarutak 4117b8f
Modified ConnectionManager to be alble to handle error during process…
sarutak 12d3de8
Added BlockFetcherIteratorSuite.scala
sarutak ffaa83d
Merge branch 'master' of git://git.apache.org/spark into SPARK-2583
sarutak 0654128
Merge branch 'master' of git://git.apache.org/spark into SPARK-2583
sarutak 281589c
Add a test case to BlockFetcherIteratorSuite.scala for fetching block…
sarutak e579302
Merge branch 'master' of git://git.apache.org/spark into SPARK-2583
sarutak 22d7ebd
Add test cases to BlockManagerSuite for SPARK-2583
sarutak 2a18d6b
Merge branch 'master' of git://git.apache.org/spark into SPARK-2583
sarutak 326a17f
Add test cases to ConnectionManagerSuite.scala for SPARK-2583
sarutak e7d9aa6
rebase to master
sarutak 9dfd0d8
Merge branch 'master' of git://git.apache.org/spark into SPARK-2583
sarutak ee91bb7
Modified BufferMessage.scala to keep the spark code style
sarutak 7399c6b
Merge remote-tracking branch 'origin/pr/1490' into connection-manager…
JoshRosen f1cd1bb
Clean up @sarutak's PR #1490 for [SPARK-2583]: ConnectionManager erro…
JoshRosen c01c450
Return Try[Message] from sendMessageReliablySync.
JoshRosen a2f745c
Remove sendMessageReliablySync; callers can wait themselves.
JoshRosen 659521f
Include previous exception when throwing new one
JoshRosen b8bb4d4
Fix manager.id vs managerServer.id typo that broke security tests.
JoshRosen 83673de
Error ACKs should trigger IOExceptions, so catch only those exception…
JoshRosen 68620cb
Fix test in BlockFetcherIteratorSuite:
JoshRosen File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -17,6 +17,7 @@ | |
|
||
package org.apache.spark.network | ||
|
||
import java.io.IOException | ||
import java.nio._ | ||
import java.nio.channels._ | ||
import java.nio.channels.spi._ | ||
|
@@ -41,16 +42,26 @@ import org.apache.spark.util.{SystemClock, Utils} | |
private[spark] class ConnectionManager(port: Int, conf: SparkConf, | ||
securityManager: SecurityManager) extends Logging { | ||
|
||
/** | ||
* Used by sendMessageReliably to track messages being sent. | ||
* @param message the message that was sent | ||
* @param connectionManagerId the connection manager that sent this message | ||
* @param completionHandler callback that's invoked when the send has completed or failed | ||
*/ | ||
class MessageStatus( | ||
val message: Message, | ||
val connectionManagerId: ConnectionManagerId, | ||
completionHandler: MessageStatus => Unit) { | ||
|
||
/** This is non-None if message has been ack'd */ | ||
var ackMessage: Option[Message] = None | ||
var attempted = false | ||
var acked = false | ||
|
||
def markDone() { completionHandler(this) } | ||
def markDone(ackMessage: Option[Message]) { | ||
this.synchronized { | ||
this.ackMessage = ackMessage | ||
completionHandler(this) | ||
} | ||
} | ||
} | ||
|
||
private val selector = SelectorProvider.provider.openSelector() | ||
|
@@ -434,11 +445,7 @@ private[spark] class ConnectionManager(port: Int, conf: SparkConf, | |
messageStatuses.values.filter(_.connectionManagerId == sendingConnectionManagerId) | ||
.foreach(status => { | ||
logInfo("Notifying " + status) | ||
status.synchronized { | ||
status.attempted = true | ||
status.acked = false | ||
status.markDone() | ||
} | ||
status.markDone(None) | ||
}) | ||
|
||
messageStatuses.retain((i, status) => { | ||
|
@@ -467,11 +474,7 @@ private[spark] class ConnectionManager(port: Int, conf: SparkConf, | |
for (s <- messageStatuses.values | ||
if s.connectionManagerId == sendingConnectionManagerId) { | ||
logInfo("Notifying " + s) | ||
s.synchronized { | ||
s.attempted = true | ||
s.acked = false | ||
s.markDone() | ||
} | ||
s.markDone(None) | ||
} | ||
|
||
messageStatuses.retain((i, status) => { | ||
|
@@ -539,13 +542,13 @@ private[spark] class ConnectionManager(port: Int, conf: SparkConf, | |
val securityMsgResp = SecurityMessage.fromResponse(replyToken, | ||
securityMsg.getConnectionId.toString) | ||
val message = securityMsgResp.toBufferMessage | ||
if (message == null) throw new Exception("Error creating security message") | ||
if (message == null) throw new IOException("Error creating security message") | ||
sendSecurityMessage(waitingConn.getRemoteConnectionManagerId(), message) | ||
} catch { | ||
case e: Exception => { | ||
logError("Error handling sasl client authentication", e) | ||
waitingConn.close() | ||
throw new Exception("Error evaluating sasl response: " + e) | ||
throw new IOException("Error evaluating sasl response: ", e) | ||
} | ||
} | ||
} | ||
|
@@ -653,34 +656,39 @@ private[spark] class ConnectionManager(port: Int, conf: SparkConf, | |
} | ||
} | ||
} | ||
sentMessageStatus.synchronized { | ||
sentMessageStatus.ackMessage = Some(message) | ||
sentMessageStatus.attempted = true | ||
sentMessageStatus.acked = true | ||
sentMessageStatus.markDone() | ||
} | ||
sentMessageStatus.markDone(Some(message)) | ||
} else { | ||
val ackMessage = if (onReceiveCallback != null) { | ||
logDebug("Calling back") | ||
onReceiveCallback(bufferMessage, connectionManagerId) | ||
} else { | ||
logDebug("Not calling back as callback is null") | ||
None | ||
} | ||
var ackMessage : Option[Message] = None | ||
try { | ||
ackMessage = if (onReceiveCallback != null) { | ||
logDebug("Calling back") | ||
onReceiveCallback(bufferMessage, connectionManagerId) | ||
} else { | ||
logDebug("Not calling back as callback is null") | ||
None | ||
} | ||
|
||
if (ackMessage.isDefined) { | ||
if (!ackMessage.get.isInstanceOf[BufferMessage]) { | ||
logDebug("Response to " + bufferMessage + " is not a buffer message, it is of type " | ||
+ ackMessage.get.getClass) | ||
} else if (!ackMessage.get.asInstanceOf[BufferMessage].hasAckId) { | ||
logDebug("Response to " + bufferMessage + " does not have ack id set") | ||
ackMessage.get.asInstanceOf[BufferMessage].ackId = bufferMessage.id | ||
if (ackMessage.isDefined) { | ||
if (!ackMessage.get.isInstanceOf[BufferMessage]) { | ||
logDebug("Response to " + bufferMessage + " is not a buffer message, it is of type " | ||
+ ackMessage.get.getClass) | ||
} else if (!ackMessage.get.asInstanceOf[BufferMessage].hasAckId) { | ||
logDebug("Response to " + bufferMessage + " does not have ack id set") | ||
ackMessage.get.asInstanceOf[BufferMessage].ackId = bufferMessage.id | ||
} | ||
} | ||
} catch { | ||
case e: Exception => { | ||
logError(s"Exception was thrown while processing message", e) | ||
val m = Message.createBufferMessage(bufferMessage.id) | ||
m.hasError = true | ||
ackMessage = Some(m) | ||
} | ||
} finally { | ||
sendMessage(connectionManagerId, ackMessage.getOrElse { | ||
Message.createBufferMessage(bufferMessage.id) | ||
}) | ||
} | ||
|
||
sendMessage(connectionManagerId, ackMessage.getOrElse { | ||
Message.createBufferMessage(bufferMessage.id) | ||
}) | ||
} | ||
} | ||
case _ => throw new Exception("Unknown type message received") | ||
|
@@ -792,11 +800,7 @@ private[spark] class ConnectionManager(port: Int, conf: SparkConf, | |
case Some(msgStatus) => { | ||
messageStatuses -= message.id | ||
logInfo("Notifying " + msgStatus.connectionManagerId) | ||
msgStatus.synchronized { | ||
msgStatus.attempted = true | ||
msgStatus.acked = false | ||
msgStatus.markDone() | ||
} | ||
msgStatus.markDone(None) | ||
} | ||
case None => { | ||
logError("no messageStatus for failed message id: " + message.id) | ||
|
@@ -815,23 +819,35 @@ private[spark] class ConnectionManager(port: Int, conf: SparkConf, | |
selector.wakeup() | ||
} | ||
|
||
/** | ||
* Send a message and block until an acknowldgment is received or an error occurs. | ||
* @param connectionManagerId the message's destination | ||
* @param message the message being sent | ||
* @return a Future that either returns the acknowledgment message or captures an exception. | ||
*/ | ||
def sendMessageReliably(connectionManagerId: ConnectionManagerId, message: Message) | ||
: Future[Option[Message]] = { | ||
val promise = Promise[Option[Message]] | ||
val status = new MessageStatus( | ||
message, connectionManagerId, s => promise.success(s.ackMessage)) | ||
: Future[Message] = { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Since we now signal failures via Futures, I changed this into |
||
val promise = Promise[Message]() | ||
val status = new MessageStatus(message, connectionManagerId, s => { | ||
s.ackMessage match { | ||
case None => // Indicates a failure where we either never sent or never got ACK'd | ||
promise.failure(new IOException("sendMessageReliably failed without being ACK'd")) | ||
case Some(ackMessage) => | ||
if (ackMessage.hasError) { | ||
promise.failure( | ||
new IOException("sendMessageReliably failed with ACK that signalled a remote error")) | ||
} else { | ||
promise.success(ackMessage) | ||
} | ||
} | ||
}) | ||
messageStatuses.synchronized { | ||
messageStatuses += ((message.id, status)) | ||
} | ||
sendMessage(connectionManagerId, message) | ||
promise.future | ||
} | ||
|
||
def sendMessageReliablySync(connectionManagerId: ConnectionManagerId, | ||
message: Message): Option[Message] = { | ||
Await.result(sendMessageReliably(connectionManagerId, message), Duration.Inf) | ||
} | ||
|
||
def onReceiveMessage(callback: (Message, ConnectionManagerId) => Option[Message]) { | ||
onReceiveCallback = callback | ||
} | ||
|
@@ -854,6 +870,7 @@ private[spark] class ConnectionManager(port: Int, conf: SparkConf, | |
|
||
|
||
private[spark] object ConnectionManager { | ||
import ExecutionContext.Implicits.global | ||
|
||
def main(args: Array[String]) { | ||
val conf = new SparkConf | ||
|
@@ -888,7 +905,7 @@ private[spark] object ConnectionManager { | |
|
||
(0 until count).map(i => { | ||
val bufferMessage = Message.createBufferMessage(buffer.duplicate) | ||
manager.sendMessageReliablySync(manager.id, bufferMessage) | ||
Await.result(manager.sendMessageReliably(manager.id, bufferMessage), Duration.Inf) | ||
}) | ||
println("--------------------------") | ||
println() | ||
|
@@ -909,8 +926,10 @@ private[spark] object ConnectionManager { | |
val bufferMessage = Message.createBufferMessage(buffer.duplicate) | ||
manager.sendMessageReliably(manager.id, bufferMessage) | ||
}).foreach(f => { | ||
val g = Await.result(f, 1 second) | ||
if (!g.isDefined) println("Failed") | ||
f.onFailure { | ||
case e => println("Failed due to " + e) | ||
} | ||
Await.ready(f, 1 second) | ||
}) | ||
val finishTime = System.currentTimeMillis | ||
|
||
|
@@ -944,8 +963,10 @@ private[spark] object ConnectionManager { | |
val bufferMessage = Message.createBufferMessage(buffers(count - 1 - i).duplicate) | ||
manager.sendMessageReliably(manager.id, bufferMessage) | ||
}).foreach(f => { | ||
val g = Await.result(f, 1 second) | ||
if (!g.isDefined) println("Failed") | ||
f.onFailure { | ||
case e => println("Failed due to " + e) | ||
} | ||
Await.ready(f, 1 second) | ||
}) | ||
val finishTime = System.currentTimeMillis | ||
|
||
|
@@ -974,8 +995,10 @@ private[spark] object ConnectionManager { | |
val bufferMessage = Message.createBufferMessage(buffer.duplicate) | ||
manager.sendMessageReliably(manager.id, bufferMessage) | ||
}).foreach(f => { | ||
val g = Await.result(f, 1 second) | ||
if (!g.isDefined) println("Failed") | ||
f.onFailure { | ||
case e => println("Failed due to " + e) | ||
} | ||
Await.ready(f, 1 second) | ||
}) | ||
val finishTime = System.currentTimeMillis | ||
Thread.sleep(1000) | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You'll notice that I removed a bunch of fields here.
attempted
was never read anywhere, andacked
impliedackMessage != None
.