[SPARK-2583] ConnectionManager error reporting #1758
… from remote from successfully
…-fixes Conflicts: core/src/test/scala/org/apache/spark/storage/BlockFetcherIteratorSuite.scala core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
…r error reporting Use Futures to signal failures, rather than exposing empty messages to user code.
Jenkins, test this please.
QA tests have started for PR 1758. This patch merges cleanly.
Jenkins, retest this please. @JoshRosen, it appears something timed out or failed during the tests.
QA tests have started for PR 1758. This patch merges cleanly.
var ackMessage: Option[Message] = None
var attempted = false
You'll notice that I removed a bunch of fields here. `attempted` was never read anywhere, and `acked` implied `ackMessage != None`.
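To illustrate the point about redundant fields, here is a minimal sketch. It assumes a simplified, hypothetical `Message` class and status holder rather than the actual classes in ConnectionManager; it only shows why a separate `acked` flag can be derived from the `Option`.

```scala
object AckStateSketch {
  // Hypothetical, simplified stand-in; not Spark's actual Message class.
  final case class Message(id: Int)

  // Before: two fields that have to be kept in sync by hand.
  class StatusWithFlag {
    var acked: Boolean = false
    var ackMessage: Option[Message] = None
  }

  // After: the Option alone carries the "acked" state, so the two can never disagree.
  class Status {
    var ackMessage: Option[Message] = None
    def acked: Boolean = ackMessage.isDefined
  }
}
```

With the derived accessor, setting `ackMessage = Some(...)` is the only way to mark the message as acknowledged.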
Josh - does this subsume all of the other open patches for the BlockFetcherIterator and the ConnectionManager?
It definitely subsumes #1490, but there are three other PRs that I should probably look at and consider absorbing into this one (since I will probably cause merge conflicts for them):
Those last two PRs both aim to address the same issue, SPARK-2677.
It looks like this caused a test failure! After this patch
I'll walk through the code to try to find any other call sites that lack error-handling code.
This forces callers to consider error handling.
I decided to change
QA tests have started for PR 1758. This patch merges cleanly.
Is this when you run the test manually? It looks like SparkQA passed those tests. I assume it's intermittent?
It actually failed deterministically on Josh's laptop. It could be a bug somewhere in the configuration or a race condition in Spark.
You may have already tried this, but you might try moving it to come after the securityNeg Int in the message header, just to rule that out. I'll try to look at the code and run it here locally later.
I tried moving the field in the message header, but it didn't help; I still see the same error.
diff --git a/core/src/main/scala/org/apache/spark/network/MessageChunkHeader.scala b/core/src/main/
index f3ecca5..02c2e8c 100644
--- a/core/src/main/scala/org/apache/spark/network/MessageChunkHeader.scala
+++ b/core/src/main/scala/org/apache/spark/network/MessageChunkHeader.scala
@@ -42,8 +42,8 @@ private[spark] class MessageChunkHeader(
putInt(totalSize).
putInt(chunkSize).
putInt(other).
- put(if (hasError) 1.asInstanceOf[Byte] else 0.asInstanceOf[Byte]).
putInt(securityNeg).
+ put(if (hasError) 1.asInstanceOf[Byte] else 0.asInstanceOf[Byte]).
putInt(ip.size).
put(ip).
putInt(port).
@@ -69,8 +69,8 @@ private[spark] object MessageChunkHeader {
val totalSize = buffer.getInt()
val chunkSize = buffer.getInt()
val other = buffer.getInt()
- val hasError = buffer.get() != 0
val securityNeg = buffer.getInt()
+ val hasError = buffer.get() != 0
val ipSize = buffer.getInt()
val ipBytes = new Array[Byte](ipSize)
buffer.get(ipBytes)
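The diff above moves the `hasError` byte in both the writer and the reader. As a rough sketch of why such a move is not expected to change behavior on its own, the example below round-trips a trimmed-down, hypothetical header (only three of the real fields) through a `ByteBuffer`. What matters is that the read order mirrors the write order; `Header`, `write`, and `read` are illustrative names, not Spark's API.

```scala
import java.nio.ByteBuffer

object HeaderOrderSketch {
  // Hypothetical, trimmed-down header; the real MessageChunkHeader has more fields.
  final case class Header(other: Int, hasError: Boolean, securityNeg: Int)

  // Serialize the fields in a fixed order: Int, Byte, Int.
  def write(h: Header): ByteBuffer = {
    val buf = ByteBuffer.allocate(4 + 1 + 4)
    buf.putInt(h.other)
    buf.put(if (h.hasError) 1.toByte else 0.toByte)
    buf.putInt(h.securityNeg)
    buf.flip() // switch the buffer from writing to reading
    buf
  }

  // Deserialize in exactly the same order as write().
  def read(buf: ByteBuffer): Header = {
    val other = buf.getInt()
    val hasError = buf.get() != 0
    val securityNeg = buf.getInt()
    Header(other, hasError, securityNeg)
  }
}
```

Swapping the `hasError` and `securityNeg` lines in both methods would round-trip equally well, which is consistent with the observation that moving the field did not make the error go away.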
I captured some logs from the passing and failing commits: https://gist.github.com/41aeeafddbc4c1770d7b
To obtain these, I ran
One big difference that jumps out to me is the handler for security negotiation messages. In the failing test case, I see
14/08/06 13:07:01.390 DEBUG ConnectionManager: This is security neg message
14/08/06 13:07:01.391 DEBUG ConnectionManager: Client handleAuth for id: joshs-mbp_58889_1
14/08/06 13:07:01.392 ERROR ConnectionManager: Error handling sasl client authentication
javax.security.sasl.SaslException: DIGEST-MD5: Digest-challenge format violation: algorithm directive missing
at com.sun.security.sasl.digest.DigestMD5Client.processChallenge(DigestMD5Client.java:296)
at com.sun.security.sasl.digest.DigestMD5Client.evaluateChallenge(DigestMD5Client.java:225)
at org.apache.spark.SparkSaslClient.saslResponse(SparkSaslClient.scala:84)
at org.apache.spark.network.ConnectionManager.handleClientAuthentication(ConnectionManager.scala:533)
at org.apache.spark.network.ConnectionManager.handleAuthentication(ConnectionManager.scala:610)
at org.apache.spark.network.ConnectionManager.org$apache$spark$network$ConnectionManager$$handleMessage(ConnectionManager.scala:639)
at org.apache.spark.network.ConnectionManager$$anon$9.run(ConnectionManager.scala:511)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
while in the passing test case I see
14/08/06 13:09:38.269 DEBUG ConnectionManager: This is security neg message
14/08/06 13:09:38.270 DEBUG ConnectionManager: Server handleAuth for id: joshs-mbp_58931_1
14/08/06 13:09:38.271 DEBUG ConnectionManager: saslContext not established
14/08/06 13:09:38.271 DEBUG ConnectionManager: Creating sasl Server
14/08/06 13:09:38.273 DEBUG ConnectionManager: Server sasl not completed: joshs-mbp_58932_1
14/08/06 13:09:38.273 DEBUG SecurityMessage: message total size is : 150
14/08/06 13:09:38.273 INFO ConnectionManager: creating new sending connection for security! joshs-mbp_58932_2
14/08/06 13:09:38.273 DEBUG SendingConnection: Added [BufferMessage(id = 6, size = 150)] to outbox for sending to [ConnectionManagerId(joshs-mbp,58931)]
14/08/06 13:09:38.273 DEBUG ConnectionManager: Selector selected 0 of 2 keys
14/08/06 13:09:38.273 INFO SendingConnection: Initiating connection to [joshs-mbp/192.168.1.245:58931]
14/08/06 13:09:38.273 DEBUG ConnectionManager: After handleAuth result was true, returning
14/08/06 13:09:38.274 DEBUG ConnectionManager: Handling delay is 6 ms
If we only have a single ConnectionManager initiating connections, then shouldn't the logs always show
Yes, in this case the test only sends regular messages one way: manager always initiates and managerServer is the receiver, so I would expect to always see the server handleAuth first. I'm looking at your log and trying to run it locally now. Have you tried removing all the other test cases from this test file and reproducing? I'm just wondering whether another server isn't stopping.
I re-ran both commits with only the "security on same password" test enabled (by deleting the other tests) and I still see the same issue. I've uploaded another set of logs that only contain results from this single-test run: https://gist.github.com/JoshRosen/ce4f9133bd2366e0b4b7. I edited the logs to omit some of the UI ACL noise at the start of the logs.
Looking at your first log, it looks like it's trying to send a message to itself in the failed case, while in the good case it sends it to the other ConnectionManager. Look at the port numbers. good: Failed:
@@ -79,7 +80,7 @@ class ConnectionManagerSuite extends FunSuite {

(0 until count).map(i => {
val bufferMessage = Message.createBufferMessage(buffer.duplicate)
manager.sendMessageReliablySync(managerServer.id, bufferMessage)
Await.ready(manager.sendMessageReliably(manager.id, bufferMessage), 10 seconds)
Here is your bug: you aren't sending it to managerServer.
It should be:
Await.ready(manager.sendMessageReliably(managerServer.id, bufferMessage), 10 seconds)
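One detail worth keeping in mind when waiting on a send like this: in the Scala standard library, `Await.ready` only waits for the Future to complete and does not rethrow a failure stored in it, whereas `Await.result` does. The sketch below shows the difference using a stand-in `failedSend()` helper, which is hypothetical and simply returns an already-failed Future instead of calling ConnectionManager.

```scala
import java.io.IOException
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.util.{Failure, Success}

object AwaitSketch {
  // Hypothetical stand-in for a send whose acknowledgment reports a failure.
  def failedSend(): Future[String] = Future.failed(new IOException("error ack"))

  // Await.ready blocks until completion but leaves the outcome inside the Future,
  // so the caller has to inspect it explicitly.
  def outcomeViaReady(): String = {
    val f = failedSend()
    Await.ready(f, 10.seconds)
    f.value match {
      case Some(Success(_)) => "success"
      case Some(Failure(e)) => s"failure: ${e.getMessage}"
      case None             => "not completed"
    }
  }

  // Await.result unwraps the value and rethrows the stored exception.
  def outcomeViaResult(): String =
    try {
      Await.result(failedSend(), 10.seconds)
      "success"
    } catch {
      case e: IOException => s"failure: ${e.getMessage}"
    }
}
```

A test that only calls `Await.ready` will therefore not fail because of an exception inside the Future unless it also checks the Future's value.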
@tgravescs I can't believe I missed that; this is why it's so helpful to have another pair of eyes to review things. Thanks for your help debugging this; I'll submit a fix and then we can continue reviewing this patch and maybe get it into the 1.1 RC. I'm kind of surprised that this didn't break on Jenkins, though. Maybe I'm using a slightly different Java version?
Yeah, I don't know why it wouldn't fail in Jenkins. It seems like it should fail everywhere, all the time. Maybe we should look at Jenkins. I'll keep my build running to see if I can reproduce it (I'm on a RHEL box) and let you know what I get.
@@ -46,7 +47,7 @@ class ConnectionManagerSuite extends FunSuite {
buffer.flip

val bufferMessage = Message.createBufferMessage(buffer.duplicate)
manager.sendMessageReliablySync(manager.id, bufferMessage)
This line is probably why I accidentally replaced `managerServer.id` with `manager.id`. I think I replaced the now-removed `sendMessageReliablySync` here with the `Await`, then copy-pasted this line's fix into the lines below.
Intercept IOException that's now thrown when a sending connection is closed while there are unacknowledged messages.
Jenkins, retest this please. (Maybe it passed Jenkins because it never ran with the buggy commit. It'll be easier to spot these issues once we update Jenkins to include the commit SHAs in its messages.)
Jenkins, test this please.
QA tests have started for PR 1758. This patch merges cleanly.
We now signal failures via Futures that throw exceptions, not Futures that contain messages with hasError set.
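As a rough illustration of that design, the sketch below completes a Promise with a failure when the acknowledgment carries an error flag, instead of handing the flagged message back to the caller. `Message`, `completeWithAck`, and `send` are hypothetical, simplified names chosen for this example; the actual ConnectionManager code is more involved.

```scala
import java.io.IOException
import scala.concurrent.{Future, Promise}

object ErrorAckSketch {
  // Hypothetical, simplified message type; not Spark's actual Message class.
  final case class Message(id: Int, hasError: Boolean)

  // Translate an acknowledgment into the outcome of the sender's promise:
  // an error ACK becomes a failed Future, a normal ACK becomes a successful one.
  def completeWithAck(promise: Promise[Message], ack: Message): Unit =
    if (ack.hasError) {
      promise.failure(new IOException(s"remote error reported for message ${ack.id}"))
    } else {
      promise.success(ack)
    }

  // For the sketch, the acknowledgment is supplied directly instead of arriving over the network.
  def send(ack: Message): Future[Message] = {
    val promise = Promise[Message]()
    completeWithAck(promise, ack)
    promise.future
  }
}
```

With this shape, user code never sees a message with `hasError` set; it either gets a successful result or has to deal with the failure through `recover`, `onComplete`, or a `try`/`catch` around `Await.result`.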
One final fix: I had to update the mock result returned from Jenkins, test this please.
QA tests have started for PR 1758. This patch merges cleanly.
QA results for PR 1758:
Okay great - thanks Josh. This only failed a few nondeterministic tests we've been fighting in master.
This patch modifies the ConnectionManager so that error messages are sent in reply when uncaught exceptions occur during message processing. This prevents message senders from hanging while waiting for an acknowledgment if the remote message processing failed.
This is an updated version of sarutak's PR, #1490. The main change is to use Futures / Promises to signal errors.
Author: Kousuke Saruta <[email protected]>
Author: Josh Rosen <[email protected]>
Closes #1758 from JoshRosen/connection-manager-fixes and squashes the following commits:
68620cb [Josh Rosen] Fix test in BlockFetcherIteratorSuite:
83673de [Josh Rosen] Error ACKs should trigger IOExceptions, so catch only those exceptions in the test.
b8bb4d4 [Josh Rosen] Fix manager.id vs managerServer.id typo that broke security tests.
659521f [Josh Rosen] Include previous exception when throwing new one
a2f745c [Josh Rosen] Remove sendMessageReliablySync; callers can wait themselves.
c01c450 [Josh Rosen] Return Try[Message] from sendMessageReliablySync.
f1cd1bb [Josh Rosen] Clean up @sarutak's PR #1490 for [SPARK-2583]: ConnectionManager error reporting
7399c6b [Josh Rosen] Merge remote-tracking branch 'origin/pr/1490' into connection-manager-fixes
ee91bb7 [Kousuke Saruta] Modified BufferMessage.scala to keep the spark code style
9dfd0d8 [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark into SPARK-2583
e7d9aa6 [Kousuke Saruta] rebase to master
326a17f [Kousuke Saruta] Add test cases to ConnectionManagerSuite.scala for SPARK-2583
2a18d6b [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark into SPARK-2583
22d7ebd [Kousuke Saruta] Add test cases to BlockManagerSuite for SPARK-2583
e579302 [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark into SPARK-2583
281589c [Kousuke Saruta] Add a test case to BlockFetcherIteratorSuite.scala for fetching block from remote from successfully
0654128 [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark into SPARK-2583
ffaa83d [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark into SPARK-2583
12d3de8 [Kousuke Saruta] Added BlockFetcherIteratorSuite.scala
4117b8f [Kousuke Saruta] Modified ConnectionManager to be alble to handle error during processing message
717c9c3 [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark into SPARK-2583
6635467 [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark into SPARK-2583
e2b8c4a [Kousuke Saruta] Modify to propagete error using ConnectionManager
(cherry picked from commit 17caae4)
Signed-off-by: Patrick Wendell <[email protected]>
This patch modifies the ConnectionManager so that error messages are sent in reply when uncaught exceptions occur during message processing. This prevents message senders from hanging while waiting for an acknowledgment if the remote message processing failed.
This is an updated version of @sarutak's PR, #1490. The main change is to use Futures / Promises to signal errors.