-
Notifications
You must be signed in to change notification settings - Fork 28.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-2677] BasicBlockFetchIterator#next can wait forever #1632
Conversation
throw new SparkException( | ||
"Unexpected message " + blockMessage.getType + " received from " + cmId) | ||
|
||
val sendRequestThread = new Thread(s"sendRequestThread(${req.address.host}:${req.address.port})") { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
New thread affect performance?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
how about to control the number of connect in a blockfetch. that can reduce server's connect.
In #1758 @JoshRosen fixed ConnectionManager to handle the case remote executor return error message. The latest PR fixes this issue. |
Jenkins, test this please. |
@@ -255,5 +260,42 @@ class ConnectionManagerSuite extends FunSuite { | |||
|
|||
} | |||
|
|||
test("sendMessageRelyably timeout") { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Spelling: should be sendMessageReliably
QA tests have started for PR 1632. This patch merges cleanly. |
@@ -134,6 +136,7 @@ private[spark] class ConnectionManager( | |||
// to be able to track asynchronous messages | |||
private val idCount: AtomicInteger = new AtomicInteger(1) | |||
|
|||
private var isAckTimeout = false |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Logically, this seems like per-message state, so it feels wrong to have it as a ConnectionManager-wide variable.
This approach seems very complicated, race-prone, and hard to understand.
|
Another way to solve this is to change the BasicBlockFetcher to use |
QA results for PR 1632: |
@shivaram That's a really good suggestion. I'll try to write a failing unit test that directly uses BasicBlockFetcherIterator so that we can test your approach. |
Hi @shivaram , @JoshRosen At first, I had an idea to use poll. I thought it's the easy way. |
The reason why I din't use Await.ready and Await.result is because those are blocking method. Current way which use onComplete callback is non-blocking. |
O.K. I'll try to resolve using poll somehow. |
I think the current solution is better. |
* and finally, FetchFailedException is thrown so in this case, we don't need | ||
* to throw Exception here | ||
*/ | ||
if (!isAckTimeout) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since we delete the MessageStatuses of messages that fail with timeouts, I guess we have no way to distinguish between an ACK for a message that we didn't send and an ACK for a timed-out message. I guess that isAckTimeout
is used to strike a reasonable compromise in which we'll detect errors if no timeouts occur.
It might be a good idea to logWarning
if we receive an ACK after we've timed out; this might help when debugging: if the ack timeout is set too low, warning messages would appear in the logs.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
------------------ 原始邮件 ------------------
发件人: "Josh Rosen";[email protected];
发送时间: 2014年8月12日(星期二) 中午12:24
收件人: "apache/spark"[email protected];
主题: Re: [spark] [SPARK-2677] BasicBlockFetchIterator#next can waitforever (#1632)
In core/src/main/scala/org/apache/spark/network/ConnectionManager.scala:
} > case None => { > - throw new Exception("Could not find reference for received ack message " + > - message.id) > + /** > + * If isAckTimeout == true, Future returned from sendMessageReliably should fail > + * and finally, FetchFailedException is thrown so in this case, we don't need > + * to throw Exception here > + */ > + if (!isAckTimeout) {
Since we delete the MessageStatuses of messages that fail with timeouts, I guess we have no way to distinguish between an ACK for a message that we didn't send and an ACK for a timed-out message. I guess that isAckTimeout is used to strike a reasonable compromise in which we'll detect errors if no timeouts occur.
It might be a good idea to logWarning if we receive an ACK after we've timed out; this might help when debugging: if the ack timeout is set too low, warning messages would appear in the logs.
—
Reply to this email directly or view it on GitHub.
@sarutak I left updates on a couple of my earlier comments. This solution can work and I have a few suggestions for minor cleanup (e.g. re-using a Timer). |
@JoshRosen Thanks! |
@@ -72,6 +73,7 @@ private[spark] class ConnectionManager( | |||
|
|||
// default to 30 second timeout waiting for authentication | |||
private val authTimeout = conf.getInt("spark.core.connection.auth.wait.timeout", 30) | |||
private val ackTimeout = conf.getInt("spark.core.connection.ack.wait.timeout", 30) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
60
is better
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Year, maybe right.
@sarutak You are right that using poll wouldn't clear up the internal state in ConnectionManager. I think @JoshRosen 's idea of using a shared timer pool or re-using some of the existing thread pools (the future execution context ?) might be fine. |
@@ -22,6 +22,7 @@ import java.nio._ | |||
import java.nio.channels._ | |||
import java.nio.channels.spi._ | |||
import java.net._ | |||
import java.util.{Timer, TimerTask} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
HashedWheelTimer performance is better
QA tests have started for PR 1632 at commit
|
QA tests have started for PR 1632 at commit
|
Jenkins, retest this please. |
QA tests have started for PR 1632 at commit
|
messageStatuses.synchronized { | ||
isAckTimeout = true | ||
messageStatuses.remove(message.id).foreach ( s => { | ||
s.synchronized { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why is this synchronized
needed? Aren't all writes to the promise already guarded by the messageStatuses.synchronized
lock?
@JoshRosen Exactly, thanks. |
QA tests have finished for PR 1632 at commit
|
Jenkins, retest this please. |
QA tests have started for PR 1632 at commit
|
} | ||
case None => { | ||
if (isAckTimeout) { | ||
logWarning(s"Ack message ${message.id} maybe received after timeout") | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think there should be an an else
block here so that we throw an exception only if we haven't hit the ack timeout.
This current code looks wrong because it will fall-through and throw an exception if we receive late-arriving messages that we've already timed out on and marked as failures.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The original code (before considering ack timeout) threw Exception when a message which is not referenced is received. So , I decided to throw Exception even if Ack timeout was occurred because we can't distinguish the non-referenced message is caused by ack timeout.
On a second throug, fundamentally, is it needed throwing exception here?
When we receive non-referenced message, should we warn simply right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Before, it was undoubtedly a bug if we received an ack for a message and didn't have a corresponding SentMessageStatus.
Now that we have timeouts, we have no way to distinguish between a late-arriving ack for a SentMessageStatus that we've already deleted and a bogus ack sent due to buggy code. As I commented upthread, one option would be to simply convert this into a warning. But another option is to keep it as an error unless we've timed out at least once, in which case we treat it as a warning.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think, the situation receiving non-referenced message is not critical, we should not throw Exception at least so I think log warn is better when receiving non-referenced message even if the message is late arriving ack or not.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, that's fine, too. Let's just drop the exception for now and remove the isAckTimeout
variable.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
O.K. I'll remove it.
Actually, I retract my earlier LGTM; this needs a bit of user-facing configuration documentation and I think there's a corner-case bug in how we handle late-arriving ACKs. I'd like for this to make it into the next 1.1.0 preview release, so I may fix these issues myself in a new PR. |
QA tests have finished for PR 1632 at commit
|
…ves ack for non-referenced message
QA tests have started for PR 1632 at commit
|
QA tests have finished for PR 1632 at commit
|
I've merged this to |
Author: Kousuke Saruta <[email protected]> Closes #1632 from sarutak/SPARK-2677 and squashes the following commits: cddbc7b [Kousuke Saruta] Removed Exception throwing when ConnectionManager#handleMessage receives ack for non-referenced message d3bd2a8 [Kousuke Saruta] Modified configuration.md for spark.core.connection.ack.timeout e85f88b [Kousuke Saruta] Removed useless synchronized blocks 7ed48be [Kousuke Saruta] Modified ConnectionManager to use ackTimeoutMonitor ConnectionManager-wide 9b620a6 [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark into SPARK-2677 0dd9ad3 [Kousuke Saruta] Modified typo in ConnectionManagerSuite.scala 7cbb8ca [Kousuke Saruta] Modified to match with scalastyle 8a73974 [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark into SPARK-2677 ade279a [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark into SPARK-2677 0174d6a [Kousuke Saruta] Modified ConnectionManager.scala to handle the case remote Executor cannot ack a454239 [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark into SPARK-2677 9b7b7c1 [Kousuke Saruta] (WIP) Modifying ConnectionManager.scala (cherry picked from commit 76fa0ea) Signed-off-by: Josh Rosen <[email protected]>
Author: Kousuke Saruta <[email protected]> Closes apache#1632 from sarutak/SPARK-2677 and squashes the following commits: cddbc7b [Kousuke Saruta] Removed Exception throwing when ConnectionManager#handleMessage receives ack for non-referenced message d3bd2a8 [Kousuke Saruta] Modified configuration.md for spark.core.connection.ack.timeout e85f88b [Kousuke Saruta] Removed useless synchronized blocks 7ed48be [Kousuke Saruta] Modified ConnectionManager to use ackTimeoutMonitor ConnectionManager-wide 9b620a6 [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark into SPARK-2677 0dd9ad3 [Kousuke Saruta] Modified typo in ConnectionManagerSuite.scala 7cbb8ca [Kousuke Saruta] Modified to match with scalastyle 8a73974 [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark into SPARK-2677 ade279a [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark into SPARK-2677 0174d6a [Kousuke Saruta] Modified ConnectionManager.scala to handle the case remote Executor cannot ack a454239 [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark into SPARK-2677 9b7b7c1 [Kousuke Saruta] (WIP) Modifying ConnectionManager.scala
No description provided.