[SPARK-23040][CORE]: Returns interruptible iterator for shuffle reader #20449
Conversation
Before this commit, a non-interruptible iterator is returned if an aggregator or ordering is specified.
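For context, a condensed sketch of the read path being fixed (simplified from `BlockStoreShuffleReader.read`, not the verbatim source; `dep`, `context`, and `aggregatedIter` are names from that method): once the sorter drains the upstream interruptible iterator, the iterator the task actually consumes never checks the kill flag.

```scala
dep.keyOrdering match {
  case Some(keyOrd: Ordering[K]) =>
    val sorter =
      new ExternalSorter[K, C, C](context, ordering = Some(keyOrd), serializer = dep.serializer)
    sorter.insertAll(aggregatedIter) // interruption is only checked while draining this
    CompletionIterator[Product2[K, C], Iterator[Product2[K, C]]](
      sorter.iterator, sorter.stop()) // the iterator returned here is NOT interruptible
  case None =>
    aggregatedIter
}
```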
ping @cloud-fan
```scala
context.addTaskCompletionListener(tc => {
  // Note: we only stop sorter if cancelled as sorter.stop wouldn't be called in
  // CompletionIterator. Another way would be making sorter.stop idempotent.
  if (tc.isInterrupted()) { sorter.stop() }
```
Seems we can remove this `if` if we don't return a `CompletionIterator`. BTW I think we need to check all the places that use `CompletionIterator`, to see if they consider job canceling.
One advantage of `CompletionIterator` is that the `completionFunction` will be called as soon as the wrapped iterator is consumed. So for the sorter, it will release memory earlier rather than at task completion (a simplified sketch follows below).

As for job cancelling, it's not just `CompletionIterator` that we should consider. The combiner-and-sorter pattern (or similar) is something we should look for:

```scala
combiner.insertAll(iterator) // or sorter.insertAll(iterator)
// then returns a new iterator
combiner.iterator            // or sorter.iterator
```
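To make the early-release point concrete, here is a simplified sketch of what `CompletionIterator` provides (the real one lives in `org.apache.spark.util.CompletionIterator`; `EagerCompletionIterator` is just an illustrative name):

```scala
// Completion fires as soon as the wrapped iterator is exhausted, so a sorter
// can release its memory well before the task itself completes.
class EagerCompletionIterator[A](sub: Iterator[A], completion: () => Unit)
    extends Iterator[A] {
  private var completed = false
  override def next(): A = sub.next()
  override def hasNext: Boolean = {
    val more = sub.hasNext
    if (!more && !completed) { completed = true; completion() }
    more
  }
}
```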
I may be missing something obvious, but it seems `ExternalSorter.stop()` is already idempotent?
> I may be missing something obvious, but it seems `ExternalSorter.stop()` is already idempotent?

Ah, yes. After another look, it's indeed idempotent. Will update the code.
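For reference, a minimal sketch (not Spark's actual `ExternalSorter`) of the idempotent-stop pattern being discussed: clearing state on the first call makes any later call a no-op.

```scala
class SorterLike {
  private var spillFiles: List[java.io.File] = Nil
  def stop(): Unit = {
    spillFiles.foreach(_.delete()) // release resources held by spill files
    spillFiles = Nil               // a second stop() sees Nil and does nothing
  }
}
```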
cc @jiangxb1987
ping @cloud-fan and @jiangxb1987.
@advancedxy did you see any issue or exception regarding this issue?
Hi @jerryshao, I didn't see an exception. But the issue is:
I understood your intention. I was wondering, do we actually meet this issue in production environments, or do you have minimal reproduction code?
@jerryshao I met this issue in our production environment when I was debugging a Spark job. I noticed the aborted stage's tasks continued running until they finished. I cannot give minimal reproduction code since the failure is related to our mixed (online and offline services) hosts. But you can have a look at the test case I added; it essentially captures the transformation I used, except for the async part. Currently, I wrap the user-defined iterator in an `InterruptibleIterator`. However, I believe it's better handled on the Spark side.
I see. Thanks.
```scala
.mapPartitions { iter =>
  taskStartedSemaphore.release()
  // Small delay to ensure that foreach is cancelled if task is killed
  Thread.sleep(1000)
```
I think using `sleep` will make the UT flaky; you'd better change it to some deterministic way.
+1
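A sketch of the deterministic alternative the test moves toward (names follow the test's semaphores; `sc`, `HashPartitioner`, and the semaphores come from the surrounding suite, and this is not the final test code): the task parks on a semaphore until the driver has actually issued the cancellation, so no timing assumption is needed.

```scala
val f = sc.parallelize(1 to 1000, 2).map { i => (i, i) }
  .repartitionAndSortWithinPartitions(new HashPartitioner(2))
  .mapPartitions { iter =>
    taskStartedSemaphore.release()   // tell the driver this task is running
    taskCancelledSemaphore.acquire() // park until f.cancel() has been issued
    iter
  }.collectAsync()
```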
@jerryshao @cloud-fan I have updated my code. Do you have any other concerns?
```
@@ -104,9 +104,16 @@ private[spark] class BlockStoreShuffleReader[K, C](
      context.taskMetrics().incMemoryBytesSpilled(sorter.memoryBytesSpilled)
      context.taskMetrics().incDiskBytesSpilled(sorter.diskBytesSpilled)
      context.taskMetrics().incPeakExecutionMemory(sorter.peakMemoryUsedBytes)
      // Use completion callback to stop sorter if task was cancelled.
```
`if task is completed (either finished or canceled)`
@cloud-fan Sorry for the delay.
Your comment is addressed.
```
@@ -104,9 +104,16 @@ private[spark] class BlockStoreShuffleReader[K, C](
      context.taskMetrics().incMemoryBytesSpilled(sorter.memoryBytesSpilled)
      context.taskMetrics().incDiskBytesSpilled(sorter.diskBytesSpilled)
      context.taskMetrics().incPeakExecutionMemory(sorter.peakMemoryUsedBytes)
      // Use completion callback to stop sorter if task was completed(either finished/cancelled).
```
To fit the 100-char limitation, `or` is replaced by `/`.
Then we can just write `if task was finished/cancelled`.
```scala
import org.scalatest.BeforeAndAfter
import org.scalatest.Matchers

```
this will break the style check
```scala
  taskStartedSemaphore.release()
  iter
}.foreachAsync { x =>
  if ( x._1 >= 10) { // this block of code is partially executed.
```
no space after `if(`
```
@@ -320,6 +319,55 @@ class JobCancellationSuite extends SparkFunSuite with Matchers with BeforeAndAft
    f2.get()
  }

  test("Interruptible iterator of shuffle reader") {
```
can we briefly explain what happened in this test?
@cloud-fan I have updated the comments and fixed the style issues (the code was previously auto-formatted by IntelliJ).
ok to test
```scala
val f = sc.parallelize(1 to 1000, numSlice).map { i => (i, i) }
  .repartitionAndSortWithinPartitions(new HashPartitioner(2))
  .mapPartitions { iter =>
    taskStartedSemaphore.release()
```
This will be called twice as the root RDD has 2 partitions, so `f.cancel` might be called before both of these 2 partitions have finished.
`f.cancel()` should be called before these partitions (tasks) finish, and we want to make sure these tasks can be cancelled.
```scala
val taskCompletedSem = new Semaphore(0)
Future {
  taskStartedSemaphore.acquire()
  f.cancel()
```
What's the expectation for when this `f.cancel()` should be called?
Line 372: `sem.acquire()` is blocked by this `Future` block, but it looks like we don't need `Future` or `sem` here. I will update the code.
Test build #87703 has finished for PR 20449.
Test build #87701 has finished for PR 20449.
```scala
taskStartedSemaphore.acquire()
f.cancel()

val e = intercept[SparkException] { f.get() }.getCause
```
nit: `intercept[SparkException](f.get()).getCause`
will do
```scala
})

taskStartedSemaphore.acquire()
f.cancel()
```
We should add some comments to explain when we reach here. From what I am seeing:

- `taskStartedSemaphore.release()` must be called, so at least one task is started.
- The first task has processed no more than 10 records; the second task hasn't processed any data, because the reduce stage is not finished and `taskCancelledSemaphore.acquire()` will be blocked.
will do
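For readers following along, a self-contained sketch of the handshake described above, with a plain thread standing in for a Spark task (names mirror the test's semaphores; `HandshakeSketch` is just an illustrative name):

```scala
import java.util.concurrent.Semaphore

object HandshakeSketch extends App {
  val taskStartedSemaphore   = new Semaphore(0)
  val taskCancelledSemaphore = new Semaphore(0)

  val task = new Thread(() => {
    taskStartedSemaphore.release()   // signal the driver that work has begun
    taskCancelledSemaphore.acquire() // block until cancellation is issued
  })
  task.start()

  taskStartedSemaphore.acquire()     // wait for at least one task to start
  // f.cancel() would be issued at this point in the real test
  taskCancelledSemaphore.release()   // unblock the task so it can observe it
  task.join()
}
```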
```scala
  }
})

taskStartedSemaphore.acquire()
```
Why not `taskStartedSemaphore.acquire(numSlice)`?
As soon as one task starts, we can cancel the job.
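The two options side by side, assuming each of the `numSlice` map tasks calls `release()` exactly once when it starts (a sketch of the trade-off, not a code change):

```scala
taskStartedSemaphore.acquire()            // returns once ANY task has started
// taskStartedSemaphore.acquire(numSlice) // would wait for ALL tasks to start
```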
Test build #87723 has finished for PR 20449.
Test build #87725 has finished for PR 20449.
```scala
// execution and a counter is used to make sure that the corresponding tasks are indeed
// cancelled.
import JobCancellationSuite._
val numSlice = 1
```
Can we hardcode it? Using a variable makes people feel like they can change its value and the test will still pass; however, that's not true, as `assert(executionOfInterruptibleCounter.get() <= 10)` needs to be updated too.
Will update it later. But it looks like Jenkins has been having trouble these days? Is it back to normal?
I'm not sure, let's just try it :)
LGTM
retest this please
Test build #87756 has finished for PR 20449.
Move addSparkListener to beginning of test
All right, I finally tracked down why it's hanging on Jenkins. Please check the latest change, @cloud-fan
Test build #87813 has finished for PR 20449.
```scala
// Reset semaphores if used by multiple tests.
// Note: if other semaphores are shared by multiple tests, please reset them in this block
JobCancellationSuite.taskStartedSemaphore.drainPermits()
JobCancellationSuite.taskCancelledSemaphore.drainPermits()
```
nit: for simplicity, I'd like to reset all semaphores here, instead of thinking about which ones are shared.
or we can make all semaphores local, so that we don't need to care about it.
> for simplicity, I'd like to reset all semaphores here, instead of thinking about which ones are shared.

Another way to avoid this problem is to not reuse semaphores, but that's too verbose. As for your suggestion: if new semaphores are added by others, how would they know they're supposed to reset them? Maybe some comments are needed at the semaphore declarations.

> or we can make all semaphores local, so that we don't need to care about it.

No, a global semaphore is required when it is shared between the driver and an executor (another thread in local mode). See related PR #4180 for details.
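A sketch of that point: task closures are serialized even in local mode, so an instance field would be captured by copy, while a reference through the suite's companion object resolves to the single shared instance in the JVM (simplified from the suite's actual declarations).

```scala
import java.util.concurrent.Semaphore

object JobCancellationSuite {
  // Shared by reference between the driver and tasks in local mode; an
  // instance field would instead be serialized into each task closure.
  val taskStartedSemaphore = new Semaphore(0)
  val taskCancelledSemaphore = new Semaphore(0)
}
```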
> Maybe some comments are needed at the semaphore declarations.

+1. It's also good for reviewers; otherwise figuring out whether a semaphore is shared or not is really unnecessary work for reviewers.
Test build #87822 has finished for PR 20449.
Test build #87845 has finished for PR 20449.
ping @cloud-fan
thanks, merging to master!
@cloud-fan is it possible that we also merge this into branch-2.3, so this fix can be released in Spark 2.3.1?
```scala
      CompletionIterator[Product2[K, C], Iterator[Product2[K, C]]](sorter.iterator, sorter.stop())
    case None =>
      aggregatedIter
  }
// Use another interruptible iterator here to support task cancellation as aggregator or(and)
// sorter may have consumed previous interruptible iterator.
new InterruptibleIterator[Product2[K, C]](context, resultIter)
```
There is a chance that `resultIter` is already an `InterruptibleIterator`, and we should not double-wrap it. Can you send a follow-up PR to fix this? Then we can backport them to 2.3 together.
Will do
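The follow-up (apache#20920, referenced below) presumably guards the wrapping along these lines (a sketch, not the verbatim patch):

```scala
resultIter match {
  case _: InterruptibleIterator[Product2[K, C]] => resultIter
  case _ =>
    // Only wrap when the aggregator/sorter produced a fresh, unwrapped iterator.
    new InterruptibleIterator[Product2[K, C]](context, resultIter)
}
```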
## What changes were proposed in this pull request?
Address apache#20449 (comment): if `resultIter` is already an `InterruptibleIterator`, don't double wrap it.

## How was this patch tested?
Existing tests.

Author: Xingbo Jiang <[email protected]>

Closes apache#20920 from jiangxb1987/SPARK-23040.
## What changes were proposed in this pull request?
Before this commit, a non-interruptible iterator is returned if an aggregator or ordering is specified. This commit also ensures that the sorter is closed even when the task is cancelled (killed) in the middle of sorting.

## How was this patch tested?
Added a unit test in JobCancellationSuite.

Author: Xianjin YE <[email protected]>

Closes apache#20449 from advancedxy/SPARK-23040.
…fle reader

Backport #20449 and #20920 to branch-2.3.

## What changes were proposed in this pull request?
Before this commit, a non-interruptible iterator is returned if an aggregator or ordering is specified. This commit also ensures that the sorter is closed even when the task is cancelled (killed) in the middle of sorting.

## How was this patch tested?
Added a unit test in JobCancellationSuite.

Author: Xianjin YE <[email protected]>
Author: Xingbo Jiang <[email protected]>

Closes #20954 from jiangxb1987/SPARK-23040-2.3.
What changes were proposed in this pull request?
Before this commit, a non-interruptible iterator is returned if an aggregator or ordering is specified.
This commit also ensures that the sorter is closed even when the task is cancelled (killed) in the middle of sorting.
How was this patch tested?
Added a unit test in JobCancellationSuite.