SPARK-1579: Clean up PythonRDD and avoid swallowing IOExceptions #640
Conversation
This patch includes several cleanups to PythonRDD, focused around fixing SPARK-1579 cleanly. Listed in order of importance:

- The Python daemon waits for Spark to close the socket before exiting, in order to avoid causing spurious IOExceptions in Spark's PythonRDD::WriterThread.
- Removes the Python Monitor Thread, which polled for task cancellations in order to kill the Python worker. Instead, we do this in the onCompleteCallback, since this is guaranteed to be called during cancellation.
- Adds a "completed" variable to TaskContext to avoid the issue noted in SPARK-1019, where onCompleteCallbacks may be execution-order dependent. Along with this, I removed the "context.interrupted = true" flag in the onCompleteCallback.
- Extracts PythonRDD::WriterThread to its own class.

Since this patch provides an alternative solution to SPARK-1019, I did test it with `sc.textFile("latlon.tsv").take(5)` many times without error. Additionally, in order to test the unswallowed exceptions, I performed `sc.textFile("s3n://<big file>").count()` and cut my internet. Prior to this patch, we got the "stdin writer exited early" message, which is unhelpful. Now, we get the SocketExceptions propagated through Spark to the user and get proper (but unsuccessful) task retries.
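(For readers following the thread, here is a minimal, self-contained Scala sketch of the TaskContext shape this description implies. The `completed` and `interrupted` flags are named in the PR; everything else — the constructor parameters and the reverse-order callback execution — is assumed for illustration, not copied from the patch.)

```scala
import scala.collection.mutable.ArrayBuffer

// Sketch only: a simplified TaskContext carrying the flags discussed above.
class TaskContext(val stageId: Int, val partitionId: Int, val attemptId: Long) {
  // Set by the scheduler when the task is cancelled.
  @volatile var interrupted: Boolean = false
  // Set once the task has finished, so callbacks and helper threads can
  // distinguish "task done" from "task failed mid-stream" (see SPARK-1019).
  @volatile var completed: Boolean = false

  private val onCompleteCallbacks = new ArrayBuffer[() => Unit]

  def addOnCompleteCallback(f: () => Unit): Unit = onCompleteCallbacks += f

  def executeOnCompleteCallbacks(): Unit = {
    completed = true
    // Per SPARK-1019, callbacks should not rely on any particular ordering;
    // setting `completed` first keeps them order-independent.
    onCompleteCallbacks.reverse.foreach(f => f())
  }
}
```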
```
dataOut.flush()
worker.shutdownOutput()
logWarning("Incomplete task interrupted: Attempting to kill Python Worker")
env.destroyPythonWorker(pythonExec, envVars.toMap)
```
@ahirreddy This logic replaces the monitor thread. Please take a look.
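(A hedged sketch of the callback-based replacement, building on the toy TaskContext above: the log message and the `destroyPythonWorker` call appear in the diff, while the `registerCleanup` wrapper, the `interrupted` guard, and the `println` stand-in for `logWarning` are assumptions made for illustration.)

```scala
// Sketch: instead of a monitor thread polling for cancellation, register a
// completion callback when the task starts. Spark guarantees this callback
// runs during cancellation as well as normal completion.
def registerCleanup(context: TaskContext, destroyWorker: () => Unit): Unit = {
  context.addOnCompleteCallback { () =>
    if (context.interrupted) {
      // Cancellation path: kill the worker so any blocked socket I/O unwinds.
      println("Incomplete task interrupted: Attempting to kill Python Worker")
      destroyWorker()
    }
    // On normal completion no force-kill is needed; closing the socket suffices.
  }
}
```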
Merged build triggered.
Merged build started.
```
*/
context.addOnCompleteCallback{ () =>
  complete = true // Indicate that the task has completed successfully
  context.interrupted = true
```
@pwendell I removed this line and the comment regarding SPARK-1019 because we now check for task completion in the worker thread before throwing an exception. Please let me know if this fix is insufficient (I'm not really sure why we had to interrupt before?).
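(Sketch of the completion check described here, reusing the toy TaskContext above: a writer thread that swallows IOExceptions only when the task has already completed. The class name echoes the PR, but the body is illustrative, not the actual patch.)

```scala
import java.io.{DataOutputStream, IOException}

// Toy version of the check: swallow IOExceptions only when the task is
// already done; otherwise let them propagate so the user sees the real
// SocketException instead of an unhelpful "stdin writer exited early".
class WriterThread(dataOut: DataOutputStream, context: TaskContext)
    extends Thread("stdin writer for Python worker") {

  override def run(): Unit = {
    try {
      // ... stream the partition's rows to the Python worker's stdin here ...
      dataOut.flush()
    } catch {
      case e: IOException if context.completed =>
        // The task already finished, so a closed socket is expected cleanup,
        // not a failure: log and swallow.
        println(s"Exception thrown after task completion (likely cleanup): $e")
      case e: IOException =>
        throw e // genuine failure: propagate for a proper task retry
    }
  }
}
```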
Merged build finished.
Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/14655/
Looks like the Python tests may have timed out; do they work locally?
Oops, this is actually due to a bug in my reimplementation of Python cancellation. onCompleteCallback doesn't get called if the iterator is stuck reading from Python. I can just revert that part of the patch to continue to use a separate thread, or I can add an "onInterruptCallback" to TaskContext. I will look more into it tomorrow morning.
Merged build triggered.
Merged build started.
Merged build finished.
Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/14668/
Merged build triggered.
Merged build started.
Merged build finished.
Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/14685/
Merged build triggered.
Merged build started.
Merged build finished.
Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/14698/
Merged build triggered.
Merged build started.
Merged build triggered.
Merged build started.
Merged build triggered.
Merged build started.
Merged build finished.
Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/14702/
Merged build triggered.
Merged build started.
Merged build finished.
Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/14699/
Merged build finished.
Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/14700/
Merged build finished.
Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/14703/
```
# until it raises an exception.
def waitSocketClose(sock):
    try:
        while True:
```
As per our offline discussion, this needs to handle the case where the socket shuts down gracefully.
```
if sock.recv(4096) == "":
    # recv returns "" when the peer closes the socket gracefully,
    # so stop waiting instead of spinning until an exception.
    return
```
Merged build triggered.
Merged build started.
Merged build finished. All automated tests passed.
All automated tests passed.
```
import java.io._
import java.util.zip.{GZIPInputStream, GZIPOutputStream}

import scala.collection.mutable.HashMap
import scala.language.existentials
import scala.util.Try
```
nit: Not used, and you probably didn't mean to move the other import
Mm, it was decided (on the dev list) that we'd put Scala language features above all other imports. I guess that hasn't made its way to the code style guide yet...
Removed the Try.
Oh, so the new order is scala > java > library > spark?
(talked offline, but just in case anyone else was curious, the order is scala.language.* > java > scala > library > spark. The scala language features are made special because they're intended to change the way the compiler behaves, although it's more of a futureproof thing right now.)
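(To make the convention concrete, an example file header in the agreed order. The specific imports are taken from the diff above; the grouping comments are illustrative.)

```scala
import scala.language.existentials        // 1. scala.language.* first: compiler-behavior features

import java.io._                          // 2. java
import java.util.zip.{GZIPInputStream, GZIPOutputStream}

import scala.collection.mutable.HashMap   // 3. scala

// 4. third-party library imports would go here

// 5. org.apache.spark.* imports last
```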
Okay I did a quick pass and this looks good. I'm going to pull this in. Thanks @aarondav!
This patch includes several cleanups to PythonRDD, focused around fixing [SPARK-1579](https://issues.apache.org/jira/browse/SPARK-1579) cleanly. Listed in order of approximate importance:

- The Python daemon waits for Spark to close the socket before exiting, in order to avoid causing spurious IOExceptions in Spark's `PythonRDD::WriterThread`.
- Removes the Python Monitor Thread, which polled for task cancellations in order to kill the Python worker. Instead, we do this in the onCompleteCallback, since this is guaranteed to be called during cancellation.
- Adds a "completed" variable to TaskContext to avoid the issue noted in [SPARK-1019](https://issues.apache.org/jira/browse/SPARK-1019), where onCompleteCallbacks may be execution-order dependent. Along with this, I removed the "context.interrupted = true" flag in the onCompleteCallback.
- Extracts PythonRDD::WriterThread to its own class.

Since this patch provides an alternative solution to [SPARK-1019](https://issues.apache.org/jira/browse/SPARK-1019), I did test it with

```
sc.textFile("latlon.tsv").take(5)
```

many times without error. Additionally, in order to test the unswallowed exceptions, I performed

```
sc.textFile("s3n://<big file>").count()
```

and cut my internet during execution. Prior to this patch, we got the "stdin writer exited early" message, which was unhelpful. Now, we get the SocketExceptions propagated through Spark to the user and get proper (though unsuccessful) task retries.

Author: Aaron Davidson <[email protected]>

Closes #640 from aarondav/pyspark-io and squashes the following commits:

b391ff8 [Aaron Davidson] Detect "clean socket shutdowns" and stop waiting on the socket
c0c49da [Aaron Davidson] SPARK-1579: Clean up PythonRDD and avoid swallowing IOExceptions

(cherry picked from commit 3308722)
Signed-off-by: Patrick Wendell <[email protected]>
It makes little sense to start a TaskContext that is interrupted. Indeed, I searched for all use cases of it and didn't find a single instance in which `interrupted` is true on construction. This was inspired by reviewing #640, which adds an additional `@volatile var completed` that is similar. These are not the most urgent changes, but I wanted to push them out before I forget.

Author: Andrew Or <[email protected]>

Closes #675 from andrewor14/task-context and squashes the following commits:

9575e02 [Andrew Or] Add space
69455d1 [Andrew Or] Merge branch 'master' of github.com:apache/spark into task-context
c471490 [Andrew Or] Oops, removed one flag too many. Adding it back.
85311f8 [Andrew Or] Move interrupted flag from TaskContext constructor

(cherry picked from commit c3f8b78)
Signed-off-by: Aaron Davidson <[email protected]>
…n repartition case (apache#640)

## What changes were proposed in this pull request?

Disable using radix sort in ShuffleExchangeExec when we do repartition. In apache#20393, we fixed the indeterministic result in the shuffle repartition case by performing a local sort before repartitioning. But for the newly added sort operation, we use radix sort, which is wrong because binary data can't be compared by only the prefix. This makes the sort unstable and fails to solve the indeterminate shuffle output problem.

### Why are the changes needed?

Fix the correctness bug caused by repartition after a shuffle.

### Does this PR introduce any user-facing change?

Yes, users will get the right result in the case of a repartition stage rerun.

## How was this patch tested?

Test with `local-cluster[5, 2, 5120]`, using the integrated test below; it returns the right answer, 100000000.

```
import scala.sys.process._
import org.apache.spark.TaskContext

val res = spark.range(0, 10000 * 10000, 1).map{ x => (x % 1000, x)}
// kill an executor in the stage that performs repartition(239)
val df = res.repartition(113).map{ x => (x._1 + 1, x._2)}.repartition(239).map { x =>
  if (TaskContext.get.attemptNumber == 0 &&
      TaskContext.get.partitionId < 1 &&
      TaskContext.get.stageAttemptNumber == 0) {
    throw new Exception("pkill -f -n java".!!)
  }
  x
}
val r2 = df.distinct.count()
```

Closes apache#25491 from xuanyuanking/SPARK-28699-fix.

Authored-by: Yuanjian Li <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
Co-authored-by: Li Yuanjian <[email protected]>
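(An aside for readers: the prefix-comparison problem this commit describes can be seen in a self-contained toy sketch — this is not Spark code. Two binary rows that agree on their 8-byte prefix are indistinguishable to a prefix-only comparison, so their relative order after a prefix-only radix sort is arbitrary and can change across stage reruns.)

```scala
// Two rows that differ only past the 8-byte prefix.
val a = Array[Byte](1, 2, 3, 4, 5, 6, 7, 8, 100)
val b = Array[Byte](1, 2, 3, 4, 5, 6, 7, 8, 101)

// Pack the first 8 bytes into a Long, as a prefix-based sort would.
def prefix(bytes: Array[Byte]): Long =
  bytes.take(8).foldLeft(0L)((acc, byte) => (acc << 8) | (byte & 0xffL))

// Equal prefixes: a prefix-only sort cannot order a relative to b,
// so the "sorted" output is not deterministic.
assert(prefix(a) == prefix(b))
```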