
[SPARK-2677] BasicBlockFetchIterator#next can wait forever #1632

Closed · wants to merge 12 commits

Conversation

@sarutak (Member) commented Jul 29, 2014

No description provided.

throw new SparkException(
"Unexpected message " + blockMessage.getType + " received from " + cmId)

val sendRequestThread = new Thread(s"sendRequestThread(${req.address.host}:${req.address.port})") {
Contributor

Will creating a new thread affect performance?

Contributor

How about controlling the number of connections in a block fetch? That could reduce the number of connections to the server.
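(Illustrative sketch of the suggestion above, not code from this PR: one way to bound concurrent fetch connections is a semaphore; `maxConnections` and `sendRequest` are made-up names.)

```scala
import java.util.concurrent.Semaphore

// Hypothetical sketch: cap concurrent fetch connections with a semaphore
// instead of opening an unbounded number of threads/connections.
val maxConnections = 8
val connectionPermits = new Semaphore(maxConnections)

def sendRequest(send: () => Unit): Unit = {
  connectionPermits.acquire()  // blocks once maxConnections are in flight
  try {
    send()                     // connect, send the block request, receive
  } finally {
    connectionPermits.release()
  }
}
```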

@sarutak (Member Author) commented Aug 10, 2014

In #1758, @JoshRosen fixed ConnectionManager to handle the case where a remote executor returns an error message.
But the case where a remote executor hangs is still not handled, so if the remote executor cannot return any message, the fetching executor keeps waiting forever.

The latest PR fixes this issue.

@JoshRosen (Contributor)

Jenkins, test this please.

@@ -255,5 +260,42 @@ class ConnectionManagerSuite extends FunSuite {

}

test("sendMessageRelyably timeout") {
Contributor

Spelling: should be sendMessageReliably

@SparkQA commented Aug 12, 2014

QA tests have started for PR 1632. This patch merges cleanly.
View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/18346/consoleFull

@@ -134,6 +136,7 @@ private[spark] class ConnectionManager(
// to be able to track asynchronous messages
private val idCount: AtomicInteger = new AtomicInteger(1)

private var isAckTimeout = false
Contributor

Logically, this seems like per-message state, so it feels wrong to have it as a ConnectionManager-wide variable.

@JoshRosen (Contributor)

This approach seems very complicated, race-prone, and hard to understand.

Await.ready and Await.result already support timeouts, so why not just add timeout logic at those call sites?
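(A minimal, self-contained sketch of what this suggests, using only scala.concurrent; the never-completed promise stands in for an ack that never arrives.)

```scala
import java.util.concurrent.TimeoutException
import scala.concurrent.{Await, Promise}
import scala.concurrent.duration._

// A promise that is never completed models an ack that never arrives.
val ackPromise = Promise[String]()

try {
  // Await.result throws TimeoutException when the future is not completed
  // within the given duration, instead of blocking forever.
  val ack = Await.result(ackPromise.future, 2.seconds)
  println(s"got ack: $ack")
} catch {
  case _: TimeoutException =>
    println("timed out waiting for ack; fail the fetch instead of hanging")
}
```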

@shivaram (Contributor)

Another way to solve this is to change the BasicBlockFetcherIterator to use LinkedBlockingQueue's poll with a timeout [1]

[1] http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/LinkedBlockingQueue.html#poll(long,%20java.util.concurrent.TimeUnit)
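(Sketch of that alternative; `results` here is an illustrative stand-in for the iterator's internal queue of fetch results.)

```scala
import java.util.concurrent.{LinkedBlockingQueue, TimeUnit}

// Stand-in for the iterator's internal queue of fetch results.
val results = new LinkedBlockingQueue[String]()

// take() blocks forever if the remote executor never responds;
// poll(timeout, unit) returns null once the timeout expires instead.
val result = results.poll(30, TimeUnit.SECONDS)
if (result == null) {
  // Surface a fetch failure rather than waiting indefinitely.
  println("block fetch timed out")
}
```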

@SparkQA commented Aug 12, 2014

QA results for PR 1632:
- This patch PASSES unit tests.
- This patch merges cleanly
- This patch adds no public classes

For more information see test output:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/18346/consoleFull

@JoshRosen (Contributor)

@shivaram That's a really good suggestion. I'll try to write a failing unit test that directly uses BasicBlockFetcherIterator so that we can test your approach.

@sarutak (Member Author) commented Aug 12, 2014

Hi @shivaram, @JoshRosen,

At first I had the idea to use poll; I thought it would be the easy way. But if we use poll and catch the TimeoutException, I think ConnectionManager's state is not reset. That means the Future would still wait on the Promise forever.

@sarutak (Member Author) commented Aug 12, 2014

The reason I didn't use Await.ready or Await.result is that they are blocking methods. The current approach, which uses an onComplete callback, is non-blocking.
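(The difference in a nutshell, as a hedged sketch: the callback style registers work to run later and returns immediately, whereas Await parks the calling thread.)

```scala
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Promise
import scala.util.{Failure, Success}

val ack = Promise[String]()

// Non-blocking: register a callback and return immediately; the callback
// fires on an execution-context thread when the future completes.
ack.future.onComplete {
  case Success(msg) => println(s"ack received: $msg")
  case Failure(e)   => println(s"fetch failed: $e")
}
// The calling thread is free to keep servicing other connections here,
// whereas Await.result(ack.future, timeout) would park it.
```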

@sarutak (Member Author) commented Aug 12, 2014

O.K. I'll try to resolve it using poll somehow.

@witgo (Contributor) commented Aug 12, 2014

I think the current solution is better. LinkedBlockingQueue.poll will bring a lot of problems.

* and finally, FetchFailedException is thrown so in this case, we don't need
* to throw Exception here
*/
if (!isAckTimeout) {
Contributor

Since we delete the MessageStatuses of messages that fail with timeouts, I guess we have no way to distinguish between an ACK for a message that we didn't send and an ACK for a timed-out message. I guess that isAckTimeout is used to strike a reasonable compromise in which we'll detect errors if no timeouts occur.

It might be a good idea to logWarning if we receive an ACK after we've timed out; this might help when debugging: if the ack timeout is set too low, warning messages would appear in the logs.


@JoshRosen (Contributor)

@sarutak I left updates on a couple of my earlier comments. This solution can work and I have a few suggestions for minor cleanup (e.g. re-using a Timer).

@sarutak (Member Author) commented Aug 12, 2014

@JoshRosen Thanks!
I'll try it.

@@ -72,6 +73,7 @@ private[spark] class ConnectionManager(

// default to 30 second timeout waiting for authentication
private val authTimeout = conf.getInt("spark.core.connection.auth.wait.timeout", 30)
private val ackTimeout = conf.getInt("spark.core.connection.ack.wait.timeout", 30)
Contributor

60 is better

Member Author

Yeah, maybe you're right.
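(For reference, the knob under discussion can be overridden per application; a minimal sketch using SparkConf, with 60 as the value suggested above.)

```scala
import org.apache.spark.SparkConf

// Override the ack timeout (in seconds) introduced by this patch;
// the patch's default is 30, and 60 is the value suggested above.
val conf = new SparkConf()
  .set("spark.core.connection.ack.wait.timeout", "60")
```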

@shivaram (Contributor)

@sarutak You are right that using poll wouldn't clear up the internal state in ConnectionManager. I think @JoshRosen's idea of using a shared timer pool, or re-using some of the existing thread pools (the future execution context?), might be fine.
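(A minimal sketch of the shared-timer idea, roughly the shape of the later ackTimeoutMonitor commit; the method name and the choice of IOException are illustrative, not the PR's exact code.)

```scala
import java.io.IOException
import java.util.{Timer, TimerTask}
import scala.concurrent.Promise

// One ConnectionManager-wide daemon timer thread schedules a TimerTask
// per message; the task fails the promise if no ack arrived in time.
val ackTimeoutMonitor = new Timer("AckTimeoutMonitor", true)
val ackTimeoutSeconds = 30

def watchForAck(messageId: Int, status: Promise[String]): Unit = {
  ackTimeoutMonitor.schedule(new TimerTask {
    override def run(): Unit = {
      // tryFailure is a no-op if the ack already completed the promise,
      // so racing with a timely ack is harmless.
      status.tryFailure(
        new IOException(s"Ack for message $messageId timed out"))
    }
  }, ackTimeoutSeconds * 1000L)
}
```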

@@ -22,6 +22,7 @@ import java.nio._
import java.nio.channels._
import java.nio.channels.spi._
import java.net._
import java.util.{Timer, TimerTask}
Contributor

HashedWheelTimer's performance is better.
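(For comparison, a hedged sketch of that alternative, assuming Netty 3's org.jboss.netty.util package is on the classpath; the wheel gives cheap scheduling with many outstanding timeouts, at the cost of firing times rounded to the wheel's tick duration.)

```scala
import java.util.concurrent.TimeUnit
import org.jboss.netty.util.{HashedWheelTimer, Timeout, TimerTask}

// Hedged sketch, assuming Netty 3 is available.
val wheel = new HashedWheelTimer()

val pending: Timeout = wheel.newTimeout(new TimerTask {
  override def run(timeout: Timeout): Unit = {
    println("ack wait expired")
  }
}, 30, TimeUnit.SECONDS)

// If the ack arrives first, cancel the pending timeout:
pending.cancel()
```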

@SparkQA commented Aug 15, 2014

QA tests have started for PR 1632 at commit 66cfff7.

  • This patch merges cleanly.

@SparkQA commented Aug 15, 2014

QA tests have started for PR 1632 at commit 7ed48be.

  • This patch merges cleanly.

@JoshRosen (Contributor)

Jenkins, retest this please.

@SparkQA commented Aug 15, 2014

QA tests have started for PR 1632 at commit 7ed48be.

  • This patch merges cleanly.

messageStatuses.synchronized {
isAckTimeout = true
messageStatuses.remove(message.id).foreach ( s => {
s.synchronized {
Contributor

Why is this synchronized needed? Aren't all writes to the promise already guarded by the messageStatuses.synchronized lock?

@sarutak (Member Author) commented Aug 15, 2014

@JoshRosen Exactly, thanks.

@SparkQA commented Aug 15, 2014

QA tests have finished for PR 1632 at commit 7ed48be.

  • This patch passes unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@JoshRosen (Contributor)

Jenkins, retest this please.

@SparkQA commented Aug 16, 2014

QA tests have started for PR 1632 at commit e85f88b.

  • This patch merges cleanly.

}
case None => {
if (isAckTimeout) {
logWarning(s"Ack message ${message.id} maybe received after timeout")
}
Contributor

I think there should be an else block here so that we throw an exception only if we haven't hit the ack timeout.

The current code looks wrong because it will fall through and throw an exception if we receive late-arriving messages that we've already timed out on and marked as failures.

Member Author

The original code (before ack timeouts were considered) threw an Exception when a message with no reference was received. So I decided to keep throwing an Exception even when an ack timeout had occurred, because we can't tell whether a non-referenced message was caused by an ack timeout.

On second thought, though: do we fundamentally need to throw an exception here? When we receive a non-referenced message, shouldn't we simply log a warning?

Contributor

Before, it was undoubtedly a bug if we received an ack for a message and didn't have a corresponding SentMessageStatus.

Now that we have timeouts, we have no way to distinguish between a late-arriving ack for a SentMessageStatus that we've already deleted and a bogus ack sent due to buggy code. As I commented upthread, one option would be to simply convert this into a warning. But another option is to keep it as an error unless we've timed out at least once, in which case we treat it as a warning.

Member Author

I think the situation of receiving a non-referenced message is not critical, so at the very least we should not throw an Exception; logging a warning is better when we receive a non-referenced message, whether or not it is a late-arriving ack.

Contributor

Yeah, that's fine, too. Let's just drop the exception for now and remove the isAckTimeout variable.

Member Author

O.K. I'll remove it.
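(What the agreed-upon resolution looks like, as a hedged sketch of the final handleMessage shape: no isAckTimeout flag and no exception; an ack with no matching status, whether late-arriving or bogus, is only logged. The log text is illustrative.)

```scala
messageStatuses.synchronized {
  messageStatuses.remove(message.id) match {
    case Some(status) =>
      // complete the status with the received ack, as before
    case None =>
      logWarning(s"Could not find reference for received ack Message ${message.id}")
  }
}
```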

@JoshRosen (Contributor)

Actually, I retract my earlier LGTM; this needs a bit of user-facing configuration documentation and I think there's a corner-case bug in how we handle late-arriving ACKs. I'd like for this to make it into the next 1.1.0 preview release, so I may fix these issues myself in a new PR.

@SparkQA commented Aug 16, 2014

QA tests have finished for PR 1632 at commit e85f88b.

  • This patch fails unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Aug 16, 2014

QA tests have started for PR 1632 at commit cddbc7b.

  • This patch merges cleanly.

@SparkQA commented Aug 16, 2014

QA tests have finished for PR 1632 at commit cddbc7b.

  • This patch passes unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class ShuffleBlockManager(blockManager: BlockManager,

@JoshRosen (Contributor)

I've merged this to master and branch-1.1. Thanks!

asfgit pushed a commit that referenced this pull request Aug 16, 2014
Author: Kousuke Saruta <[email protected]>

Closes #1632 from sarutak/SPARK-2677 and squashes the following commits:

cddbc7b [Kousuke Saruta] Removed Exception throwing when ConnectionManager#handleMessage receives ack for non-referenced message
d3bd2a8 [Kousuke Saruta] Modified configuration.md for spark.core.connection.ack.timeout
e85f88b [Kousuke Saruta] Removed useless synchronized blocks
7ed48be [Kousuke Saruta] Modified ConnectionManager to use ackTimeoutMonitor ConnectionManager-wide
9b620a6 [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark into SPARK-2677
0dd9ad3 [Kousuke Saruta] Modified typo in ConnectionManagerSuite.scala
7cbb8ca [Kousuke Saruta] Modified to match with scalastyle
8a73974 [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark into SPARK-2677
ade279a [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark into SPARK-2677
0174d6a [Kousuke Saruta] Modified ConnectionManager.scala to handle the case remote Executor cannot ack
a454239 [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark into SPARK-2677
9b7b7c1 [Kousuke Saruta] (WIP) Modifying ConnectionManager.scala

(cherry picked from commit 76fa0ea)
Signed-off-by: Josh Rosen <[email protected]>
@asfgit asfgit closed this in 76fa0ea Aug 16, 2014
xiliu82 pushed a commit to xiliu82/spark that referenced this pull request Sep 4, 2014
@sarutak sarutak deleted the SPARK-2677 branch April 11, 2015 05:22