diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suite.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suite.scala
index 38977ff162097..95fa8fe6380fc 100644
--- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suite.scala
+++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suite.scala
@@ -21,6 +21,7 @@ import scala.collection.mutable.ArrayBuffer
 import scala.concurrent.ExecutionContext.Implicits.global
 import scala.concurrent.duration._
 import scala.concurrent.{Await, Future, Promise}
+import scala.io.Source
 import scala.sys.process.{Process, ProcessLogger}
 
 import java.io.File
@@ -30,19 +31,19 @@ import java.util.concurrent.TimeoutException
 
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars
 import org.apache.hive.jdbc.HiveDriver
-import org.scalatest.FunSuite
+import org.scalatest.{BeforeAndAfterAll, FunSuite}
 
-import org.apache.spark.Logging
-import org.apache.spark.sql.catalyst.util.getTempFilePath
+import org.apache.spark.{Logging, SparkException}
+import org.apache.spark.sql.catalyst.util._
 
 /**
  * Tests for the HiveThriftServer2 using JDBC.
  */
-class HiveThriftServer2Suite extends FunSuite with Logging {
+class HiveThriftServer2Suite extends FunSuite with BeforeAndAfterAll with Logging {
   Class.forName(classOf[HiveDriver].getCanonicalName)
 
   private val listeningHost = "localhost"
-  private val listeningPort =  {
+  private val listeningPort = {
     // Let the system to choose a random available port to avoid collision with other parallel
     // builds.
     val socket = new ServerSocket(0)
@@ -54,10 +55,16 @@ class HiveThriftServer2Suite extends FunSuite with Logging {
   private val warehousePath = getTempFilePath("warehouse")
   private val metastorePath = getTempFilePath("metastore")
   private val metastoreJdbcUri = s"jdbc:derby:;databaseName=$metastorePath;create=true"
+  val jdbcUri = s"jdbc:hive2://$listeningHost:$listeningPort/"
+  val user = System.getProperty("user.name")
 
-  def startThriftServerWithin(timeout: FiniteDuration = 30.seconds)(f: Statement => Unit) {
+  /**
+   * Launches the Thrift server through `start-thriftserver.sh` once for the whole suite and
+   * blocks until its log reports that it is listening, or fails the suite on timeout.
+   */
+  override def beforeAll(): Unit = {
+    val timeout: FiniteDuration = 30.seconds
     val serverScript = "../../sbin/start-thriftserver.sh".split("/").mkString(File.separator)
-
     val command =
       s"""$serverScript
          |  --master local
@@ -70,37 +77,49 @@ class HiveThriftServer2Suite extends FunSuite with Logging {
 
     val serverStarted = Promise[Unit]()
     val buffer = new ArrayBuffer[String]()
+    val startString =
+      "starting org.apache.spark.sql.hive.thriftserver.HiveThriftServer2, logging to "
+    val readyString = "ThriftBinaryCLIService listening on"
+    val maxTries = 30
 
     def captureOutput(source: String)(line: String) {
       buffer += s"$source> $line"
-      if (line.contains("ThriftBinaryCLIService listening on")) {
-        serverStarted.success(())
+      val startIndex = line.indexOf(startString)
+      if (startIndex >= 0) {
+        // The launcher script only announces where the server logs to; readiness has to be
+        // detected by polling that log file for the "listening" message.
+        val logFile = new File(line.substring(startIndex + startString.length).trim)
+        def serverReady = logFile.exists() && fileToString(logFile).contains(readyString)
+        var tryNum = 0
+        // The log file may not exist or be complete yet; poll once per second and give up
+        // after maxTries attempts so that a missing log file cannot stall this thread forever.
+        while (!serverReady && tryNum < maxTries) {
+          tryNum += 1
+          Thread.sleep(1000)
+        }
+        if (serverReady) {
+          serverStarted.trySuccess(())
+        } else {
+          // Keep whatever the server logged so that it shows up in the failure report.
+          if (logFile.exists()) buffer += s"log> ${fileToString(logFile)}"
+          // Report through the promise: an exception thrown here would only kill the process
+          // output thread and never reach the code awaiting server start-up.
+          serverStarted.tryFailure(new TimeoutException(
+            s"'$readyString' not found in $logFile after $maxTries tries"))
+        }
       }
     }
-
     val process = Process(command).run(
       ProcessLogger(captureOutput("stdout"), captureOutput("stderr")))
 
     Future {
       val exitValue = process.exitValue()
-      logInfo(s"Spark SQL Thrift server process exit value: $exitValue")
+      serverStarted.tryFailure(
+        new SparkException(s"Spark SQL Thrift server process exit value: $exitValue"))
     }
 
-    val jdbcUri = s"jdbc:hive2://$listeningHost:$listeningPort/"
-    val user = System.getProperty("user.name")
-
     try {
       Await.result(serverStarted.future, timeout)
-
-      val connection = DriverManager.getConnection(jdbcUri, user, "")
-      val statement = connection.createStatement()
-
-      try {
-        f(statement)
-      } finally {
-        statement.close()
-        connection.close()
-      }
     } catch {
       case cause: Exception =>
         cause match {
@@ -122,15 +141,51 @@ class HiveThriftServer2Suite extends FunSuite with Logging {
              |End HiveThriftServer2Suite failure output
              |=========================================
            """.stripMargin, cause)
+        throw cause
     } finally {
-      warehousePath.delete()
-      metastorePath.delete()
       process.destroy()
     }
   }
 
+  override def afterAll() {
+    // Stop the server first; remove its directories even if stopping fails.
+    try {
+      stopThriftserver()
+    } finally {
+      warehousePath.delete()
+      metastorePath.delete()
+    }
+  }
+
+  /** Runs `stop-thriftserver.sh`, echoes its output and waits for it to exit. */
+  def stopThriftserver(): Unit = {
+    val stopScript = "../../sbin/stop-thriftserver.sh".split("/").mkString(File.separator)
+    val process = new ProcessBuilder(stopScript).start()
+
+    // Echo one output stream of the stop script on its own thread, so that a full pipe can
+    // never block the script while this thread waits for it to exit.
+    def echo(name: String, stream: java.io.InputStream) {
+      val reader = new Thread(name) {
+        override def run() {
+          Source.fromInputStream(stream).getLines().foreach(println)
+        }
+      }
+      reader.setDaemon(true)
+      reader.start()
+    }
+
+    echo("read stderr", process.getErrorStream)
+    echo("read stdout", process.getInputStream)
+
+    val exitValue = process.waitFor()
+    logInfo(s"Stop Spark SQL Thrift server process exit value: $exitValue")
+  }
+
   test("Test JDBC query execution") {
-    startThriftServerWithin() { statement =>
+    val connection = DriverManager.getConnection(jdbcUri, user, "")
+    val statement = connection.createStatement()
+
+    try {
       val dataFilePath =
         Thread.currentThread().getContextClassLoader.getResource("data/files/small_kv.txt")
 
@@ -146,11 +201,17 @@ class HiveThriftServer2Suite extends FunSuite with Logging {
         resultSet.next()
         resultSet.getInt(1)
       }
+    } finally {
+      statement.close()
+      connection.close()
     }
   }
 
   test("SPARK-3004 regression: result set containing NULL") {
-    startThriftServerWithin() { statement =>
+    val connection = DriverManager.getConnection(jdbcUri, user, "")
+    val statement = connection.createStatement()
+
+    try {
       val dataFilePath =
         Thread.currentThread().getContextClassLoader.getResource(
           "data/files/small_kv_with_null.txt")
@@ -169,8 +230,10 @@ class HiveThriftServer2Suite extends FunSuite with Logging {
         assert(resultSet.getInt(1) === 0)
         assert(resultSet.wasNull())
       }
-
       assert(!resultSet.next())
+    } finally {
+      statement.close()
+      connection.close()
     }
   }
 }