[SQL] Fixes race condition in CliSuite
`CliSuite` has been flaky for a while. This PR tries to improve the situation by fixing a race condition in `CliSuite`: the `captureOutput` function is used to capture both the stdout and stderr output of the forked external process in two background threads and to search that output for expected strings, but it wasn't properly synchronized before.
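For illustration, here is a minimal, self-contained sketch of the pattern the fix relies on. This is an illustrative toy rather than the suite itself, and the `sh -c` command and the 10-second timeout are assumptions: `ProcessLogger` invokes its callbacks from one thread per output stream, so every read-modify-write of the shared state has to go through a common lock.

    import scala.collection.mutable.ArrayBuffer
    import scala.concurrent.duration._
    import scala.concurrent.{Await, Promise}
    import scala.sys.process.{Process, ProcessLogger}

    object CaptureOutputSketch extends App {
      val expectedAnswers = Seq("hello", "world")
      val buffer = new ArrayBuffer[String]()
      val lock = new Object
      var next = 0
      val foundAllExpectedAnswers = Promise[Unit]()

      // ProcessLogger calls these callbacks from separate threads (one for
      // stdout, one for stderr), so all access to `buffer` and `next` is
      // serialized through `lock`.
      def captureOutput(source: String)(line: String): Unit = lock.synchronized {
        buffer += s"$source> $line"
        if (next < expectedAnswers.size && line.startsWith(expectedAnswers(next))) {
          next += 1
          if (next == expectedAnswers.size) {
            foundAllExpectedAnswers.trySuccess(())
          }
        }
      }

      // Fork a trivial external process (assumes a POSIX shell is available).
      val process = Process(Seq("sh", "-c", "echo hello; echo world")).run(
        ProcessLogger(captureOutput("stdout"), captureOutput("stderr")))

      // Block until both expected lines have been observed, or fail after 10s.
      Await.result(foundAllExpectedAnswers.future, 10.seconds)
      process.exitValue()
      lock.synchronized { buffer.foreach(println) }
    }

Note the design choice mirrored in the diff below: once every access is serialized through `lock`, a plain `var` counter suffices. An `AtomicInteger` alone would not have been enough anyway, because the compare-then-increment sequence spans several operations that must be atomic as a unit.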

Author: Cheng Lian <[email protected]>

Closes apache#3060 from liancheng/fix-cli-suite and squashes the following commits:

a70569c [Cheng Lian] Fixes race condition in CliSuite
liancheng committed Nov 6, 2014
1 parent 0dd28ec commit 7f395b7
Showing 1 changed file with 15 additions and 20 deletions.
@@ -18,19 +18,17 @@
 
 package org.apache.spark.sql.hive.thriftserver
 
+import java.io._
+
 import scala.collection.mutable.ArrayBuffer
-import scala.concurrent.ExecutionContext.Implicits.global
 import scala.concurrent.duration._
-import scala.concurrent.{Await, Future, Promise}
+import scala.concurrent.{Await, Promise}
 import scala.sys.process.{Process, ProcessLogger}
 
-import java.io._
-import java.util.concurrent.atomic.AtomicInteger
-
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars
 import org.scalatest.{BeforeAndAfterAll, FunSuite}
 
-import org.apache.spark.{SparkException, Logging}
+import org.apache.spark.Logging
 import org.apache.spark.sql.catalyst.util.getTempFilePath
 
 class CliSuite extends FunSuite with BeforeAndAfterAll with Logging {
@@ -53,23 +51,20 @@ class CliSuite extends FunSuite with BeforeAndAfterAll with Logging {
        """.stripMargin.split("\\s+").toSeq ++ extraArgs
     }
 
-    // AtomicInteger is needed because stderr and stdout of the forked process are handled in
-    // different threads.
-    val next = new AtomicInteger(0)
+    var next = 0
     val foundAllExpectedAnswers = Promise.apply[Unit]()
     val queryStream = new ByteArrayInputStream(queries.mkString("\n").getBytes)
     val buffer = new ArrayBuffer[String]()
+    val lock = new Object
 
-    def captureOutput(source: String)(line: String) {
+    def captureOutput(source: String)(line: String): Unit = lock.synchronized {
       buffer += s"$source> $line"
-      // If we haven't found all expected answers...
-      if (next.get() < expectedAnswers.size) {
-        // If another expected answer is found...
-        if (line.startsWith(expectedAnswers(next.get()))) {
-          // If all expected answers have been found...
-          if (next.incrementAndGet() == expectedAnswers.size) {
-            foundAllExpectedAnswers.trySuccess(())
-          }
+      // If we haven't found all expected answers and another expected answer comes up...
+      if (next < expectedAnswers.size && line.startsWith(expectedAnswers(next))) {
+        next += 1
+        // If all expected answers have been found...
+        if (next == expectedAnswers.size) {
+          foundAllExpectedAnswers.trySuccess(())
+        }
       }
     }
@@ -88,8 +83,8 @@ class CliSuite extends FunSuite with BeforeAndAfterAll with Logging {
            |=======================
            |Spark SQL CLI command line: ${command.mkString(" ")}
            |
-           |Executed query ${next.get()} "${queries(next.get())}",
-           |But failed to capture expected output "${expectedAnswers(next.get())}" within $timeout.
+           |Executed query $next "${queries(next)}",
+           |But failed to capture expected output "${expectedAnswers(next)}" within $timeout.
            |
            |${buffer.mkString("\n")}
            |===========================
