-
Notifications
You must be signed in to change notification settings - Fork 28.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-45227][CORE] Fix a subtle thread-safety issue with CoarseGrainedExecutorBackend #43021
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for fixing this @xiongbo-sjtu, nice catch !
+CC @jiangxb1987 as well
Thanks for the fix, is it possible to add an unit test to cover the failure? |
f24da16
to
4e9ae1e
Compare
4e9ae1e
to
fd607e9
Compare
@jiangxb1987 Good suggestion! I've updated the unit test accordingly. Please merge this fix to master. If possible, please help patch v3.3.1 and above. Thanks! |
fd607e9
to
f706307
Compare
Eventually got all tests passed in Github Actions. Any concern on merging this pull request? As a side note, I've discovered another minor issue, but will address that in another pull request. Thanks, |
The updated test is not checking for behavior (it is checking for the type of the variable, so imo not very useful), not sure how we can reliably trigger this behavior to test. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The updated test is not checking for behavior (it is checking for the type of the variable, so imo not very useful), not sure how we can reliably trigger this behavior to test.
This strikes me as a situation where the ROI of a reproducing test is somewhat low: any reproducer would necessarily involve multiple threads performing different operations (reads and writes) and in order to reliably reproduce the old bug we'd have to either get lucky on the interleaving or would need to run with really high concurrency in order to get enough attempts so that we have a high likelihood of reproduction.
I think it's unlikely that we'll regress on this specific code (e.g. accidentally changing it from a concurrent map to a non-concurrent one). An end-to-end reproduction might help to defend against other thread-safety bugs here, but I don't view this code as inherently more vulnerable than other multi-threaded places in Spark so I don't think it's worth it to add dedicated tests there.
Given this, I would personally be okay with foregoing any added tests for this specific fix. WDYT?
+1 to the comment made by @JoshRosen Our production fleet has tens of thousands Spark jobs running daily. After migrating those jobs from Spark 2 to Spark 3, we've observed the issue (i.e., thread dumps show that the dispatcher thread is stuck with the reported stack trace) only a handful of times. The unit test was added so that the thread-safety will not be unintentionally refactored away later. It's not meant to simulate the race condition. The bottom line is that no functional regression is expected after this pull request replaces the vanilla mutable.hashmap with CHM. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
// Ensure thread-safety (https://issues.apache.org/jira/browse/SPARK-45227) | ||
assert(backend.taskResources.isInstanceOf[ | ||
ConcurrentMap[Long, Map[String, ResourceInformation]] | ||
]) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: Do you want to drop this assert
@xiongbo-sjtu ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The above assert
logic was added so that the thread-safe data structure (i.e., CHM) will not be unintentionally refactored away later. That's why I think that it's better for us to keep the logic here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should add a comment on the field about thread safety, and remove this assertion.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In the spirit of "disagree and commit", I've removed the assert statement and updated the pull request.
Please merge this fix to master. If possible, please help patch v3.3.1 and above. We have been waiting for the fix to get merged, before pulling the trigger to sync up our internal repo with the open-source repo.
I think it would be good to add the content of SPARK-45227 here to the PR's description. |
Gentlemen, please 👍 if you're okay with getting this pull request merged. |
…ere an executor process randomly gets stuck
f706307
to
d696d38
Compare
@HeartSaVioR, @HyukjinKwon I am seeing multiple PR's failing with this. It looks like some timeout issue, did anything change recently which might be triggering it ? This PR should not be impacted by those modules, but I would have prefer clean builds before merging. |
All failed tests have passed after I rerun them. Solving the fragile unit tests is an orthogonal concern. Any more concern or blocker for merging this pull request? @mridulm |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good to me, thanks for fixing this @xiongbo-sjtu !
…edExecutorBackend ### What changes were proposed in this pull request? Fix a subtle thread-safety issue with CoarseGrainedExecutorBackend where an executor process randomly gets stuck ### Why are the changes needed? For each executor, the single-threaded dispatcher can run into an "infinite loop" (as explained in the SPARK-45227). Once an executor process runs into a state, it'd stop launching tasks from the driver or reporting task status back. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? ``` $ build/mvn package -DskipTests -pl core $ build/mvn -Dtest=none -DwildcardSuites=org.apache.spark.executor.CoarseGrainedExecutorBackendSuite test ``` ### Was this patch authored or co-authored using generative AI tooling? No ****************************************************************************** **_Please feel free to skip reading unless you're interested in details_** ****************************************************************************** ### Symptom Our Spark 3 app running on EMR 6.10.0 with Spark 3.3.1 got stuck in the very last step of writing a data frame to S3 by calling `df.write`. Looking at Spark UI, we saw that an executor process hung over 1 hour. After we manually killed the executor process, the app succeeded. Note that the same EMR cluster with two worker nodes was able to run the same app without any issue before and after the incident. Below is what's observed from relevant container logs and thread dump. - A regular task that's sent to the executor, which also reported back to the driver upon the task completion. ``` $zgrep 'task 150' container_1694029806204_12865_01_000001/stderr.gz 23/09/12 18:13:55 INFO TaskSetManager: Starting task 150.0 in stage 23.0 (TID 923) (ip-10-0-185-107.ec2.internal, executor 3, partition 150, NODE_LOCAL, 4432 bytes) taskResourceAssignments Map() 23/09/12 18:13:55 INFO TaskSetManager: Finished task 150.0 in stage 23.0 (TID 923) in 126 ms on ip-10-0-185-107.ec2.internal (executor 3) (16/200) $zgrep ' 923' container_1694029806204_12865_01_000004/stderr.gz 23/09/12 18:13:55 INFO YarnCoarseGrainedExecutorBackend: Got assigned task 923 $zgrep 'task 150' container_1694029806204_12865_01_000004/stderr.gz 23/09/12 18:13:55 INFO Executor: Running task 150.0 in stage 23.0 (TID 923) 23/09/12 18:13:55 INFO Executor: Finished task 150.0 in stage 23.0 (TID 923). 4495 bytes result sent to driver ``` - Another task that's sent to the executor but didn't get launched since the single-threaded dispatcher was stuck (presumably in an "infinite loop" as explained later). ``` $zgrep 'task 153' container_1694029806204_12865_01_000001/stderr.gz 23/09/12 18:13:55 INFO TaskSetManager: Starting task 153.0 in stage 23.0 (TID 924) (ip-10-0-185-107.ec2.internal, executor 3, partition 153, NODE_LOCAL, 4432 bytes) taskResourceAssignments Map() $zgrep ' 924' container_1694029806204_12865_01_000004/stderr.gz 23/09/12 18:13:55 INFO YarnCoarseGrainedExecutorBackend: Got assigned task 924 $zgrep 'task 153' container_1694029806204_12865_01_000004/stderr.gz >> note that the above command has no matching result, indicating that task 153.0 in stage 23.0 (TID 924) was never launched ``` - Thread dump shows that the dispatcher-Executor thread has the following stack trace. ``` "dispatcher-Executor" #40 daemon prio=5 os_prio=0 tid=0x0000ffff98e37800 nid=0x1aff runnable [0x0000ffff73bba000] java.lang.Thread.State: RUNNABLE at scala.runtime.BoxesRunTime.equalsNumObject(BoxesRunTime.java:142) at scala.runtime.BoxesRunTime.equals2(BoxesRunTime.java:131) at scala.runtime.BoxesRunTime.equals(BoxesRunTime.java:123) at scala.collection.mutable.HashTable.elemEquals(HashTable.scala:365) at scala.collection.mutable.HashTable.elemEquals$(HashTable.scala:365) at scala.collection.mutable.HashMap.elemEquals(HashMap.scala:44) at scala.collection.mutable.HashTable.findEntry0(HashTable.scala:140) at scala.collection.mutable.HashTable.findOrAddEntry(HashTable.scala:169) at scala.collection.mutable.HashTable.findOrAddEntry$(HashTable.scala:167) at scala.collection.mutable.HashMap.findOrAddEntry(HashMap.scala:44) at scala.collection.mutable.HashMap.put(HashMap.scala:126) at scala.collection.mutable.HashMap.update(HashMap.scala:131) at org.apache.spark.executor.CoarseGrainedExecutorBackend$$anonfun$receive$1.applyOrElse(CoarseGrainedExecutorBackend.scala:200) at org.apache.spark.rpc.netty.Inbox.$anonfun$process$1(Inbox.scala:115) at org.apache.spark.rpc.netty.Inbox$$Lambda$323/1930826709.apply$mcV$sp(Unknown Source) at org.apache.spark.rpc.netty.Inbox.safelyCall(Inbox.scala:213) at org.apache.spark.rpc.netty.Inbox.process(Inbox.scala:100) at org.apache.spark.rpc.netty.MessageLoop.org$apache$spark$rpc$netty$MessageLoop$$receiveLoop(MessageLoop.scala:75) at org.apache.spark.rpc.netty.MessageLoop$$anon$1.run(MessageLoop.scala:41) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:750) ``` ### Relevant code paths Within an executor process, there's a [dispatcher thread](https://github.com/apache/spark/blob/1fdd46f173f7bc90e0523eb0a2d5e8e27e990102/core/src/main/scala/org/apache/spark/rpc/netty/MessageLoop.scala#L170) dedicated to CoarseGrainedExecutorBackend(a single RPC endpoint) that launches tasks scheduled by the driver. Each task is run on a TaskRunner thread backed by a thread pool created for the executor. The TaskRunner thread and the dispatcher thread are different. However, they read and write a common object (i.e., taskResources) that's a mutable hashmap without thread-safety, in [Executor](https://github.com/apache/spark/blob/1fdd46f173f7bc90e0523eb0a2d5e8e27e990102/core/src/main/scala/org/apache/spark/executor/Executor.scala#L561) and [CoarseGrainedExecutorBackend](https://github.com/apache/spark/blob/1fdd46f173f7bc90e0523eb0a2d5e8e27e990102/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala#L189), respectively. ### What's going on? Based on the above observations, our hypothesis is that the dispatcher thread runs into an "infinite loop" due to a race condition when two threads access the same hashmap object. For illustration purpose, let's consider the following scenario where two threads (Thread 1 and Thread 2) access a hash table without thread-safety - Thread 1 sees A.next = B, but then yields execution to Thread 2 <img src="https://issues.apache.org/jira/secure/attachment/13063040/13063040_hashtable1.png" width="400"> - Thread 2 triggers a resize operation resulting in B.next = A (Note that hashmap doesn't care about ordering), and then yields execution to Thread 1. <img src="https://issues.apache.org/jira/secure/attachment/13063041/13063041_hashtable2.png" width="400"> - After taking over CPU, Thread 1 would run into an "infinite loop" when traversing the list in the last bucket, given A.next = B and B.next = A in its view. Closes #43021 from xiongbo-sjtu/master. Authored-by: Bo Xiong <[email protected]> Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com> (cherry picked from commit 8e6b160) Signed-off-by: Mridul Muralidharan <mridulatgmail.com>
…edExecutorBackend ### What changes were proposed in this pull request? Fix a subtle thread-safety issue with CoarseGrainedExecutorBackend where an executor process randomly gets stuck ### Why are the changes needed? For each executor, the single-threaded dispatcher can run into an "infinite loop" (as explained in the SPARK-45227). Once an executor process runs into a state, it'd stop launching tasks from the driver or reporting task status back. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? ``` $ build/mvn package -DskipTests -pl core $ build/mvn -Dtest=none -DwildcardSuites=org.apache.spark.executor.CoarseGrainedExecutorBackendSuite test ``` ### Was this patch authored or co-authored using generative AI tooling? No ****************************************************************************** **_Please feel free to skip reading unless you're interested in details_** ****************************************************************************** ### Symptom Our Spark 3 app running on EMR 6.10.0 with Spark 3.3.1 got stuck in the very last step of writing a data frame to S3 by calling `df.write`. Looking at Spark UI, we saw that an executor process hung over 1 hour. After we manually killed the executor process, the app succeeded. Note that the same EMR cluster with two worker nodes was able to run the same app without any issue before and after the incident. Below is what's observed from relevant container logs and thread dump. - A regular task that's sent to the executor, which also reported back to the driver upon the task completion. ``` $zgrep 'task 150' container_1694029806204_12865_01_000001/stderr.gz 23/09/12 18:13:55 INFO TaskSetManager: Starting task 150.0 in stage 23.0 (TID 923) (ip-10-0-185-107.ec2.internal, executor 3, partition 150, NODE_LOCAL, 4432 bytes) taskResourceAssignments Map() 23/09/12 18:13:55 INFO TaskSetManager: Finished task 150.0 in stage 23.0 (TID 923) in 126 ms on ip-10-0-185-107.ec2.internal (executor 3) (16/200) $zgrep ' 923' container_1694029806204_12865_01_000004/stderr.gz 23/09/12 18:13:55 INFO YarnCoarseGrainedExecutorBackend: Got assigned task 923 $zgrep 'task 150' container_1694029806204_12865_01_000004/stderr.gz 23/09/12 18:13:55 INFO Executor: Running task 150.0 in stage 23.0 (TID 923) 23/09/12 18:13:55 INFO Executor: Finished task 150.0 in stage 23.0 (TID 923). 4495 bytes result sent to driver ``` - Another task that's sent to the executor but didn't get launched since the single-threaded dispatcher was stuck (presumably in an "infinite loop" as explained later). ``` $zgrep 'task 153' container_1694029806204_12865_01_000001/stderr.gz 23/09/12 18:13:55 INFO TaskSetManager: Starting task 153.0 in stage 23.0 (TID 924) (ip-10-0-185-107.ec2.internal, executor 3, partition 153, NODE_LOCAL, 4432 bytes) taskResourceAssignments Map() $zgrep ' 924' container_1694029806204_12865_01_000004/stderr.gz 23/09/12 18:13:55 INFO YarnCoarseGrainedExecutorBackend: Got assigned task 924 $zgrep 'task 153' container_1694029806204_12865_01_000004/stderr.gz >> note that the above command has no matching result, indicating that task 153.0 in stage 23.0 (TID 924) was never launched ``` - Thread dump shows that the dispatcher-Executor thread has the following stack trace. ``` "dispatcher-Executor" #40 daemon prio=5 os_prio=0 tid=0x0000ffff98e37800 nid=0x1aff runnable [0x0000ffff73bba000] java.lang.Thread.State: RUNNABLE at scala.runtime.BoxesRunTime.equalsNumObject(BoxesRunTime.java:142) at scala.runtime.BoxesRunTime.equals2(BoxesRunTime.java:131) at scala.runtime.BoxesRunTime.equals(BoxesRunTime.java:123) at scala.collection.mutable.HashTable.elemEquals(HashTable.scala:365) at scala.collection.mutable.HashTable.elemEquals$(HashTable.scala:365) at scala.collection.mutable.HashMap.elemEquals(HashMap.scala:44) at scala.collection.mutable.HashTable.findEntry0(HashTable.scala:140) at scala.collection.mutable.HashTable.findOrAddEntry(HashTable.scala:169) at scala.collection.mutable.HashTable.findOrAddEntry$(HashTable.scala:167) at scala.collection.mutable.HashMap.findOrAddEntry(HashMap.scala:44) at scala.collection.mutable.HashMap.put(HashMap.scala:126) at scala.collection.mutable.HashMap.update(HashMap.scala:131) at org.apache.spark.executor.CoarseGrainedExecutorBackend$$anonfun$receive$1.applyOrElse(CoarseGrainedExecutorBackend.scala:200) at org.apache.spark.rpc.netty.Inbox.$anonfun$process$1(Inbox.scala:115) at org.apache.spark.rpc.netty.Inbox$$Lambda$323/1930826709.apply$mcV$sp(Unknown Source) at org.apache.spark.rpc.netty.Inbox.safelyCall(Inbox.scala:213) at org.apache.spark.rpc.netty.Inbox.process(Inbox.scala:100) at org.apache.spark.rpc.netty.MessageLoop.org$apache$spark$rpc$netty$MessageLoop$$receiveLoop(MessageLoop.scala:75) at org.apache.spark.rpc.netty.MessageLoop$$anon$1.run(MessageLoop.scala:41) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:750) ``` ### Relevant code paths Within an executor process, there's a [dispatcher thread](https://github.com/apache/spark/blob/1fdd46f173f7bc90e0523eb0a2d5e8e27e990102/core/src/main/scala/org/apache/spark/rpc/netty/MessageLoop.scala#L170) dedicated to CoarseGrainedExecutorBackend(a single RPC endpoint) that launches tasks scheduled by the driver. Each task is run on a TaskRunner thread backed by a thread pool created for the executor. The TaskRunner thread and the dispatcher thread are different. However, they read and write a common object (i.e., taskResources) that's a mutable hashmap without thread-safety, in [Executor](https://github.com/apache/spark/blob/1fdd46f173f7bc90e0523eb0a2d5e8e27e990102/core/src/main/scala/org/apache/spark/executor/Executor.scala#L561) and [CoarseGrainedExecutorBackend](https://github.com/apache/spark/blob/1fdd46f173f7bc90e0523eb0a2d5e8e27e990102/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala#L189), respectively. ### What's going on? Based on the above observations, our hypothesis is that the dispatcher thread runs into an "infinite loop" due to a race condition when two threads access the same hashmap object. For illustration purpose, let's consider the following scenario where two threads (Thread 1 and Thread 2) access a hash table without thread-safety - Thread 1 sees A.next = B, but then yields execution to Thread 2 <img src="https://issues.apache.org/jira/secure/attachment/13063040/13063040_hashtable1.png" width="400"> - Thread 2 triggers a resize operation resulting in B.next = A (Note that hashmap doesn't care about ordering), and then yields execution to Thread 1. <img src="https://issues.apache.org/jira/secure/attachment/13063041/13063041_hashtable2.png" width="400"> - After taking over CPU, Thread 1 would run into an "infinite loop" when traversing the list in the last bucket, given A.next = B and B.next = A in its view. Closes #43021 from xiongbo-sjtu/master. Authored-by: Bo Xiong <[email protected]> Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com> (cherry picked from commit 8e6b160) Signed-off-by: Mridul Muralidharan <mridulatgmail.com>
Merged to master, 3.5 and 3.4 Thanks for the reviews @JoshRosen, @jiangxb1987, @attilapiros :-) |
Thanks. Just submit another pull request to patch branch-3.3 |
…edExecutorBackend Backport of #43021 to branch 3.3 ### What changes were proposed in this pull request? Fix a subtle thread-safety issue with CoarseGrainedExecutorBackend where an executor process randomly gets stuck. ### Why are the changes needed? For each executor, the single-threaded dispatcher can run into an "infinite loop" (as explained in the SPARK-45227). Once an executor process runs into a state, it'd stop launching tasks from the driver or reporting task status back. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? ``` $ build/mvn package -DskipTests -pl core $ build/mvn -Dtest=none -DwildcardSuites=org.apache.spark.executor.CoarseGrainedExecutorBackendSuite test ``` ### Was this patch authored or co-authored using generative AI tooling? No Closes #43176 from xiongbo-sjtu/branch-3.3. Authored-by: Bo Xiong <[email protected]> Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
…edExecutorBackend ### What changes were proposed in this pull request? Fix a subtle thread-safety issue with CoarseGrainedExecutorBackend where an executor process randomly gets stuck ### Why are the changes needed? For each executor, the single-threaded dispatcher can run into an "infinite loop" (as explained in the SPARK-45227). Once an executor process runs into a state, it'd stop launching tasks from the driver or reporting task status back. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? ``` $ build/mvn package -DskipTests -pl core $ build/mvn -Dtest=none -DwildcardSuites=org.apache.spark.executor.CoarseGrainedExecutorBackendSuite test ``` ### Was this patch authored or co-authored using generative AI tooling? No ****************************************************************************** **_Please feel free to skip reading unless you're interested in details_** ****************************************************************************** ### Symptom Our Spark 3 app running on EMR 6.10.0 with Spark 3.3.1 got stuck in the very last step of writing a data frame to S3 by calling `df.write`. Looking at Spark UI, we saw that an executor process hung over 1 hour. After we manually killed the executor process, the app succeeded. Note that the same EMR cluster with two worker nodes was able to run the same app without any issue before and after the incident. Below is what's observed from relevant container logs and thread dump. - A regular task that's sent to the executor, which also reported back to the driver upon the task completion. ``` $zgrep 'task 150' container_1694029806204_12865_01_000001/stderr.gz 23/09/12 18:13:55 INFO TaskSetManager: Starting task 150.0 in stage 23.0 (TID 923) (ip-10-0-185-107.ec2.internal, executor 3, partition 150, NODE_LOCAL, 4432 bytes) taskResourceAssignments Map() 23/09/12 18:13:55 INFO TaskSetManager: Finished task 150.0 in stage 23.0 (TID 923) in 126 ms on ip-10-0-185-107.ec2.internal (executor 3) (16/200) $zgrep ' 923' container_1694029806204_12865_01_000004/stderr.gz 23/09/12 18:13:55 INFO YarnCoarseGrainedExecutorBackend: Got assigned task 923 $zgrep 'task 150' container_1694029806204_12865_01_000004/stderr.gz 23/09/12 18:13:55 INFO Executor: Running task 150.0 in stage 23.0 (TID 923) 23/09/12 18:13:55 INFO Executor: Finished task 150.0 in stage 23.0 (TID 923). 4495 bytes result sent to driver ``` - Another task that's sent to the executor but didn't get launched since the single-threaded dispatcher was stuck (presumably in an "infinite loop" as explained later). ``` $zgrep 'task 153' container_1694029806204_12865_01_000001/stderr.gz 23/09/12 18:13:55 INFO TaskSetManager: Starting task 153.0 in stage 23.0 (TID 924) (ip-10-0-185-107.ec2.internal, executor 3, partition 153, NODE_LOCAL, 4432 bytes) taskResourceAssignments Map() $zgrep ' 924' container_1694029806204_12865_01_000004/stderr.gz 23/09/12 18:13:55 INFO YarnCoarseGrainedExecutorBackend: Got assigned task 924 $zgrep 'task 153' container_1694029806204_12865_01_000004/stderr.gz >> note that the above command has no matching result, indicating that task 153.0 in stage 23.0 (TID 924) was never launched ``` - Thread dump shows that the dispatcher-Executor thread has the following stack trace. ``` "dispatcher-Executor" apache#40 daemon prio=5 os_prio=0 tid=0x0000ffff98e37800 nid=0x1aff runnable [0x0000ffff73bba000] java.lang.Thread.State: RUNNABLE at scala.runtime.BoxesRunTime.equalsNumObject(BoxesRunTime.java:142) at scala.runtime.BoxesRunTime.equals2(BoxesRunTime.java:131) at scala.runtime.BoxesRunTime.equals(BoxesRunTime.java:123) at scala.collection.mutable.HashTable.elemEquals(HashTable.scala:365) at scala.collection.mutable.HashTable.elemEquals$(HashTable.scala:365) at scala.collection.mutable.HashMap.elemEquals(HashMap.scala:44) at scala.collection.mutable.HashTable.findEntry0(HashTable.scala:140) at scala.collection.mutable.HashTable.findOrAddEntry(HashTable.scala:169) at scala.collection.mutable.HashTable.findOrAddEntry$(HashTable.scala:167) at scala.collection.mutable.HashMap.findOrAddEntry(HashMap.scala:44) at scala.collection.mutable.HashMap.put(HashMap.scala:126) at scala.collection.mutable.HashMap.update(HashMap.scala:131) at org.apache.spark.executor.CoarseGrainedExecutorBackend$$anonfun$receive$1.applyOrElse(CoarseGrainedExecutorBackend.scala:200) at org.apache.spark.rpc.netty.Inbox.$anonfun$process$1(Inbox.scala:115) at org.apache.spark.rpc.netty.Inbox$$Lambda$323/1930826709.apply$mcV$sp(Unknown Source) at org.apache.spark.rpc.netty.Inbox.safelyCall(Inbox.scala:213) at org.apache.spark.rpc.netty.Inbox.process(Inbox.scala:100) at org.apache.spark.rpc.netty.MessageLoop.org$apache$spark$rpc$netty$MessageLoop$$receiveLoop(MessageLoop.scala:75) at org.apache.spark.rpc.netty.MessageLoop$$anon$1.run(MessageLoop.scala:41) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:750) ``` ### Relevant code paths Within an executor process, there's a [dispatcher thread](https://github.com/apache/spark/blob/1fdd46f173f7bc90e0523eb0a2d5e8e27e990102/core/src/main/scala/org/apache/spark/rpc/netty/MessageLoop.scala#L170) dedicated to CoarseGrainedExecutorBackend(a single RPC endpoint) that launches tasks scheduled by the driver. Each task is run on a TaskRunner thread backed by a thread pool created for the executor. The TaskRunner thread and the dispatcher thread are different. However, they read and write a common object (i.e., taskResources) that's a mutable hashmap without thread-safety, in [Executor](https://github.com/apache/spark/blob/1fdd46f173f7bc90e0523eb0a2d5e8e27e990102/core/src/main/scala/org/apache/spark/executor/Executor.scala#L561) and [CoarseGrainedExecutorBackend](https://github.com/apache/spark/blob/1fdd46f173f7bc90e0523eb0a2d5e8e27e990102/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala#L189), respectively. ### What's going on? Based on the above observations, our hypothesis is that the dispatcher thread runs into an "infinite loop" due to a race condition when two threads access the same hashmap object. For illustration purpose, let's consider the following scenario where two threads (Thread 1 and Thread 2) access a hash table without thread-safety - Thread 1 sees A.next = B, but then yields execution to Thread 2 <img src="https://issues.apache.org/jira/secure/attachment/13063040/13063040_hashtable1.png" width="400"> - Thread 2 triggers a resize operation resulting in B.next = A (Note that hashmap doesn't care about ordering), and then yields execution to Thread 1. <img src="https://issues.apache.org/jira/secure/attachment/13063041/13063041_hashtable2.png" width="400"> - After taking over CPU, Thread 1 would run into an "infinite loop" when traversing the list in the last bucket, given A.next = B and B.next = A in its view. Closes apache#43021 from xiongbo-sjtu/master. Authored-by: Bo Xiong <[email protected]> Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com> (cherry picked from commit 8e6b160) Signed-off-by: Mridul Muralidharan <mridulatgmail.com> (cherry picked from commit e706ba1)
What changes were proposed in this pull request?
Fix a subtle thread-safety issue with CoarseGrainedExecutorBackend where an executor process randomly gets stuck
Why are the changes needed?
For each executor, the single-threaded dispatcher can run into an "infinite loop" (as explained in the SPARK-45227). Once an executor process runs into a state, it'd stop launching tasks from the driver or reporting task status back.
Does this PR introduce any user-facing change?
No
How was this patch tested?
Was this patch authored or co-authored using generative AI tooling?
No
Please feel free to skip reading unless you're interested in details
Symptom
Our Spark 3 app running on EMR 6.10.0 with Spark 3.3.1 got stuck in the very last step of writing a data frame to S3 by calling
df.write
. Looking at Spark UI, we saw that an executor process hung over 1 hour. After we manually killed the executor process, the app succeeded. Note that the same EMR cluster with two worker nodes was able to run the same app without any issue before and after the incident.Below is what's observed from relevant container logs and thread dump.
Relevant code paths
Within an executor process, there's a dispatcher thread dedicated to CoarseGrainedExecutorBackend(a single RPC endpoint) that launches tasks scheduled by the driver. Each task is run on a TaskRunner thread backed by a thread pool created for the executor. The TaskRunner thread and the dispatcher thread are different. However, they read and write a common object (i.e., taskResources) that's a mutable hashmap without thread-safety, in Executor and CoarseGrainedExecutorBackend, respectively.
What's going on?
Based on the above observations, our hypothesis is that the dispatcher thread runs into an "infinite loop" due to a race condition when two threads access the same hashmap object. For illustration purpose, let's consider the following scenario where two threads (Thread 1 and Thread 2) access a hash table without thread-safety