
[SPARK-5437] Fix DriverSuite and SparkSubmitSuite timeout issues #4230

Status: Closed · wants to merge 2 commits
87 changes: 44 additions & 43 deletions core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -410,10 +410,10 @@ private[spark] object Utils extends Logging {
// Decompress the file if it's a .tar or .tar.gz
if (fileName.endsWith(".tar.gz") || fileName.endsWith(".tgz")) {
logInfo("Untarring " + fileName)
Utils.execute(Seq("tar", "-xzf", fileName), targetDir)
executeAndGetOutput(Seq("tar", "-xzf", fileName), targetDir)
} else if (fileName.endsWith(".tar")) {
logInfo("Untarring " + fileName)
Utils.execute(Seq("tar", "-xf", fileName), targetDir)
executeAndGetOutput(Seq("tar", "-xf", fileName), targetDir)
}
// Make the file executable - That's necessary for scripts
FileUtil.chmod(targetFile.getAbsolutePath, "a+x")
@@ -956,25 +956,25 @@ private[spark] object Utils extends Logging {
}

/**
* Execute a command in the given working directory, throwing an exception if it completes
* with an exit code other than 0.
* Execute a command and return the process running the command.
*/
def execute(command: Seq[String], workingDir: File) {
val process = new ProcessBuilder(command: _*)
.directory(workingDir)
.redirectErrorStream(true)
.start()
new Thread("read stdout for " + command(0)) {
override def run() {
for (line <- Source.fromInputStream(process.getInputStream).getLines()) {
System.err.println(line)
}
}
}.start()
val exitCode = process.waitFor()
if (exitCode != 0) {
throw new SparkException("Process " + command + " exited with code " + exitCode)
def executeCommand(
command: Seq[String],
workingDir: File = new File("."),
extraEnvironment: Map[String, String] = Map.empty,
redirectStderr: Boolean = true): Process = {
val builder = new ProcessBuilder(command: _*).directory(workingDir)
val environment = builder.environment()
for ((key, value) <- extraEnvironment) {
environment.put(key, value)
}
val process = builder.start()
if (redirectStderr) {
val threadName = "redirect stderr for command " + command(0)
def log(s: String): Unit = logInfo(s)
processStreamByLine(threadName, process.getErrorStream, log)
}
process
}

/**
@@ -983,31 +983,13 @@
def executeAndGetOutput(
command: Seq[String],
workingDir: File = new File("."),
extraEnvironment: Map[String, String] = Map.empty): String = {
val builder = new ProcessBuilder(command: _*)
.directory(workingDir)
val environment = builder.environment()
for ((key, value) <- extraEnvironment) {
environment.put(key, value)
}

val process = builder.start()
new Thread("read stderr for " + command(0)) {
override def run() {
for (line <- Source.fromInputStream(process.getErrorStream).getLines()) {
logInfo(line)
}
}
}.start()
extraEnvironment: Map[String, String] = Map.empty,
redirectStderr: Boolean = true): String = {
val process = executeCommand(command, workingDir, extraEnvironment, redirectStderr)
val output = new StringBuffer
val stdoutThread = new Thread("read stdout for " + command(0)) {
override def run() {
for (line <- Source.fromInputStream(process.getInputStream).getLines()) {
output.append(line)
}
}
}
stdoutThread.start()
val threadName = "read stdout for " + command(0)
def appendToOutput(s: String): Unit = output.append(s)
val stdoutThread = processStreamByLine(threadName, process.getInputStream, appendToOutput)
val exitCode = process.waitFor()
stdoutThread.join() // Wait for it to finish reading output
if (exitCode != 0) {
@@ -1017,6 +999,25 @@
output.toString
}

/**
* Return and start a daemon thread that processes the content of the input stream line by line.
*/
def processStreamByLine(
threadName: String,
inputStream: InputStream,
processLine: String => Unit): Thread = {
val t = new Thread(threadName) {
override def run() {
for (line <- Source.fromInputStream(inputStream).getLines()) {
processLine(line)
}
}
}
t.setDaemon(true)
t.start()
t
}

/**
* Execute a block of code that evaluates to Unit, forwarding any uncaught exceptions to the
* default UncaughtExceptionHandler
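For readers skimming this diff, here is a minimal usage sketch of the refactored helpers, assuming code that lives inside the org.apache.spark package (Utils is private[spark]); the command, environment variable, and object name below are illustrative, not part of the PR.

package org.apache.spark

import java.io.File

import org.apache.spark.util.Utils

// Hypothetical caller; only Utils.executeCommand / Utils.executeAndGetOutput
// come from this PR, everything else is made up for illustration.
object CommandHelpersSketch {
  def main(args: Array[String]): Unit = {
    // executeCommand returns the Process, so the caller controls how long to
    // wait and can destroy a hung process. By default stderr is forwarded to
    // the Spark log by a daemon thread started via processStreamByLine.
    val process = Utils.executeCommand(
      Seq("sleep", "5"),
      workingDir = new File("."),
      extraEnvironment = Map("SPARK_TESTING" -> "1"))
    val exitCode = process.waitFor()
    println(s"exit code: $exitCode")

    // executeAndGetOutput keeps the old blocking behavior: it collects stdout
    // and throws if the command exits with a non-zero code.
    val output = Utils.executeAndGetOutput(Seq("echo", "hello"))
    println(output)
  }
}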
25 changes: 12 additions & 13 deletions core/src/test/scala/org/apache/spark/DriverSuite.scala
@@ -28,31 +28,30 @@ import org.apache.spark.util.Utils

class DriverSuite extends FunSuite with Timeouts {

// Regression test for SPARK-530: "Spark driver process doesn't exit after finishing"
Review comment (Contributor):
Since you're touching this, I kinda like the approach of adding the bug number to the test() call that I've seen in other places.

Reply (Author):
I just moved this line, but I can move it there too

test("driver should exit after finishing") {
val sparkHome = sys.props.getOrElse("spark.test.home", fail("spark.test.home is not set!"))
// Regression test for SPARK-530: "Spark driver process doesn't exit after finishing"
val masters = Table(("master"), ("local"), ("local-cluster[2,1,512]"))
val masters = Table("master", "local", "local-cluster[2,1,512]")
forAll(masters) { (master: String) =>
failAfter(60 seconds) {
Utils.executeAndGetOutput(
Seq(s"$sparkHome/bin/spark-class", "org.apache.spark.DriverWithoutCleanup", master),
new File(sparkHome),
Map("SPARK_TESTING" -> "1", "SPARK_HOME" -> sparkHome))
}
val process = Utils.executeCommand(
Seq(s"$sparkHome/bin/spark-class", "org.apache.spark.DriverWithoutCleanup", master),
new File(sparkHome),
Map("SPARK_TESTING" -> "1", "SPARK_HOME" -> sparkHome))
failAfter(60 seconds) { process.waitFor() }
// Ensure we still kill the process in case it timed out
process.destroy()
Review comment (Contributor):
@andrewor14 shouldn't the destroy be in a finally block? Otherwise the process isn't destroyed if you hit the timeout.
(I know this was committed a while back, and the test is even ignored now -- but I was looking at some flaky tests and this just happened to catch my eye, so I was curious)

Reply (Author):
yes, it would be good to fix it, though I don't think it will solve the flakiness that we currently encounter

Reply (Author):
by the way I'm addressing this at #6886

}
}
}

/**
* Program that creates a Spark driver but doesn't call SparkContext.stop() or
* Sys.exit() after finishing.
* Program that creates a Spark driver but doesn't call SparkContext#stop() or
* sys.exit() after finishing.
*/
object DriverWithoutCleanup {
def main(args: Array[String]) {
Utils.configTestLog4j("INFO")
// Bind the web UI to an ephemeral port in order to avoid conflicts with other tests running on
// the same machine (we shouldn't just disable the UI here, since that might mask bugs):
val conf = new SparkConf().set("spark.ui.port", "0")
val conf = new SparkConf
val sc = new SparkContext(args(0), "DriverWithoutCleanup", conf)
sc.parallelize(1 to 100, 4).count()
}
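The review thread above asks whether destroy() should sit in a finally block so the child process is killed even when failAfter throws on timeout (the author notes this is followed up in #6886). A sketch of that variant, slotting into the forAll(masters) loop of the test shown above; it is not the code merged in this PR:

val process = Utils.executeCommand(
  Seq(s"$sparkHome/bin/spark-class", "org.apache.spark.DriverWithoutCleanup", master),
  new File(sparkHome),
  Map("SPARK_TESTING" -> "1", "SPARK_HOME" -> sparkHome))
try {
  failAfter(60 seconds) { process.waitFor() }
} finally {
  // Runs even when failAfter throws, so a hung driver process is still killed.
  process.destroy()
}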
26 changes: 15 additions & 11 deletions core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
@@ -21,33 +21,36 @@ import java.io._

import scala.collection.mutable.ArrayBuffer

import org.scalatest.FunSuite
import org.scalatest.Matchers
import org.scalatest.concurrent.Timeouts
import org.scalatest.time.SpanSugar._

import org.apache.spark._
import org.apache.spark.deploy.SparkSubmit._
import org.apache.spark.util.{ResetSystemProperties, Utils}
import org.scalatest.FunSuite
import org.scalatest.Matchers

// Note: this suite mixes in ResetSystemProperties because SparkSubmit.main() sets a bunch
// of properties that need to be cleared after tests.
class SparkSubmitSuite extends FunSuite with Matchers with ResetSystemProperties {
class SparkSubmitSuite extends FunSuite with Matchers with ResetSystemProperties with Timeouts {
def beforeAll() {
System.setProperty("spark.testing", "true")
}

val noOpOutputStream = new OutputStream {
private val noOpOutputStream = new OutputStream {
def write(b: Int) = {}
}

/** Simple PrintStream that reads data into a buffer */
class BufferPrintStream extends PrintStream(noOpOutputStream) {
private class BufferPrintStream extends PrintStream(noOpOutputStream) {
var lineBuffer = ArrayBuffer[String]()
override def println(line: String) {
lineBuffer += line
}
}

/** Returns true if the script exits and the given search string is printed. */
def testPrematureExit(input: Array[String], searchString: String) = {
private def testPrematureExit(input: Array[String], searchString: String) = {
val printStream = new BufferPrintStream()
SparkSubmit.printStream = printStream

@@ -290,7 +293,6 @@ class SparkSubmitSuite extends FunSuite with Matchers with ResetSystemProperties
"--class", SimpleApplicationTest.getClass.getName.stripSuffix("$"),
"--name", "testApp",
"--master", "local",
"--conf", "spark.ui.enabled=false",
unusedJar.toString)
runSparkSubmit(args)
}
@@ -305,7 +307,6 @@ class SparkSubmitSuite extends FunSuite with Matchers with ResetSystemProperties
"--name", "testApp",
"--master", "local-cluster[2,1,512]",
"--jars", jarsString,
"--conf", "spark.ui.enabled=false",
unusedJar.toString)
runSparkSubmit(args)
}
@@ -430,15 +431,18 @@ class SparkSubmitSuite extends FunSuite with Matchers with ResetSystemProperties
}

// NOTE: This is an expensive operation in terms of time (10 seconds+). Use sparingly.
def runSparkSubmit(args: Seq[String]): String = {
private def runSparkSubmit(args: Seq[String]): Unit = {
val sparkHome = sys.props.getOrElse("spark.test.home", fail("spark.test.home is not set!"))
Utils.executeAndGetOutput(
val process = Utils.executeCommand(
Seq("./bin/spark-submit") ++ args,
new File(sparkHome),
Map("SPARK_TESTING" -> "1", "SPARK_HOME" -> sparkHome))
failAfter(60 seconds) { process.waitFor() }
// Ensure we still kill the process in case it timed out
process.destroy()
}

def forConfDir(defaults: Map[String, String]) (f: String => Unit) = {
private def forConfDir(defaults: Map[String, String]) (f: String => Unit) = {
val tmpDir = Utils.createTempDir()

val defaultsConf = new File(tmpDir.getAbsolutePath, "spark-defaults.conf")
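As context for the helpers this suite makes private, a hedged example of a testPrematureExit call site; the test name and expected message are assumptions for illustration and are not taken from this diff:

// Illustrative only: asserts that spark-submit exits early and prints a usage
// message containing the given substring when invoked with no arguments.
test("prints usage on empty input") {
  testPrematureExit(Array[String](), "Usage: spark-submit")
}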