[SPARK-2583] ConnectionManager error reporting #1758

Closed
wants to merge 23 commits into from

JoshRosen
Contributor

This patch modifies the ConnectionManager so that error messages are sent in reply when uncaught exceptions occur during message processing. This prevents message senders from hanging while waiting for an acknowledgment if the remote message processing failed.

This is an updated version of @sarutak's PR, #1490. The main change is to use Futures / Promises to signal errors.
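
The idea of signalling errors through Futures / Promises can be illustrated with a minimal sketch. This is not the actual ConnectionManager code: `AckTracker`, `Reply`, `register`, and `complete` are hypothetical names chosen for illustration, and the sketch only assumes that each outstanding message is tracked by id.

```scala
import java.io.IOException
import scala.collection.mutable
import scala.concurrent.{Future, Promise}

// Hypothetical stand-in for a reply message; not Spark's Message class.
final case class Reply(id: Int, hasError: Boolean, payload: String)

// Hypothetical tracker that keeps one Promise per outstanding message id.
class AckTracker {
  private val pending = mutable.Map.empty[Int, Promise[Reply]]

  // Called by the sender: returns a Future that completes when a reply arrives.
  def register(id: Int): Future[Reply] = synchronized {
    val p = Promise[Reply]()
    pending(id) = p
    p.future
  }

  // Called when a reply arrives: an error reply fails the Future,
  // so the sender is notified instead of waiting forever.
  def complete(reply: Reply): Unit = synchronized {
    pending.remove(reply.id).foreach { p =>
      if (reply.hasError) p.failure(new IOException(s"message ${reply.id} failed remotely"))
      else p.success(reply)
    }
  }
}
```

A sender would call `register` before sending and then wait on, or attach callbacks to, the returned Future; either outcome completes it.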

sarutak and others added 17 commits July 18, 2014 18:01
…-fixes

Conflicts:
	core/src/test/scala/org/apache/spark/storage/BlockFetcherIteratorSuite.scala
	core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
…r error reporting

Use Futures to signal failures, rather than exposing empty messages to user code.
@JoshRosen
Contributor Author

Jenkins, test this please.

@SparkQA

SparkQA commented Aug 4, 2014

QA tests have started for PR 1758. This patch merges cleanly.
View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17845/consoleFull

@pwendell
Contributor

pwendell commented Aug 4, 2014

Jenkins, retest this please. @JoshRosen, it appears something timed out or failed during the tests.

@SparkQA

SparkQA commented Aug 4, 2014

QA tests have started for PR 1758. This patch merges cleanly.
View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17857/consoleFull

var ackMessage: Option[Message] = None
var attempted = false
Contributor Author

You'll notice that I removed a bunch of fields here. attempted was never read anywhere, and acked implied ackMessage != None.

@pwendell
Contributor

pwendell commented Aug 4, 2014

Josh - does this subsume all of the other open patches for the BlockFetcherIterator and the ConnectionManager?

@JoshRosen
Contributor Author

It definitely subsumes #1490, but there are three other PRs that I should probably look at and consider absorbing into this one (since I will probably cause merge conflicts for them):

Those last two PRs both aim to address the same issue, SPARK-2677.

@JoshRosen
Contributor Author

It looks like this caused a test failure! After this patch, sendMessageReliablySync throws an IOException if the send fails, and the code in the "security mismatch password" test didn't have any error handling for this method, leading to an error:

sendMessageReliably failed without being ACK'd
java.io.IOException: sendMessageReliably failed without being ACK'd
    at org.apache.spark.network.ConnectionManager$$anonfun$13.apply(ConnectionManager.scala:834)
    at org.apache.spark.network.ConnectionManager$$anonfun$13.apply(ConnectionManager.scala:831)
    at org.apache.spark.network.ConnectionManager$MessageStatus.markDone(ConnectionManager.scala:62)
    at org.apache.spark.network.ConnectionManager$$anonfun$removeConnection$3.apply(ConnectionManager.scala:448)
    at org.apache.spark.network.ConnectionManager$$anonfun$removeConnection$3.apply(ConnectionManager.scala:446)
    at scala.collection.immutable.List.foreach(List.scala:318)
    at org.apache.spark.network.ConnectionManager.removeConnection(ConnectionManager.scala:446)
    at org.apache.spark.network.ConnectionManager$$anonfun$addListeners$3.apply(ConnectionManager.scala:425)
    at org.apache.spark.network.ConnectionManager$$anonfun$addListeners$3.apply(ConnectionManager.scala:425)
    at org.apache.spark.network.Connection.callOnCloseCallback(Connection.scala:156)
    at org.apache.spark.network.Connection.close(Connection.scala:128)
    at org.apache.spark.network.SendingConnection.read(Connection.scala:392)
    at org.apache.spark.network.ConnectionManager$$anon$6.run(ConnectionManager.scala:190)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)

I'll walk through the code to try to find any other call sites that lack error-handling code.

This forces callers to consider error handling.
@JoshRosen
Contributor Author

I decided to change sendMessageReliablySync to return a Try[Message] to ensure that consumers of the reply couldn't ignore errors.
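
To illustrate why a `Try` return type makes errors harder to ignore, here is a minimal sketch. It is not the ConnectionManager API: `sendSync`, `describe`, and the `simulateFailure` flag are hypothetical, and replies are modelled as plain strings.

```scala
import java.io.IOException
import scala.util.{Failure, Success, Try}

object TryReplyDemo {
  // Hypothetical synchronous send; fails when `simulateFailure` is true.
  def sendSync(payload: String, simulateFailure: Boolean): Try[String] = Try {
    if (simulateFailure) throw new IOException("send failed without being ACK'd")
    s"ack:$payload"
  }

  // The caller has to unwrap the Try to reach the reply,
  // so the failure branch is hard to overlook.
  def describe(result: Try[String]): String = result match {
    case Success(reply)          => s"got $reply"
    case Failure(e: IOException) => s"send failed: ${e.getMessage}"
    case Failure(e)              => s"unexpected error: ${e.getMessage}"
  }
}
```

Compared with a method that throws, the failure is part of the return type, so a call site that only handles the success case stands out in review.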

@SparkQA

SparkQA commented Aug 4, 2014

QA tests have started for PR 1758. This patch merges cleanly.
View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17874/consoleFull

@tgravescs
Contributor

This is when you run the test manually? It looks like SparkQA passed those tests. Should we assume it's intermittent?

@rxin
Contributor

rxin commented Aug 6, 2014

It actually failed deterministically on Josh's laptop. It could be a bug somewhere in the configuration or a race condition in Spark.

@tgravescs
Contributor

You may have already tried this, but you might try moving it in the message header so that it comes after the securityNeg Int, just to rule that out. I'll try to look at the code and run it here locally later.

@JoshRosen
Contributor Author

I tried moving the field in the message header, but it didn't help; I still see the same error.

diff --git a/core/src/main/scala/org/apache/spark/network/MessageChunkHeader.scala b/core/src/main/
index f3ecca5..02c2e8c 100644
--- a/core/src/main/scala/org/apache/spark/network/MessageChunkHeader.scala
+++ b/core/src/main/scala/org/apache/spark/network/MessageChunkHeader.scala
@@ -42,8 +42,8 @@ private[spark] class MessageChunkHeader(
       putInt(totalSize).
       putInt(chunkSize).
       putInt(other).
-      put(if (hasError) 1.asInstanceOf[Byte] else 0.asInstanceOf[Byte]).
       putInt(securityNeg).
+      put(if (hasError) 1.asInstanceOf[Byte] else 0.asInstanceOf[Byte]).
       putInt(ip.size).
       put(ip).
       putInt(port).
@@ -69,8 +69,8 @@ private[spark] object MessageChunkHeader {
     val totalSize = buffer.getInt()
     val chunkSize = buffer.getInt()
     val other = buffer.getInt()
-    val hasError = buffer.get() != 0
     val securityNeg = buffer.getInt()
+    val hasError = buffer.get() != 0
     val ipSize = buffer.getInt()
     val ipBytes = new Array[Byte](ipSize)
     buffer.get(ipBytes)
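
The diff above moves the `hasError` byte in both the writer and the reader, because the two sides must agree on field order. A minimal round-trip sketch of that constraint follows; `MiniHeader` is a hypothetical, trimmed-down header with only three fields and is not the real `MessageChunkHeader`.

```scala
import java.nio.ByteBuffer

// Hypothetical, trimmed-down header; the real MessageChunkHeader has more fields.
final case class MiniHeader(other: Int, hasError: Boolean, securityNeg: Int)

object MiniHeader {
  // Two 4-byte Ints plus one flag byte.
  val Size: Int = 4 + 1 + 4

  // Serialize the fields in a fixed order: other, hasError, securityNeg.
  def write(h: MiniHeader): ByteBuffer = {
    val buf = ByteBuffer.allocate(Size)
    buf.putInt(h.other)
    buf.put(if (h.hasError) 1.toByte else 0.toByte)
    buf.putInt(h.securityNeg)
    buf.flip()
    buf
  }

  // Deserialize in exactly the same order; swapping two reads here without
  // swapping the matching writes would misinterpret the bytes.
  def read(buf: ByteBuffer): MiniHeader = {
    val other = buf.getInt()
    val hasError = buf.get() != 0
    val securityNeg = buf.getInt()
    MiniHeader(other, hasError, securityNeg)
  }
}
```

As long as both sides are changed together, as in the diff, the relative position of the flag should not affect correctness, which is consistent with the reordering not changing the observed error.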

@JoshRosen
Contributor Author

I captured some logs from the passing and failing commits: https://gist.github.com/41aeeafddbc4c1770d7b

To obtain these, I ran sbt/sbt "core/test:test-only *ConnectionManagerSuite" after modifying core/src/test/resources/log4j.properties to use a DEBUG log level.

One big difference that jumps out to me is the handler for security negotiation messages. In the failing test case, I see

14/08/06 13:07:01.390 DEBUG ConnectionManager: This is security neg message
14/08/06 13:07:01.391 DEBUG ConnectionManager: Client handleAuth for id: joshs-mbp_58889_1
14/08/06 13:07:01.392 ERROR ConnectionManager: Error handling sasl client authentication
javax.security.sasl.SaslException: DIGEST-MD5: Digest-challenge format violation: algorithm directive missing
    at com.sun.security.sasl.digest.DigestMD5Client.processChallenge(DigestMD5Client.java:296)
    at com.sun.security.sasl.digest.DigestMD5Client.evaluateChallenge(DigestMD5Client.java:225)
    at org.apache.spark.SparkSaslClient.saslResponse(SparkSaslClient.scala:84)
    at org.apache.spark.network.ConnectionManager.handleClientAuthentication(ConnectionManager.scala:533)
    at org.apache.spark.network.ConnectionManager.handleAuthentication(ConnectionManager.scala:610)
    at org.apache.spark.network.ConnectionManager.org$apache$spark$network$ConnectionManager$$handleMessage(ConnectionManager.scala:639)
    at org.apache.spark.network.ConnectionManager$$anon$9.run(ConnectionManager.scala:511)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)

while in the passing test case I see

14/08/06 13:09:38.269 DEBUG ConnectionManager: This is security neg message
14/08/06 13:09:38.270 DEBUG ConnectionManager: Server handleAuth for id: joshs-mbp_58931_1
14/08/06 13:09:38.271 DEBUG ConnectionManager: saslContext not established
14/08/06 13:09:38.271 DEBUG ConnectionManager: Creating sasl Server
14/08/06 13:09:38.273 DEBUG ConnectionManager: Server sasl not completed: joshs-mbp_58932_1
14/08/06 13:09:38.273 DEBUG SecurityMessage: message total size is : 150
14/08/06 13:09:38.273 INFO ConnectionManager: creating new sending connection for security! joshs-mbp_58932_2
14/08/06 13:09:38.273 DEBUG SendingConnection: Added [BufferMessage(id = 6, size = 150)] to outbox for sending to [ConnectionManagerId(joshs-mbp,58931)]
14/08/06 13:09:38.273 DEBUG ConnectionManager: Selector selected 0 of 2 keys
14/08/06 13:09:38.273 INFO SendingConnection: Initiating connection to [joshs-mbp/192.168.1.245:58931]
14/08/06 13:09:38.273 DEBUG ConnectionManager: After handleAuth result was true, returning
14/08/06 13:09:38.274 DEBUG ConnectionManager: Handling delay is 6 ms

If we only have a single ConnectionManager initiating connections, then shouldn't the logs always show Server handleAuth for ... followed by Client handleAuth for ...?

@tgravescs
Contributor

Yes, in this case the test only sends regular messages one way: manager always initiates and managerServer is the receiver. So I would expect to always see Server handleAuth first. I'm looking at your log and trying to run it locally now.

Have you tried removing all the other test cases from this test file and reproducing? Just wondering if perhaps another server isn't stopping.

@JoshRosen
Contributor Author

I re-ran both commits with only the "security on same password" test enabled (by deleting the other tests) and I still see the same issue. I've uploaded another set of logs that only contain results from this single-test run: https://gist.github.com/JoshRosen/ce4f9133bd2366e0b4b7. I edited the logs to omit some of the UI ACL noise at the start of the logs.

@tgravescs
Contributor

So, looking at your first log, it looks like it's trying to send a message to itself in the failed case, while in the good case it sends it to the other ConnectionManager. Look at the port numbers.

good:
Bound socket to port 58931 with id = ConnectionManagerId(joshs-mbp,58931)
14/08/06 13:09:38.141 INFO ConnectionManager: Bound socket to port 58932 with id = ConnectionManagerId(joshs-mbp,58932)
14/08/06 13:09:38.266 DEBUG SecurityMessage: message total size is : 42
14/08/06 13:09:38.266 DEBUG SendingConnection: Added [BufferMessage(id = 5, size = 42)] to outbox for sending to [ConnectionManagerId(joshs-mbp,58932)]

Failed:
ager: Bound socket to port 58889 with id = ConnectionManagerId(joshs-mbp,58889)
14/08/06 13:07:01.260 INFO ConnectionManager: Bound socket to port 58890 with id = ConnectionManagerId(joshs-mbp,58890)
14/08/06 13:07:01.386 DEBUG SecurityMessage: message total size is : 42
14/08/06 13:07:01.387 DEBUG SendingConnection: Added [BufferMessage(id = 5, size = 42)] to outbox for sending to [ConnectionManagerId(joshs-mbp,58889)]

@@ -79,7 +80,7 @@ class ConnectionManagerSuite extends FunSuite {

     (0 until count).map(i => {
       val bufferMessage = Message.createBufferMessage(buffer.duplicate)
-      manager.sendMessageReliablySync(managerServer.id, bufferMessage)
+      Await.ready(manager.sendMessageReliably(manager.id, bufferMessage), 10 seconds)
Contributor

Here is your bug: you aren't sending it to managerServer.

It should be:
Await.ready(manager.sendMessageReliably(managerServer.id, bufferMessage), 10 seconds)
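
One detail worth keeping in mind when a test waits with `Await.ready` is that it only waits for completion and does not rethrow a failed Future's exception, whereas `Await.result` does. The sketch below shows the difference with an already-failed Future; `AwaitDemo`, `failedSend`, `viaReady`, and `viaResult` are hypothetical names, and the Future is a stand-in rather than a real `sendMessageReliably` call.

```scala
import java.io.IOException
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.util.{Failure, Success}

object AwaitDemo {
  // An already-failed Future standing in for a send that was never ACK'd.
  val failedSend: Future[String] =
    Future.failed(new IOException("sendMessageReliably failed without being ACK'd"))

  // Await.ready waits for completion but does not rethrow the failure;
  // the caller has to inspect the completed Future's value.
  def viaReady(): String =
    Await.ready(failedSend, 10.seconds).value match {
      case Some(Failure(e)) => s"failed: ${e.getClass.getSimpleName}"
      case Some(Success(v)) => s"ok: $v"
      case None             => "not completed" // not expected once Await.ready has returned
    }

  // Await.result rethrows the failure, so it can be caught directly.
  def viaResult(): String =
    try Await.result(failedSend, 10.seconds)
    catch { case e: IOException => s"caught: ${e.getClass.getSimpleName}" }
}
```

A test that wants to assert on the failure can therefore either inspect `.value` after `Await.ready` or catch the exception from `Await.result`.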

@JoshRosen
Contributor Author

@tgravescs I can't believe I missed that; this is why it's so helpful to have another pair of eyes to review things. Thanks for your help debugging this; I'll submit a fix and then we can continue reviewing this patch and maybe get it into the 1.1 RC.

I'm kind of surprised that this didn't break on Jenkins, though. Maybe I'm using a slightly different Java version?

@tgravescs
Contributor

Yeah, I don't know why it wouldn't fail in Jenkins. It seems like it should fail everywhere, all the time. Maybe we should look at Jenkins. I'll keep my build running to see if I can reproduce it (I'm on a RHEL box) and let you know what I get.

@@ -46,7 +47,7 @@ class ConnectionManagerSuite extends FunSuite {
buffer.flip

val bufferMessage = Message.createBufferMessage(buffer.duplicate)
manager.sendMessageReliablySync(manager.id, bufferMessage)
Contributor Author

This line is probably why I accidentally replaced managerServer.id with manager.id. I think I replaced the now-removed sendMessageReliablySync here with the Await then copy-pasted this line's fix into the lines below.

Intercept IOException that's now thrown when a sending connection is closed
while there are unacknowledged messages.
@JoshRosen
Contributor Author

Jenkins, retest this please.

(Maybe it passed Jenkins because it never ran with the buggy commit. It'll be easier to spot these issues once we update Jenkins to include the commit SHAs in its messages).

@JoshRosen
Contributor Author

Jenkins, test this please.

@SparkQA

SparkQA commented Aug 6, 2014

QA tests have started for PR 1758. This patch merges cleanly.
View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/18062/consoleFull

We now signal failures via Futures that throw exception, not Futures that
contain messages with hasError set.
@JoshRosen
Contributor Author

One final fix: I had to update the mock result returned from sendMessageReliably in the BlockFetcherIteratorSuite tests, since we now signal failure via a Future that returns an error, rather than a Future that returns a Message with hasError=true.

Jenkins, test this please.
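
The difference between the two ways of signalling failure from a mock can be sketched as follows. `FakeMessage` and `MockResults` are hypothetical stand-ins, not the types used in BlockFetcherIteratorSuite; the sketch only assumes that a failure used to be a successfully completed Future carrying an error flag and is now a failed Future.

```scala
import java.io.IOException
import scala.concurrent.Future
import scala.util.Success

// Hypothetical stand-in for a reply message with an error flag.
final case class FakeMessage(hasError: Boolean)

object MockResults {
  // Old style: the Future itself succeeds; the error hides inside the message.
  val oldStyleFailure: Future[FakeMessage] =
    Future.successful(FakeMessage(hasError = true))

  // New style: the Future fails, so the error is visible at the Future level.
  val newStyleFailure: Future[FakeMessage] =
    Future.failed(new IOException("fetch failed"))

  // True when the Future has completed successfully, regardless of its payload.
  def succeeded(f: Future[FakeMessage]): Boolean = f.value match {
    case Some(Success(_)) => true
    case _                => false
  }
}
```

With the old style, code that only checks whether the Future succeeded would treat the error reply as a success; with the new style it cannot.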

@SparkQA

SparkQA commented Aug 6, 2014

QA tests have started for PR 1758. This patch merges cleanly.
View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/18075/consoleFull

@SparkQA

SparkQA commented Aug 7, 2014

QA results for PR 1758:
- This patch FAILED unit tests.
- This patch merges cleanly
- This patch adds no public classes

For more information see test output:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/18075/consoleFull

@pwendell
Contributor

pwendell commented Aug 7, 2014

Okay, great. Thanks, Josh. This only failed a few nondeterministic tests we've been fighting in master.

asfgit pushed a commit that referenced this pull request Aug 7, 2014
This patch modifies the ConnectionManager so that error messages are sent in reply when uncaught exceptions occur during message processing.  This prevents message senders from hanging while waiting for an acknowledgment if the remote message processing failed.

This is an updated version of sarutak's PR, #1490.  The main change is to use Futures / Promises to signal errors.

Author: Kousuke Saruta <[email protected]>
Author: Josh Rosen <[email protected]>

Closes #1758 from JoshRosen/connection-manager-fixes and squashes the following commits:

68620cb [Josh Rosen] Fix test in BlockFetcherIteratorSuite:
83673de [Josh Rosen] Error ACKs should trigger IOExceptions, so catch only those exceptions in the test.
b8bb4d4 [Josh Rosen] Fix manager.id vs managerServer.id typo that broke security tests.
659521f [Josh Rosen] Include previous exception when throwing new one
a2f745c [Josh Rosen] Remove sendMessageReliablySync; callers can wait themselves.
c01c450 [Josh Rosen] Return Try[Message] from sendMessageReliablySync.
f1cd1bb [Josh Rosen] Clean up @sarutak's PR #1490 for [SPARK-2583]: ConnectionManager error reporting
7399c6b [Josh Rosen] Merge remote-tracking branch 'origin/pr/1490' into connection-manager-fixes
ee91bb7 [Kousuke Saruta] Modified BufferMessage.scala to keep the spark code style
9dfd0d8 [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark into SPARK-2583
e7d9aa6 [Kousuke Saruta] rebase to master
326a17f [Kousuke Saruta] Add test cases to ConnectionManagerSuite.scala for SPARK-2583
2a18d6b [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark into SPARK-2583
22d7ebd [Kousuke Saruta] Add test cases to BlockManagerSuite for SPARK-2583
e579302 [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark into SPARK-2583
281589c [Kousuke Saruta] Add a test case to BlockFetcherIteratorSuite.scala for fetching block from remote from successfully
0654128 [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark into SPARK-2583
ffaa83d [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark into SPARK-2583
12d3de8 [Kousuke Saruta] Added BlockFetcherIteratorSuite.scala
4117b8f [Kousuke Saruta] Modified ConnectionManager to be alble to handle error during processing message
717c9c3 [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark into SPARK-2583
6635467 [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark into SPARK-2583
e2b8c4a [Kousuke Saruta] Modify to propagete error using ConnectionManager
(cherry picked from commit 17caae4)

Signed-off-by: Patrick Wendell <[email protected]>
@asfgit asfgit closed this in 17caae4 Aug 7, 2014
xiliu82 pushed a commit to xiliu82/spark that referenced this pull request Sep 4, 2014