[SPARK-24755][Core] Executor loss can cause task to not be resubmitted #21729
Conversation
@@ -87,7 +87,7 @@ private[spark] class TaskSetManager(
  // Set the coresponding index of Boolean var when the task killed by other attempt tasks,
typo I made before, coresponding -> corresponding.
("exec2", "host2"), ("exec3", "host3"))
sched.initialize(new FakeSchedulerBackend() {
  override def killTask(taskId: Long,
    executorId: String,
nit: indent
var resubmittedTasks = new mutable.HashSet[Int]
val dagScheduler = new FakeDAGScheduler(sc, sched) {
  override def taskEnded(task: Task[_],
    reason: TaskEndReason,
ditto
Just some nits in the code changes. The added UT copies a lot of code from SPARK-22074; is there a better way to reuse it, or to combine the two tests?
Please change the title to '[SPARK-24755][Core] Executor loss can cause task to not be resubmitted'
@xuanyuanking Thanks for the comments. I also thought about modifying the UT of SPARK-22074 instead of adding a new UT, but I was afraid it might cause confusion, since they are two different issues, although they are very close. If you feel it is better to combine them, I can change it. Thanks.
ok to test
Test build #92775 has finished for PR 21729 at commit
@@ -87,7 +87,7 @@ private[spark] class TaskSetManager(
  // Set the coresponding index of Boolean var when the task killed by other attempt tasks,
The comment needs to be changed, since this is no longer an array of booleans.
I'll update it. Thanks.
Please update the comment, based on the HashSet being OK now.
cc @mridulm
@@ -87,7 +87,7 @@ private[spark] class TaskSetManager(
   // Set the coresponding index of Boolean var when the task killed by other attempt tasks,
   // this happened while we set the `spark.speculation` to true. The task killed by others
   // should not resubmit while executor lost.
-  private val killedByOtherAttempt: Array[Boolean] = new Array[Boolean](numTasks)
+  private val killedByOtherAttempt = new HashSet[Long]
super nit: I prefer an Array[Long], so you know the index corresponding to the taskId; that can provide more information while debugging.
Hi @jiangxb1987, thanks for the comment, but I'm not sure I understand your suggestion correctly. Do you mean `private val killedByOtherAttempt = new Array[Long]`?
Also, the comment "Set the corresponding index of Boolean var when the task killed ..." is not correct anymore. I'm sorry I forgot to update it.
Yea, please also update the comment.
I think we should use ArrayBuffer[Long] instead of Array[Long], because the number of elements can grow as more tasks are killed.
Also, I think there is a downside to using an array-like data structure for this variable: lookup takes linear time, and that operation is used many times when we check whether a task needs to be resubmitted (inside the executorLost method of TSM). This will not matter much if the array is small, but it is still something we might want to consider.
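The lookup-cost point can be pictured with a small sketch (illustrative names only, not the actual TaskSetManager code): both structures answer "was this taskId killed by another attempt?", but a buffer scans linearly while a HashSet hashes the key.

```scala
import scala.collection.mutable

// Illustrative only: the same membership test on the two candidate structures.
object KilledLookupSketch {
  // ArrayBuffer.contains walks the elements one by one: O(n) per lookup.
  def killedInBuffer(killed: mutable.ArrayBuffer[Long], taskId: Long): Boolean =
    killed.contains(taskId)

  // HashSet.contains hashes the key: O(1) expected per lookup, which matters
  // when executorLost checks many tasks against the killed set.
  def killedInSet(killed: mutable.HashSet[Long], taskId: Long): Boolean =
    killed.contains(taskId)
}
```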
@mridulm 's approach also sounds good to me.
@jiangxb1987 please clarify: is it fine as-is, or do you want to use a HashMap and track the index? Can you give an example of when this is used for debugging? For instance, if you are getting a heap dump and looking at the data structures, that might make sense; otherwise it's not accessible without adding further log statements anyway, and it's just extra memory usage.
For instance, when you have corrupted shuffle data you may want to ensure it's not caused by killing tasks, and that requires tracking all killed taskIds corresponding to a partition. With a HashMap as @mridulm proposed, it would be easy to add extra logging for debugging. But actually I just looked at the code again and found that expanding the logInfo in L735 can also resolve my case. So it seems fine to use a HashSet to save some memory.
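The HashMap variant being discussed could look roughly like this (a sketch with made-up names, not the actual Spark code): killed taskIds are grouped by partition index, so a per-partition debug log is easy to add.

```scala
import scala.collection.mutable

// Hypothetical sketch: track every task id killed by another attempt,
// grouped by partition index, for easier debug logging.
object KilledByAttemptTracker {
  // partition index -> task ids of attempts killed by another attempt
  private val killedByPartition = mutable.HashMap.empty[Int, mutable.HashSet[Long]]

  def recordKill(partition: Int, taskId: Long): Unit =
    killedByPartition.getOrElseUpdate(partition, mutable.HashSet.empty[Long]) += taskId

  def killedFor(partition: Int): Set[Long] =
    killedByPartition.get(partition).map(_.toSet).getOrElse(Set.empty)
}
```

The trade-off raised above is exactly this: the map keeps the partition-to-taskId association for debugging, at the cost of extra memory compared to a flat HashSet of taskIds.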
I'm not proposing to expand the logInfo in L735 in this PR; I'm just concerned about whether it's convenient enough to add extra logs to debug a potential issue. Since there is another way to achieve the same effect, I'm okay with using a HashSet here.
Great, thanks; it seems we can go with the code as-is, then.
@hthuynh2 please update based on the comments above. You can leave the type as HashSet and fix the other typos, indentations, and comments.
Test build #93122 has finished for PR 21729 at commit
Test build #93121 has finished for PR 21729 at commit
Test build #93127 has finished for PR 21729 at commit
test this please
Test build #93176 has finished for PR 21729 at commit
Looks good to me, thanks for fixing this @hthuynh2!
lgtm |
Test build #93248 has finished for PR 21729 at commit
Test build #93249 has finished for PR 21729 at commit
+1 I'm going to merge, thanks @hthuynh2
**Description**
As described in [SPARK-24755](https://issues.apache.org/jira/browse/SPARK-24755), when speculation is enabled, there is a scenario where executor loss can cause a task to not be resubmitted. This patch changes the variable killedByOtherAttempt to keep track of the taskIds of tasks that were killed by another attempt. By doing this, we can still prevent resubmitting a task killed by another attempt, while resubmitting the successful attempt when an executor is lost.

**How was this patch tested?**
A UT was added based on the UT written by xuanyuanking, with modifications to simulate the scenario described in SPARK-24755.

Author: Hieu Huynh <“[email protected]”>

Closes #21729 from hthuynh2/SPARK_24755.

(cherry picked from commit 8d707b0)

Signed-off-by: Thomas Graves <[email protected]>
For other reviewers, this is merged to master/2.3
**Description**
As described in [SPARK-24755](https://issues.apache.org/jira/browse/SPARK-24755), when speculation is enabled, there is a scenario where executor loss can cause a task to not be resubmitted. This patch changes the variable killedByOtherAttempt to keep track of the taskIds of tasks that were killed by another attempt. By doing this, we can still prevent resubmitting a task killed by another attempt, while resubmitting the successful attempt when an executor is lost.

**How was this patch tested?**
A UT was added based on the UT written by xuanyuanking, with modifications to simulate the scenario described in SPARK-24755.

Author: Hieu Huynh <“[email protected]”>

Closes apache#21729 from hthuynh2/SPARK_24755.

(cherry-picked from commit 8d707b0)

Ref: LIHADOOP-40171
RB=1414249
BUG=LIHADOOP-40171
R=fli,mshen,yezhou
A=yezhou
Description
As described in SPARK-24755, when speculation is enabled, there is a scenario where executor loss can cause a task to not be resubmitted.
This patch changes the variable killedByOtherAttempt to keep track of the taskIds of tasks that were killed by another attempt. By doing this, we can still prevent resubmitting a task killed by another attempt, while resubmitting the successful attempt when an executor is lost.
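In outline, the fix can be pictured like this (a simplified sketch with made-up names, not the actual TaskSetManager implementation): on executor loss, successful tasks that ran on the lost executor are resubmitted, except those whose specific attempt was killed by another attempt.

```scala
import scala.collection.mutable

// Simplified model of the resubmission decision on executor loss.
object ResubmitSketch {
  final case class TaskInfo(taskId: Long, index: Int, executorId: String, successful: Boolean)

  def tasksToResubmit(
      infos: Seq[TaskInfo],
      killedByOtherAttempt: mutable.HashSet[Long],
      lostExecutor: String): Seq[Int] =
    infos
      .filter(i => i.executorId == lostExecutor && i.successful)
      // The key point of the fix: filter by taskId rather than by partition
      // index, so a successful attempt is still resubmitted even if a
      // *different* attempt of the same partition was killed by speculation.
      .filterNot(i => killedByOtherAttempt.contains(i.taskId))
      .map(_.index)
}
```

With the old per-index Boolean array, killing any attempt of a partition also suppressed resubmission of the successful attempt of that same partition running on the lost executor; keying by taskId avoids that.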
How was this patch tested?
A UT was added based on the UT written by @xuanyuanking, with modifications to simulate the scenario described in SPARK-24755.