
SPARK-1579: Clean up PythonRDD and avoid swallowing IOExceptions #640

Closed
aarondav wants to merge 2 commits from aarondav/pyspark-io

Conversation

@aarondav (Contributor) commented May 5, 2014:

This patch includes several cleanups to PythonRDD, focused on fixing SPARK-1579 cleanly. Listed in order of approximate importance:

  • The Python daemon waits for Spark to close the socket before exiting,
    in order to avoid causing spurious IOExceptions in Spark's
    PythonRDD::WriterThread.
  • Removes the Python Monitor Thread, which polled for task cancellations
    in order to kill the Python worker. Instead, we do this in the
    onCompleteCallback, since this is guaranteed to be called during
    cancellation.
  • Adds a "completed" variable to TaskContext to avoid the issue noted in
    SPARK-1019, where onCompleteCallbacks may be execution-order dependent.
    Along with this, I removed the "context.interrupted = true" flag in
    the onCompleteCallback.
  • Extracts PythonRDD::WriterThread to its own class.

Since this patch provides an alternative solution to SPARK-1019, I did test it with

sc.textFile("latlon.tsv").take(5)

many times without error.

Additionally, in order to test the unswallowed exceptions, I performed

sc.textFile("s3n://<big file>").count()

and cut my internet during execution. Prior to this patch, we got the "stdin writer exited early" message, which was unhelpful. Now, we get the SocketExceptions propagated through Spark to the user and get proper (though unsuccessful) task retries.
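To make the "completed" flag idea above concrete, here is a minimal, self-contained Scala sketch; `ToyTaskContext`, `markTaskCompleted`, and `handleWriteFailure` are illustrative stand-ins, not Spark's actual internals:

```scala
import java.io.IOException
import scala.collection.mutable.ArrayBuffer

// Stand-in for TaskContext: a volatile "completed" flag plus on-complete callbacks.
class ToyTaskContext {
  @volatile var interrupted: Boolean = false
  @volatile var completed: Boolean = false
  private val callbacks = ArrayBuffer.empty[() => Unit]

  def addOnCompleteCallback(f: () => Unit): Unit = callbacks += f

  // Called once when the task finishes; the flag is set before any callback
  // runs, so no callback has to depend on the order the others execute in.
  def markTaskCompleted(): Unit = {
    completed = true
    callbacks.foreach(cb => cb())
  }
}

object ToyWriter {
  // Once the task has completed or been cancelled, a broken socket to the
  // Python worker is expected noise rather than a failure worth reporting.
  def handleWriteFailure(context: ToyTaskContext, e: IOException): Unit = {
    if (!context.completed && !context.interrupted) throw e
  }
}
```

The real WriterThread and TaskContext differ in detail; the sketch only shows the shape of the fix: set `completed` before running callbacks, and consult it before surfacing a write-side exception.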

```scala
dataOut.flush()
worker.shutdownOutput()
logWarning("Incomplete task interrupted: Attempting to kill Python Worker")
env.destroyPythonWorker(pythonExec, envVars.toMap)
```

@aarondav (author) commented on this hunk:
@ahirreddy This logic replaces the monitor thread. Please take a look.

@AmplabJenkins: Merged build triggered.
@AmplabJenkins: Merged build started.

```scala
*/
context.addOnCompleteCallback{ () =>
complete = true // Indicate that the task has completed successfully
context.interrupted = true
```

@aarondav (author) commented on this hunk:
@pwendell I removed this line and the comment regarding SPARK-1019 because we now check for task completion in the worker thread before throwing an exception. Please let me know if this fix is insufficient (I'm not really sure why we had to interrupt before?).

@AmplabJenkins: Merged build finished.
@AmplabJenkins: Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/14655/

@mateiz (Contributor) commented May 5, 2014:

Looks like the Python tests may have timed out; do they work locally?

@aarondav (author) commented May 5, 2014:

Oops, this is actually due to a bug in my reimplementation of Python cancellation. onCompleteCallback doesn't get called if the iterator is stuck reading from Python. I can just revert that part of the patch to continue to use a separate thread, or I can add an "onInterruptCallback" to TaskContext. I will look more into it tomorrow morning.
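For reference, the "separate thread" option mentioned here amounts to a daemon thread that polls for cancellation and kills the worker even while the main thread is blocked reading from Python. A self-contained illustrative sketch with made-up names, not the PR's code:

```scala
// Illustrative monitor-thread sketch; the flags and kill hook are passed in
// as plain functions so the example stands alone.
class ToyMonitorThread(
    taskDone: () => Boolean,
    taskCancelled: () => Boolean,
    killWorker: () => Unit)
  extends Thread("Toy monitor for a Python worker") {

  setDaemon(true)

  override def run(): Unit = {
    // Poll until the task either finishes or is cancelled; polling works even
    // when the task's own thread is stuck in a blocking read from the worker.
    while (!taskDone() && !taskCancelled()) {
      Thread.sleep(100)
    }
    if (taskCancelled()) {
      killWorker()
    }
  }
}
```

Starting such a thread alongside the reader avoids relying on onCompleteCallback firing while the iterator is blocked.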

@AmplabJenkins: Merged build triggered.
@AmplabJenkins: Merged build started.
@AmplabJenkins: Merged build finished.
@AmplabJenkins: Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/14668/
@AmplabJenkins: Merged build triggered.
@AmplabJenkins: Merged build started.
@AmplabJenkins: Merged build finished.
@AmplabJenkins: Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/14685/
@AmplabJenkins: Merged build triggered.
@AmplabJenkins: Merged build started.
@AmplabJenkins: Merged build finished.
@AmplabJenkins: Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/14698/
@AmplabJenkins: Merged build triggered.
@AmplabJenkins: Merged build started.
@AmplabJenkins: Merged build triggered.
@AmplabJenkins: Merged build started.
@AmplabJenkins: Merged build triggered.
@AmplabJenkins: Merged build started.
@AmplabJenkins: Merged build finished.
@AmplabJenkins: Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/14702/
@AmplabJenkins: Merged build triggered.
@AmplabJenkins: Merged build started.
@AmplabJenkins: Merged build finished.
@AmplabJenkins: Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/14699/
@AmplabJenkins: Merged build finished.
@AmplabJenkins: Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/14700/
@AmplabJenkins: Merged build finished.
@AmplabJenkins: Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/14703/

```python
# until it raises an exception.
def waitSocketClose(sock):
    try:
        while True:
```

A reviewer (Contributor) commented on this hunk:

As per our offline discussion this needs to handle the case where the socket shuts down gracefully.

The loop body was then updated to treat an empty read as a clean shutdown:

```python
            if sock.recv(4096) == "":
                return
```
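For context on why an empty `recv` is the right signal, here is a self-contained JVM-side illustration (plain `java.net` sockets, not Spark code): when one end closes its socket, a blocking read on the other end returns end-of-stream instead of raising an error, which is exactly the "clean shutdown" the daemon loop should detect.

```scala
import java.net.{ServerSocket, Socket}

// Demonstrates that closing one end of a TCP socket makes a blocking read on
// the other end return -1 (end of stream) rather than throw.
object CleanShutdownDemo {
  def main(args: Array[String]): Unit = {
    val server = new ServerSocket(0)                        // any free port
    val client = new Socket("localhost", server.getLocalPort)
    val accepted = server.accept()

    client.close()                                          // peer closes cleanly
    val firstByte = accepted.getInputStream.read()
    println(firstByte)                                      // prints -1: EOF, no exception

    accepted.close()
    server.close()
  }
}
```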

@AmplabJenkins: Merged build triggered.
@AmplabJenkins: Merged build started.
@AmplabJenkins: Merged build finished. All automated tests passed.
@AmplabJenkins: All automated tests passed. Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/14739/

```scala
import java.io._
import java.util.zip.{GZIPInputStream, GZIPOutputStream}

import scala.collection.mutable.HashMap
import scala.language.existentials
import scala.util.Try
```

A reviewer (Contributor) commented on this hunk:
nit: Not used, and you probably didn't mean to move the other import

@aarondav (author) replied:
Mm, it was decided (on the dev list) that we'd put Scala language features above all other imports. I guess that hasn't made its way to the code style guide yet...

Removed the Try.

The reviewer (Contributor) replied:

Oh, so the new order is scala > java > library > spark?

@aarondav (author):
(talked offline, but just in case anyone else was curious, the order is scala.language.* > java > scala > library > spark. The scala language features are made special because they're intended to change the way the compiler behaves, although it's more of a futureproof thing right now.)
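For anyone skimming later, a small illustrative example of that ordering (the specific imports are arbitrary examples, not taken from this file):

```scala
// 1. Scala language features first
import scala.language.existentials

// 2. Java
import java.io.DataOutputStream
import java.net.Socket

// 3. Scala standard library
import scala.collection.mutable.HashMap

// 4. Third-party libraries
import com.google.common.base.Charsets

// 5. Spark
import org.apache.spark.SparkEnv
```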

@pwendell (Contributor) commented May 7, 2014:

Okay I did a quick pass and this looks good. I'm going to pull this in. Thanks @aarondav!

@asfgit closed this pull request in commit 3308722 on May 7, 2014.
asfgit pushed a commit that referenced this pull request May 7, 2014

Author: Aaron Davidson <[email protected]>

Closes #640 from aarondav/pyspark-io and squashes the following commits:

b391ff8 [Aaron Davidson] Detect "clean socket shutdowns" and stop waiting on the socket
c0c49da [Aaron Davidson] SPARK-1579: Clean up PythonRDD and avoid swallowing IOExceptions
(cherry picked from commit 3308722)

Signed-off-by: Patrick Wendell <[email protected]>
asfgit pushed a commit that referenced this pull request May 8, 2014
It makes little sense to start a TaskContext that is interrupted. Indeed, I searched for all use cases of it and didn't find a single instance in which `interrupted` is true on construction.

This was inspired by reviewing #640, which adds an additional `@volatile var completed` that is similar. These are not the most urgent changes, but I wanted to push them out before I forget.
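A rough before-and-after sketch of the shape of that change (field names simplified; not the full TaskContext definition):

```scala
// Before (simplified): "interrupted" was part of the constructor, even though
// no caller ever constructed an already-interrupted context.
class TaskContextBefore(
    val stageId: Int,
    val partitionId: Int,
    val attemptId: Long,
    @volatile var interrupted: Boolean = false)

// After (simplified): the flag is ordinary mutable state initialized inside
// the class, alongside the "completed" flag added by #640.
class TaskContextAfter(
    val stageId: Int,
    val partitionId: Int,
    val attemptId: Long) {
  @volatile var interrupted: Boolean = false
  @volatile var completed: Boolean = false
}
```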

Author: Andrew Or <[email protected]>

Closes #675 from andrewor14/task-context and squashes the following commits:

9575e02 [Andrew Or] Add space
69455d1 [Andrew Or] Merge branch 'master' of github.com:apache/spark into task-context
c471490 [Andrew Or] Oops, removed one flag too many. Adding it back.
85311f8 [Andrew Or] Move interrupted flag from TaskContext constructor

(cherry picked from commit c3f8b78)
Signed-off-by: Aaron Davidson <[email protected]>
pdeyhim pushed the two commits above to pdeyhim/spark-1, referencing this pull request, on Jun 25, 2014
bzhaoopenstack pushed a commit to bzhaoopenstack/spark that referenced this pull request Sep 11, 2019
rshkv pushed a commit to rshkv/spark that referenced this pull request Feb 27, 2020
…n repartition case (apache#640)

## What changes were proposed in this pull request?

Disable using radix sort in ShuffleExchangeExec when we do repartition.
In apache#20393, we fixed the nondeterministic result in the shuffle repartition case by performing a local sort before repartitioning.
But the newly added sort uses radix sort, which is wrong here because binary data can't be compared by prefix alone. This makes the sort unstable and fails to solve the indeterminate shuffle output problem.
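As a self-contained illustration (not code from either PR) of why prefix-only comparison can't give a stable order on binary keys, two different byte arrays can share the same fixed-width prefix:

```scala
object PrefixOnlyComparison {
  // Pack the first 8 bytes into a Long, the way a sort prefix is typically built.
  def prefix8(bytes: Array[Byte]): Long =
    bytes.take(8).foldLeft(0L)((acc, b) => (acc << 8) | (b & 0xffL))

  def main(args: Array[String]): Unit = {
    val a = Array[Byte](1, 2, 3, 4, 5, 6, 7, 8, 9)
    val b = Array[Byte](1, 2, 3, 4, 5, 6, 7, 8, 42)

    println(prefix8(a) == prefix8(b))          // true: prefixes tie
    println(java.util.Arrays.equals(a, b))     // false: the keys differ
    // A comparator that stops at the prefix treats a and b as equal, so their
    // relative order can change between runs, defeating the "sort before
    // repartition" fix.
  }
}
```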

### Why are the changes needed?
Fix the correctness bug caused by repartition after a shuffle.

### Does this PR introduce any user-facing change?
Yes, users will get the right result when the repartition stage is rerun.

## How was this patch tested?

Tested with `local-cluster[5, 2, 5120]` using the integration test below; it returns the correct answer, 100000000.
```
import scala.sys.process._
import org.apache.spark.TaskContext

val res = spark.range(0, 10000 * 10000, 1).map{ x => (x % 1000, x)}
// kill an executor in the stage that performs repartition(239)
val df = res.repartition(113).map{ x => (x._1 + 1, x._2)}.repartition(239).map { x =>
  if (TaskContext.get.attemptNumber == 0 && TaskContext.get.partitionId < 1 && TaskContext.get.stageAttemptNumber == 0) {
    throw new Exception("pkill -f -n java".!!)
  }
  x
}
val r2 = df.distinct.count()
```

Closes apache#25491 from xuanyuanking/SPARK-28699-fix.

Authored-by: Yuanjian Li <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>

Co-authored-by: Li Yuanjian <[email protected]>