Skip to content

Commit

Permalink
Fixes test suites in hive-thriftserver
Browse files Browse the repository at this point in the history
  • Loading branch information
liancheng committed Oct 10, 2014
1 parent 4e9b551 commit 7fd6757
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import java.util.concurrent.atomic.AtomicInteger
import org.apache.hadoop.hive.conf.HiveConf.ConfVars
import org.scalatest.{BeforeAndAfterAll, FunSuite}

import org.apache.spark.Logging
import org.apache.spark.{SparkException, Logging}
import org.apache.spark.sql.catalyst.util.getTempFilePath

class CliSuite extends FunSuite with BeforeAndAfterAll with Logging {
Expand Down Expand Up @@ -77,7 +77,8 @@ class CliSuite extends FunSuite with BeforeAndAfterAll with Logging {

Future {
val exitValue = process.exitValue()
logInfo(s"Spark SQL CLI process exit value: $exitValue")
foundAllExpectedAnswers.tryFailure(
new SparkException(s"Spark SQL CLI process exit value: $exitValue"))
}

try {
Expand All @@ -98,6 +99,7 @@ class CliSuite extends FunSuite with BeforeAndAfterAll with Logging {
|End CliSuite failure output
|===========================
""".stripMargin, cause)
throw cause
} finally {
warehousePath.delete()
metastorePath.delete()
Expand All @@ -120,7 +122,7 @@ class CliSuite extends FunSuite with BeforeAndAfterAll with Logging {
-> "Time taken: ",
"SELECT COUNT(*) FROM hive_test;"
-> "5",
"DROP TABLE hive_test"
"DROP TABLE hive_test;"
-> "Time taken: "
)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,17 @@

package org.apache.spark.sql.hive.thriftserver

import scala.collection.mutable.ArrayBuffer
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.concurrent.{Await, Future, Promise}
import scala.sys.process.{Process, ProcessLogger}

import java.io.File
import java.net.ServerSocket
import java.sql.{DriverManager, Statement}
import java.util.concurrent.TimeoutException

import scala.collection.mutable.ArrayBuffer
import scala.concurrent.duration._
import scala.concurrent.{Await, Promise}
import scala.sys.process.{Process, ProcessLogger}
import scala.util.Try

import org.apache.hadoop.hive.conf.HiveConf.ConfVars
import org.apache.hive.jdbc.HiveDriver
import org.scalatest.FunSuite
Expand All @@ -41,25 +41,28 @@ import org.apache.spark.sql.catalyst.util.getTempFilePath
class HiveThriftServer2Suite extends FunSuite with Logging {
Class.forName(classOf[HiveDriver].getCanonicalName)

private val listeningHost = "localhost"
private val listeningPort = {
// Let the system choose a random available port to avoid collisions with other
// parallel builds.
val socket = new ServerSocket(0)
val port = socket.getLocalPort
socket.close()
port
}

private val warehousePath = getTempFilePath("warehouse")
private val metastorePath = getTempFilePath("metastore")
private val metastoreJdbcUri = s"jdbc:derby:;databaseName=$metastorePath;create=true"

def startThriftServerWithin(timeout: FiniteDuration = 30.seconds)(f: Statement => Unit) {
val serverScript = "../../sbin/start-thriftserver.sh".split("/").mkString(File.separator)
val verbose = Option(System.getenv("SPARK_SQL_TEST_VERBOSE")).isDefined

def startThriftServerWithin(timeout: FiniteDuration = 10.seconds)(f: Statement => Unit) {
Thread.sleep(5000)

val startScript = "../../sbin/start-thriftserver.sh".split("/").mkString(File.separator)
val stopScript = "../../sbin/stop-thriftserver.sh".split("/").mkString(File.separator)
val warehousePath = getTempFilePath("warehouse")
val metastorePath = getTempFilePath("metastore")
val metastoreJdbcUri = s"jdbc:derby:;databaseName=$metastorePath;create=true"
val listeningHost = "localhost"
val listeningPort = {
// Let the system choose a random available port to avoid collisions with other
// parallel builds.
val socket = new ServerSocket(0)
val port = socket.getLocalPort
socket.close()
port
}

val command =
s"""$serverScript
s"""$startScript
| --master local
| --hiveconf hive.root.logger=INFO,console
| --hiveconf ${ConfVars.METASTORECONNECTURLKEY}=$metastoreJdbcUri
Expand All @@ -68,29 +71,41 @@ class HiveThriftServer2Suite extends FunSuite with Logging {
| --hiveconf ${ConfVars.HIVE_SERVER2_THRIFT_PORT}=$listeningPort
""".stripMargin.split("\\s+").toSeq

val serverStarted = Promise[Unit]()
val serverRunning = Promise[Unit]()
val buffer = new ArrayBuffer[String]()

def captureOutput(source: String)(line: String) {
buffer += s"$source> $line"
val LOGGING_MARK =
s"starting ${HiveThriftServer2.getClass.getCanonicalName.stripSuffix("$")}, logging to "
var logTailingProcess: Process = null
var logFilePath: String = null

def captureLogOutput(line: String): Unit = {
logInfo(s"server log | $line")
buffer += line
if (line.contains("ThriftBinaryCLIService listening on")) {
serverStarted.success(())
serverRunning.success(())
}
}

val process = Process(command).run(
ProcessLogger(captureOutput("stdout"), captureOutput("stderr")))

Future {
val exitValue = process.exitValue()
logInfo(s"Spark SQL Thrift server process exit value: $exitValue")
def captureThriftServerOutput(source: String)(line: String): Unit = {
logInfo(s"server $source | $line")
if (line.startsWith(LOGGING_MARK)) {
logFilePath = line.drop(LOGGING_MARK.length).trim
// Ensure that the log file is created so that the `tail' command won't fail
Try(new File(logFilePath).createNewFile())
logTailingProcess = Process(s"/usr/bin/env tail -f $logFilePath")
.run(ProcessLogger(captureLogOutput, _ => ()))
}
}

Process(command).run(ProcessLogger(
captureThriftServerOutput("stdout"),
captureThriftServerOutput("stderr")))

val jdbcUri = s"jdbc:hive2://$listeningHost:$listeningPort/"
val user = System.getProperty("user.name")

try {
Await.result(serverStarted.future, timeout)
Await.result(serverRunning.future, timeout)

val connection = DriverManager.getConnection(jdbcUri, user, "")
val statement = connection.createStatement()
Expand Down Expand Up @@ -122,10 +137,15 @@ class HiveThriftServer2Suite extends FunSuite with Logging {
|End HiveThriftServer2Suite failure output
|=========================================
""".stripMargin, cause)
throw cause
} finally {
warehousePath.delete()
metastorePath.delete()
process.destroy()
Process(stopScript).run().exitValue()
// The `spark-daemon.sh' script uses kill, which is not synchronous, so we have to wait for a while.
Thread.sleep(3.seconds.toMillis)
Option(logTailingProcess).map(_.destroy())
Option(logFilePath).map(new File(_).delete())
}
}

Expand Down

0 comments on commit 7fd6757

Please sign in to comment.