From dd0287cca9ca6d8333f5397b36b0017e92a3fbf3 Mon Sep 17 00:00:00 2001
From: Marcelo Vanzin
Date: Tue, 23 Dec 2014 16:02:59 -0800
Subject: [PATCH] [SPARK-4606] Send EOF to child JVM when there's no more data to read.

Author: Marcelo Vanzin

Closes #3460 from vanzin/SPARK-4606 and squashes the following commits:

031207d [Marcelo Vanzin] [SPARK-4606] Send EOF to child JVM when there's no more data to read.

(cherry picked from commit 7e2deb71c4239564631b19c748e95c3d1aa1c77d)
Signed-off-by: Josh Rosen
---
 .../SparkSubmitDriverBootstrapper.scala       |  3 ++-
 .../scala/org/apache/spark/util/Utils.scala   | 24 +++++++++++++------
 2 files changed, 19 insertions(+), 8 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitDriverBootstrapper.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitDriverBootstrapper.scala
index a64170a47bc1c..40304ed64a1a9 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitDriverBootstrapper.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitDriverBootstrapper.scala
@@ -144,7 +144,8 @@ private[spark] object SparkSubmitDriverBootstrapper {
     val isWindows = Utils.isWindows
     val isPySparkShell = sys.env.contains("PYSPARK_SHELL")
     if (!isWindows) {
-      val stdinThread = new RedirectThread(System.in, process.getOutputStream, "redirect stdin")
+      val stdinThread = new RedirectThread(System.in, process.getOutputStream, "redirect stdin",
+        propagateEof = true)
       stdinThread.start()
       // For the PySpark shell, Spark submit itself runs as a python subprocess, and so this JVM
       // should terminate on broken pipe, which signals that the parent process has exited. In
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index c11b2a89cb220..4259145ec93d1 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -1604,19 +1604,29 @@ private[spark] object Utils extends Logging {
 /**
  * A utility class to redirect the child process's stdout or stderr.
  */
-private[spark] class RedirectThread(in: InputStream, out: OutputStream, name: String)
+private[spark] class RedirectThread(
+    in: InputStream,
+    out: OutputStream,
+    name: String,
+    propagateEof: Boolean = false)
   extends Thread(name) {
 
   setDaemon(true)
   override def run() {
     scala.util.control.Exception.ignoring(classOf[IOException]) {
       // FIXME: We copy the stream on the level of bytes to avoid encoding problems.
-      val buf = new Array[Byte](1024)
-      var len = in.read(buf)
-      while (len != -1) {
-        out.write(buf, 0, len)
-        out.flush()
-        len = in.read(buf)
+      try {
+        val buf = new Array[Byte](1024)
+        var len = in.read(buf)
+        while (len != -1) {
+          out.write(buf, 0, len)
+          out.flush()
+          len = in.read(buf)
+        }
+      } finally {
+        if (propagateEof) {
+          out.close()
+        }
       }
     }
   }
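
Note (not part of the patch): the sketch below is a self-contained illustration of the behavior this change adds. The RedirectThread copy mirrors the patched class from the diff above so it compiles outside the org.apache.spark.util package; the demo wiring (the RedirectDemo object and the `cat` child process) is hypothetical and assumes a Unix-like system with `cat` on the PATH.

import java.io.{ByteArrayInputStream, IOException, InputStream, OutputStream}

// Standalone copy of the patched RedirectThread from the diff above.
class RedirectThread(
    in: InputStream,
    out: OutputStream,
    name: String,
    propagateEof: Boolean = false)
  extends Thread(name) {

  setDaemon(true)

  override def run(): Unit = {
    scala.util.control.Exception.ignoring(classOf[IOException]) {
      try {
        // Copy the stream at the byte level to avoid encoding problems.
        val buf = new Array[Byte](1024)
        var len = in.read(buf)
        while (len != -1) {
          out.write(buf, 0, len)
          out.flush()
          len = in.read(buf)
        }
      } finally {
        // The fix: once the input is exhausted, close the child's stdin so
        // the child observes EOF instead of blocking forever on read().
        if (propagateEof) {
          out.close()
        }
      }
    }
  }
}

// Hypothetical demo, not from the Spark sources.
object RedirectDemo {
  def main(args: Array[String]): Unit = {
    val child = new ProcessBuilder("cat").start()
    val stdin = new ByteArrayInputStream("hello\n".getBytes("UTF-8"))

    // With propagateEof = false (the old behavior), `cat` would never see
    // EOF on its stdin and waitFor() would block indefinitely.
    new RedirectThread(stdin, child.getOutputStream, "redirect stdin",
      propagateEof = true).start()

    child.waitFor()
    print(scala.io.Source.fromInputStream(child.getInputStream).mkString)
  }
}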