Fix PySpark: actually kill driver on termination
We use a broken pipe on stdin as a signal that the parent process
that launched the SparkSubmitDriverBootstrapper JVM has exited.
This is very similar to what Py4J's JavaGateway does (see the
parameter "--die-on-broken-pipe"). This allows both JVMs to
actually terminate after the application has finished.

This was especially relevant for the PySpark shell, where Spark
submit itself is launched as a Python subprocess and the driver
was never actually killed even after the shell had exited.
andrewor14 committed Aug 20, 2014
1 parent d0f20db commit 9a778f6
Showing 1 changed file with 15 additions and 5 deletions.
@@ -29,7 +29,7 @@ import org.apache.spark.util.{RedirectThread, Utils}
  * driver JVM is launched. The sole purpose of this class is to avoid handling the complexity
  * of parsing the properties file for such relevant configs in Bash.
  *
- * Usage: org.apache.spark.deploy.SparkSubmitDriverBootstrapper <application args>
+ * Usage: org.apache.spark.deploy.SparkSubmitDriverBootstrapper <submit args>
  */
 private[spark] object SparkSubmitDriverBootstrapper {
 
@@ -116,13 +116,23 @@ private[spark] object SparkSubmitDriverBootstrapper {
       System.err.println("========================================\n")
     }
 
     // Start the driver JVM
     val filteredCommand = command.filter(_.nonEmpty)
     val builder = new ProcessBuilder(filteredCommand)
     val process = builder.start()
-    new RedirectThread(System.in, process.getOutputStream, "redirect stdin").start()
-    new RedirectThread(process.getInputStream, System.out, "redirect stdout").start()
-    new RedirectThread(process.getErrorStream, System.err, "redirect stderr").start()
-    System.exit(process.waitFor())
+
+    // Redirect stdin, stdout, and stderr to/from the child JVM
+    val stdinThread = new RedirectThread(System.in, process.getOutputStream, "redirect stdin")
+    val stdoutThread = new RedirectThread(process.getInputStream, System.out, "redirect stdout")
+    val stderrThread = new RedirectThread(process.getErrorStream, System.err, "redirect stderr")
+    stdinThread.start()
+    stdoutThread.start()
+    stderrThread.start()
+
+    // Terminate on broken pipe, which signals that the parent process has exited. This is
+    // important for the PySpark shell, where Spark submit itself is a python subprocess.
+    stdinThread.join()
+    process.destroy()
   }
 
 }
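For reference, the mechanism here is not Spark-specific: a blocking read on stdin sees EOF (or throws an IOException for a broken pipe) once the process holding the write end of the pipe exits. Below is a minimal, self-contained Scala sketch of the same pattern; PipeThread and DieOnBrokenPipeDemo are illustrative names and are not part of this commit or of Spark's RedirectThread.

import java.io.{InputStream, OutputStream}

// Illustrative stand-in for Spark's RedirectThread: copies bytes from `in`
// to `out` until EOF is reached or the pipe breaks.
class PipeThread(in: InputStream, out: OutputStream, name: String) extends Thread(name) {
  setDaemon(true)
  override def run(): Unit = {
    val buf = new Array[Byte](1024)
    try {
      var n = in.read(buf)
      while (n != -1) {
        out.write(buf, 0, n)
        out.flush()
        n = in.read(buf)
      }
    } catch {
      case _: java.io.IOException => // broken pipe: treat like EOF and fall through
    }
  }
}

object DieOnBrokenPipeDemo {
  def main(args: Array[String]): Unit = {
    // Stand-in for the driver JVM: any long-lived child process.
    val process = new ProcessBuilder("cat").start()
    val stdinThread = new PipeThread(System.in, process.getOutputStream, "redirect stdin")
    stdinThread.start()
    // When the parent process that feeds our stdin exits, read() sees EOF
    // (or a broken pipe), the redirect thread finishes, and join() returns.
    stdinThread.join()
    process.destroy() // kill the child instead of leaking it
  }
}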
