[SPARK-4606] Send EOF to child JVM when there's no more data to read. #3460

Status: Closed · wants to merge 1 commit
@@ -151,7 +151,8 @@ private[spark] object SparkSubmitDriverBootstrapper {
     val isWindows = Utils.isWindows
     val isSubprocess = sys.env.contains("IS_SUBPROCESS")
     if (!isWindows) {
-      val stdinThread = new RedirectThread(System.in, process.getOutputStream, "redirect stdin")
+      val stdinThread = new RedirectThread(System.in, process.getOutputStream, "redirect stdin",
+        true)
Contributor commented:
Kind of nitpicky, but I'd probably explicitly name the boolean parameter here (IntelliJ complains otherwise, and besides, it's a nice practice). I can fix this up myself on merge, though.

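To make the suggestion concrete, the call with a named argument would read as follows (a sketch of the reviewer's proposal, not part of the committed change; `process` and `RedirectThread` are as in the surrounding patch):

// Naming the boolean argument documents its purpose at the call site,
// which a bare `true` does not.
val stdinThread = new RedirectThread(System.in, process.getOutputStream, "redirect stdin",
  propagateEof = true)
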
       stdinThread.start()
       // Spark submit (JVM) may run as a subprocess, and so this JVM should terminate on
       // broken pipe, signaling that the parent process has exited. This is the case if the
core/src/main/scala/org/apache/spark/util/Utils.scala (17 additions, 7 deletions)
@@ -1792,19 +1792,29 @@ private[spark] object Utils extends Logging {
 /**
  * A utility class to redirect the child process's stdout or stderr.
  */
-private[spark] class RedirectThread(in: InputStream, out: OutputStream, name: String)
+private[spark] class RedirectThread(
+    in: InputStream,
+    out: OutputStream,
+    name: String,
+    propagateEof: Boolean = false)
   extends Thread(name) {

   setDaemon(true)
   override def run() {
     scala.util.control.Exception.ignoring(classOf[IOException]) {
       // FIXME: We copy the stream on the level of bytes to avoid encoding problems.
-      val buf = new Array[Byte](1024)
-      var len = in.read(buf)
-      while (len != -1) {
-        out.write(buf, 0, len)
-        out.flush()
-        len = in.read(buf)
+      try {
+        val buf = new Array[Byte](1024)
+        var len = in.read(buf)
+        while (len != -1) {
+          out.write(buf, 0, len)
+          out.flush()
+          len = in.read(buf)
+        }
+      } finally {
+        if (propagateEof) {
+          out.close()
+        }
       }
     }
   }
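
The out.close() in the finally block is the heart of the fix: once the parent's stream is exhausted, closing the child's stdin delivers EOF, so the child JVM stops blocking on a read that will never complete. Below is a standalone, hypothetical sketch of the same pattern (the object and method names are invented for illustration; it assumes a Unix-like environment where `cat` is available as a stand-in for the child JVM):

import java.io.{ByteArrayInputStream, IOException, InputStream, OutputStream}

// Standalone sketch (not part of the patch): copy bytes from `in` to `out`,
// and when `in` hits EOF, close `out` so the process reading the other end
// of the pipe also sees EOF instead of blocking forever.
object EofPropagationSketch {

  def pipeAndPropagateEof(in: InputStream, out: OutputStream): Thread = {
    val t = new Thread("redirect stdin") {
      override def run(): Unit = {
        try {
          val buf = new Array[Byte](1024)
          var len = in.read(buf)
          while (len != -1) {
            out.write(buf, 0, len)
            out.flush()
            len = in.read(buf)
          }
        } catch {
          case _: IOException => // ignore, mirroring RedirectThread's behavior
        } finally {
          out.close() // the EOF propagation this patch introduces
        }
      }
    }
    t.setDaemon(true)
    t.start()
    t
  }

  def main(args: Array[String]): Unit = {
    // `cat` exits only once its stdin is closed, so it demonstrates the effect:
    // without the close() above, waitFor() would hang.
    val child = new ProcessBuilder("cat").start()
    val source = new ByteArrayInputStream("hello from the parent\n".getBytes("UTF-8"))
    pipeAndPropagateEof(source, child.getOutputStream)
    println("child exited with status " + child.waitFor())
  }
}

In the patch itself this behavior is opt-in (propagateEof defaults to false), presumably so that existing callers which redirect a child's output onto the parent's own stdout or stderr do not get those streams closed from under them.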