[SPARK-5437] Fix DriverSuite and SparkSubmitSuite timeout issues
In DriverSuite, we currently set a timeout of 60 seconds. If the process has not terminated after this time, we leak it because we never destroy it.

In SparkSubmitSuite, we currently do not set a timeout at all, so the test can hang indefinitely.
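
Concretely, the fix in both suites follows the same pattern: launch the child JVM without blocking, bound the wait with a timeout, and destroy the process afterwards. Below is a minimal sketch of that pattern, assuming ScalaTest's Timeouts/SpanSugar and the Utils.executeCommand helper added in this commit; the class name is made up, and the try/finally is an illustrative way to guarantee cleanup (the committed tests simply call process.destroy() after failAfter).

```scala
package org.apache.spark // Utils is private[spark], so a sketch like this must live under this package

import java.io.File

import org.scalatest.FunSuite
import org.scalatest.concurrent.Timeouts
import org.scalatest.time.SpanSugar._

import org.apache.spark.util.Utils

class ProcessTimeoutSketch extends FunSuite with Timeouts {
  test("external driver exits within the timeout and is always destroyed") {
    val sparkHome = sys.props.getOrElse("spark.test.home", fail("spark.test.home is not set!"))
    // Launch the child JVM without blocking, so we keep a handle on the Process.
    val process = Utils.executeCommand(
      Seq(s"$sparkHome/bin/spark-class", "org.apache.spark.DriverWithoutCleanup", "local"),
      new File(sparkHome),
      Map("SPARK_TESTING" -> "1", "SPARK_HOME" -> sparkHome))
    try {
      // Fail the test if the process has not terminated within 60 seconds.
      failAfter(60 seconds) { process.waitFor() }
    } finally {
      // Destroy the child process even if failAfter threw, so nothing leaks.
      process.destroy()
    }
  }
}
```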

Author: Andrew Or <[email protected]>

Closes #4230 from andrewor14/fix-driver-suite and squashes the following commits:

f5c80fd [Andrew Or] Fix timeout behaviors in both suites
8092c36 [Andrew Or] Stop SparkContext after every individual test
Andrew Or committed Jan 28, 2015
1 parent 81f8f34 commit 84b6ecd
Showing 3 changed files with 71 additions and 68 deletions.
core/src/main/scala/org/apache/spark/util/Utils.scala (44 additions, 43 deletions)

@@ -410,10 +410,10 @@ private[spark] object Utils extends Logging {
     // Decompress the file if it's a .tar or .tar.gz
     if (fileName.endsWith(".tar.gz") || fileName.endsWith(".tgz")) {
       logInfo("Untarring " + fileName)
-      Utils.execute(Seq("tar", "-xzf", fileName), targetDir)
+      executeAndGetOutput(Seq("tar", "-xzf", fileName), targetDir)
     } else if (fileName.endsWith(".tar")) {
       logInfo("Untarring " + fileName)
-      Utils.execute(Seq("tar", "-xf", fileName), targetDir)
+      executeAndGetOutput(Seq("tar", "-xf", fileName), targetDir)
     }
     // Make the file executable - That's necessary for scripts
     FileUtil.chmod(targetFile.getAbsolutePath, "a+x")
@@ -956,25 +956,25 @@
   }
 
   /**
-   * Execute a command in the given working directory, throwing an exception if it completes
-   * with an exit code other than 0.
+   * Execute a command and return the process running the command.
    */
-  def execute(command: Seq[String], workingDir: File) {
-    val process = new ProcessBuilder(command: _*)
-      .directory(workingDir)
-      .redirectErrorStream(true)
-      .start()
-    new Thread("read stdout for " + command(0)) {
-      override def run() {
-        for (line <- Source.fromInputStream(process.getInputStream).getLines()) {
-          System.err.println(line)
-        }
-      }
-    }.start()
-    val exitCode = process.waitFor()
-    if (exitCode != 0) {
-      throw new SparkException("Process " + command + " exited with code " + exitCode)
+  def executeCommand(
+      command: Seq[String],
+      workingDir: File = new File("."),
+      extraEnvironment: Map[String, String] = Map.empty,
+      redirectStderr: Boolean = true): Process = {
+    val builder = new ProcessBuilder(command: _*).directory(workingDir)
+    val environment = builder.environment()
+    for ((key, value) <- extraEnvironment) {
+      environment.put(key, value)
     }
+    val process = builder.start()
+    if (redirectStderr) {
+      val threadName = "redirect stderr for command " + command(0)
+      def log(s: String): Unit = logInfo(s)
+      processStreamByLine(threadName, process.getErrorStream, log)
+    }
+    process
   }
 
   /**
@@ -983,31 +983,13 @@
   def executeAndGetOutput(
       command: Seq[String],
       workingDir: File = new File("."),
-      extraEnvironment: Map[String, String] = Map.empty): String = {
-    val builder = new ProcessBuilder(command: _*)
-      .directory(workingDir)
-    val environment = builder.environment()
-    for ((key, value) <- extraEnvironment) {
-      environment.put(key, value)
-    }
-
-    val process = builder.start()
-    new Thread("read stderr for " + command(0)) {
-      override def run() {
-        for (line <- Source.fromInputStream(process.getErrorStream).getLines()) {
-          logInfo(line)
-        }
-      }
-    }.start()
+      extraEnvironment: Map[String, String] = Map.empty,
+      redirectStderr: Boolean = true): String = {
+    val process = executeCommand(command, workingDir, extraEnvironment, redirectStderr)
     val output = new StringBuffer
-    val stdoutThread = new Thread("read stdout for " + command(0)) {
-      override def run() {
-        for (line <- Source.fromInputStream(process.getInputStream).getLines()) {
-          output.append(line)
-        }
-      }
-    }
-    stdoutThread.start()
+    val threadName = "read stdout for " + command(0)
+    def appendToOutput(s: String): Unit = output.append(s)
+    val stdoutThread = processStreamByLine(threadName, process.getInputStream, appendToOutput)
     val exitCode = process.waitFor()
     stdoutThread.join() // Wait for it to finish reading output
     if (exitCode != 0) {
@@ -1017,6 +999,25 @@
     output.toString
   }
 
+  /**
+   * Return and start a daemon thread that processes the content of the input stream line by line.
+   */
+  def processStreamByLine(
+      threadName: String,
+      inputStream: InputStream,
+      processLine: String => Unit): Thread = {
+    val t = new Thread(threadName) {
+      override def run() {
+        for (line <- Source.fromInputStream(inputStream).getLines()) {
+          processLine(line)
+        }
+      }
+    }
+    t.setDaemon(true)
+    t.start()
+    t
+  }
+
   /**
    * Execute a block of code that evaluates to Unit, forwarding any uncaught exceptions to the
    * default UncaughtExceptionHandler
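
For orientation, here is a rough usage sketch of the two helpers introduced above. The signatures, parameter names, and defaults are taken from the diff; the wrapping object, the ls command, and the environment variable are illustrative assumptions.

```scala
package org.apache.spark // Utils is private[spark], so callers must live under this package

import java.io.File

import org.apache.spark.util.Utils

object ExecuteCommandSketch {
  def main(args: Array[String]): Unit = {
    // Start the process; stderr is forwarded to logInfo because redirectStderr defaults to true.
    val process = Utils.executeCommand(
      Seq("ls", "-l"),
      workingDir = new File("."),
      extraEnvironment = Map("MY_ENV_VAR" -> "1"))

    // processStreamByLine returns an already-started daemon thread. Join it after waitFor,
    // as executeAndGetOutput does, so all of stdout has been consumed before moving on.
    val stdoutThread = Utils.processStreamByLine(
      "read stdout for ls", process.getInputStream, line => println(line))

    val exitCode = process.waitFor()
    stdoutThread.join()
    require(exitCode == 0, s"Process exited with code $exitCode")
  }
}
```

Marking the reader thread as a daemon means a wedged child process cannot keep the JVM alive on its own, which is what lets the test suites below destroy the process after a timeout without leaving stray threads behind.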
core/src/test/scala/org/apache/spark/DriverSuite.scala (12 additions, 14 deletions)

@@ -28,31 +28,29 @@ import org.apache.spark.util.Utils
 
 class DriverSuite extends FunSuite with Timeouts {
 
-  test("driver should exit after finishing") {
+  test("driver should exit after finishing without cleanup (SPARK-530)") {
     val sparkHome = sys.props.getOrElse("spark.test.home", fail("spark.test.home is not set!"))
-    // Regression test for SPARK-530: "Spark driver process doesn't exit after finishing"
-    val masters = Table(("master"), ("local"), ("local-cluster[2,1,512]"))
+    val masters = Table("master", "local", "local-cluster[2,1,512]")
     forAll(masters) { (master: String) =>
-      failAfter(60 seconds) {
-        Utils.executeAndGetOutput(
-          Seq(s"$sparkHome/bin/spark-class", "org.apache.spark.DriverWithoutCleanup", master),
-          new File(sparkHome),
-          Map("SPARK_TESTING" -> "1", "SPARK_HOME" -> sparkHome))
-      }
+      val process = Utils.executeCommand(
+        Seq(s"$sparkHome/bin/spark-class", "org.apache.spark.DriverWithoutCleanup", master),
+        new File(sparkHome),
+        Map("SPARK_TESTING" -> "1", "SPARK_HOME" -> sparkHome))
+      failAfter(60 seconds) { process.waitFor() }
+      // Ensure we still kill the process in case it timed out
+      process.destroy()
     }
   }
 }
 
 /**
- * Program that creates a Spark driver but doesn't call SparkContext.stop() or
- * Sys.exit() after finishing.
+ * Program that creates a Spark driver but doesn't call SparkContext#stop() or
+ * sys.exit() after finishing.
  */
 object DriverWithoutCleanup {
   def main(args: Array[String]) {
     Utils.configTestLog4j("INFO")
-    // Bind the web UI to an ephemeral port in order to avoid conflicts with other tests running on
-    // the same machine (we shouldn't just disable the UI here, since that might mask bugs):
-    val conf = new SparkConf().set("spark.ui.port", "0")
+    val conf = new SparkConf
     val sc = new SparkContext(args(0), "DriverWithoutCleanup", conf)
     sc.parallelize(1 to 100, 4).count()
   }
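
A side note on the Table(...) cleanup above: the first argument to ScalaTest's Table is the column heading and the remaining arguments are the rows, so the extra parentheses in the old version were redundant. A minimal sketch of how such a table is consumed follows; the import shown is the standard ScalaTest one for table-driven checks (DriverSuite's own import block is collapsed above), and the class and assertion are illustrative.

```scala
import org.scalatest.FunSuite
import org.scalatest.prop.TableDrivenPropertyChecks._

class MastersTableSketch extends FunSuite {
  test("the body runs once per master URL") {
    // "master" is the column heading; "local" and "local-cluster[2,1,512]" are the rows.
    val masters = Table("master", "local", "local-cluster[2,1,512]")
    forAll(masters) { (master: String) =>
      assert(master.nonEmpty) // stand-in for launching a driver against this master
    }
  }
}
```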
core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala (15 additions, 11 deletions)

@@ -21,33 +21,36 @@ import java.io._
 
 import scala.collection.mutable.ArrayBuffer
 
+import org.scalatest.FunSuite
+import org.scalatest.Matchers
+import org.scalatest.concurrent.Timeouts
+import org.scalatest.time.SpanSugar._
+
 import org.apache.spark._
 import org.apache.spark.deploy.SparkSubmit._
 import org.apache.spark.util.{ResetSystemProperties, Utils}
-import org.scalatest.FunSuite
-import org.scalatest.Matchers
 
 // Note: this suite mixes in ResetSystemProperties because SparkSubmit.main() sets a bunch
 // of properties that neeed to be cleared after tests.
-class SparkSubmitSuite extends FunSuite with Matchers with ResetSystemProperties {
+class SparkSubmitSuite extends FunSuite with Matchers with ResetSystemProperties with Timeouts {
   def beforeAll() {
     System.setProperty("spark.testing", "true")
   }
 
-  val noOpOutputStream = new OutputStream {
+  private val noOpOutputStream = new OutputStream {
     def write(b: Int) = {}
   }
 
   /** Simple PrintStream that reads data into a buffer */
-  class BufferPrintStream extends PrintStream(noOpOutputStream) {
+  private class BufferPrintStream extends PrintStream(noOpOutputStream) {
     var lineBuffer = ArrayBuffer[String]()
     override def println(line: String) {
      lineBuffer += line
    }
  }
 
   /** Returns true if the script exits and the given search string is printed. */
-  def testPrematureExit(input: Array[String], searchString: String) = {
+  private def testPrematureExit(input: Array[String], searchString: String) = {
     val printStream = new BufferPrintStream()
     SparkSubmit.printStream = printStream
 
@@ -290,7 +293,6 @@ class SparkSubmitSuite extends FunSuite with Matchers with ResetSystemProperties
       "--class", SimpleApplicationTest.getClass.getName.stripSuffix("$"),
       "--name", "testApp",
       "--master", "local",
-      "--conf", "spark.ui.enabled=false",
       unusedJar.toString)
     runSparkSubmit(args)
   }
@@ -305,7 +307,6 @@ class SparkSubmitSuite extends FunSuite with Matchers with ResetSystemProperties
       "--name", "testApp",
       "--master", "local-cluster[2,1,512]",
       "--jars", jarsString,
-      "--conf", "spark.ui.enabled=false",
       unusedJar.toString)
     runSparkSubmit(args)
   }
@@ -430,15 +431,18 @@ class SparkSubmitSuite extends FunSuite with Matchers with ResetSystemProperties
   }
 
   // NOTE: This is an expensive operation in terms of time (10 seconds+). Use sparingly.
-  def runSparkSubmit(args: Seq[String]): String = {
+  private def runSparkSubmit(args: Seq[String]): Unit = {
     val sparkHome = sys.props.getOrElse("spark.test.home", fail("spark.test.home is not set!"))
-    Utils.executeAndGetOutput(
+    val process = Utils.executeCommand(
       Seq("./bin/spark-submit") ++ args,
       new File(sparkHome),
      Map("SPARK_TESTING" -> "1", "SPARK_HOME" -> sparkHome))
+    failAfter(60 seconds) { process.waitFor() }
+    // Ensure we still kill the process in case it timed out
+    process.destroy()
   }
 
-  def forConfDir(defaults: Map[String, String]) (f: String => Unit) = {
+  private def forConfDir(defaults: Map[String, String]) (f: String => Unit) = {
     val tmpDir = Utils.createTempDir()
 
     val defaultsConf = new File(tmpDir.getAbsolutePath, "spark-defaults.conf")
