[SPARK-3791][SQL] Provides Spark version and Hive version in HiveThriftServer2 #2843

Status: Closed · wants to merge 4 commits
8 changes: 8 additions & 0 deletions core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -20,8 +20,10 @@ package org.apache.spark.util
import java.io._
import java.net._
import java.nio.ByteBuffer
import java.util.jar.Attributes.Name
import java.util.{Properties, Locale, Random, UUID}
import java.util.concurrent.{ThreadFactory, ConcurrentHashMap, Executors, ThreadPoolExecutor}
import java.util.jar.{Manifest => JarManifest}

import scala.collection.JavaConversions._
import scala.collection.Map
@@ -1754,6 +1756,12 @@ private[spark] object Utils extends Logging {
s"$libraryPathEnvName=$libraryPath$ampersand"
}

lazy val sparkVersion =
SparkContext.jarOfObject(this).map { path =>
val manifestUrl = new URL(s"jar:file:$path!/META-INF/MANIFEST.MF")
val manifest = new JarManifest(manifestUrl.openStream())
manifest.getMainAttributes.getValue(Name.IMPLEMENTATION_VERSION)
}.getOrElse("Unknown")
}

/**
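For reference, a minimal standalone sketch of the manifest-based lookup performed by Utils.sparkVersion above. The object name and the jarPath parameter are illustrative only; the patch itself reads the manifest of the jar containing Spark, located via SparkContext.jarOfObject.

import java.net.URL
import java.util.jar.Attributes.Name
import java.util.jar.{Manifest => JarManifest}

object ManifestVersionSketch {
  // Reads Implementation-Version from a jar's MANIFEST.MF, falling back to "Unknown",
  // mirroring what Utils.sparkVersion does with the Spark assembly jar.
  def implementationVersion(jarPath: String): String = {
    val manifestUrl = new URL(s"jar:file:$jarPath!/META-INF/MANIFEST.MF")
    val in = manifestUrl.openStream()
    try {
      val manifest = new JarManifest(in)
      Option(manifest.getMainAttributes.getValue(Name.IMPLEMENTATION_VERSION)).getOrElse("Unknown")
    } finally {
      in.close()
    }
  }
}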
@@ -53,50 +53,35 @@ case class SetCommand(kv: Option[(String, Option[String])], output: Seq[Attribut
extends LeafNode with Command with Logging {

override protected lazy val sideEffectResult: Seq[Row] = kv match {
// Set value for the key.
case Some((key, Some(value))) =>
if (key == SQLConf.Deprecated.MAPRED_REDUCE_TASKS) {
logWarning(s"Property ${SQLConf.Deprecated.MAPRED_REDUCE_TASKS} is deprecated, " +
// Configures the deprecated "mapred.reduce.tasks" property.
case Some((SQLConf.Deprecated.MAPRED_REDUCE_TASKS, Some(value))) =>
logWarning(
s"Property ${SQLConf.Deprecated.MAPRED_REDUCE_TASKS} is deprecated, " +
s"automatically converted to ${SQLConf.SHUFFLE_PARTITIONS} instead.")
context.setConf(SQLConf.SHUFFLE_PARTITIONS, value)
Seq(Row(s"${SQLConf.SHUFFLE_PARTITIONS}=$value"))
} else {
context.setConf(key, value)
Seq(Row(s"$key=$value"))
}

// Query the value bound to the key.
context.setConf(SQLConf.SHUFFLE_PARTITIONS, value)
Seq(Row(s"${SQLConf.SHUFFLE_PARTITIONS}=$value"))

// Configures a single property.
case Some((key, Some(value))) =>
context.setConf(key, value)
Seq(Row(s"$key=$value"))

// Queries all key-value pairs that are set in the SQLConf of the context. Notice that different
// from Hive, here "SET -v" is an alias of "SET". (In Hive, "SET" returns all changed properties
// while "SET -v" returns all properties.)
case Some(("-v", None)) | None =>
context.getAllConfs.map { case (k, v) => Row(s"$k=$v") }.toSeq

// Queries the deprecated "mapred.reduce.tasks" property.
case Some((SQLConf.Deprecated.MAPRED_REDUCE_TASKS, None)) =>
logWarning(
s"Property ${SQLConf.Deprecated.MAPRED_REDUCE_TASKS} is deprecated, " +
s"showing ${SQLConf.SHUFFLE_PARTITIONS} instead.")
Seq(Row(s"${SQLConf.SHUFFLE_PARTITIONS}=${context.numShufflePartitions}"))

// Queries a single property.
case Some((key, None)) =>
// TODO (lian) This is just a workaround to make the Simba ODBC driver work.
// Should remove this once we get the ODBC driver updated.
if (key == "-v") {
val hiveJars = Seq(
"hive-exec-0.12.0.jar",
"hive-service-0.12.0.jar",
"hive-common-0.12.0.jar",
"hive-hwi-0.12.0.jar",
"hive-0.12.0.jar").mkString(":")

context.getAllConfs.map { case (k, v) =>
Row(s"$k=$v")
}.toSeq ++ Seq(
Row("system:java.class.path=" + hiveJars),
Row("system:sun.java.command=shark.SharkServer2"))
} else {
if (key == SQLConf.Deprecated.MAPRED_REDUCE_TASKS) {
logWarning(s"Property ${SQLConf.Deprecated.MAPRED_REDUCE_TASKS} is deprecated, " +
s"showing ${SQLConf.SHUFFLE_PARTITIONS} instead.")
Seq(Row(s"${SQLConf.SHUFFLE_PARTITIONS}=${context.numShufflePartitions}"))
} else {
Seq(Row(s"$key=${context.getConf(key, "<undefined>")}"))
}
}

// Query all key-value pairs that are set in the SQLConf of the context.
case _ =>
context.getAllConfs.map { case (k, v) =>
Row(s"$k=$v")
}.toSeq
Seq(Row(s"$key=${context.getConf(key, "<undefined>")}"))
}

override def otherCopyArgs = context :: Nil
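For illustration, a hedged sketch of how the rewritten SET cases above surface through the SQL interface. Everything here (the SetCommandSketch object, the local master, the chosen keys) is an assumed example, not part of the patch.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext

object SetCommandSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("set-command-sketch"))
    val hiveContext = new HiveContext(sc)

    // Deprecated key with a value: rewritten to spark.sql.shuffle.partitions, with a warning.
    hiveContext.sql("SET mapred.reduce.tasks=10")
    // Ordinary key-value pair: stored in the context's SQLConf as-is.
    hiveContext.sql("SET spark.sql.shuffle.partitions=20")
    // Bare SET, or SET -v (treated as an alias of SET here, unlike in Hive): lists every pair in SQLConf.
    hiveContext.sql("SET").collect().foreach(println)
    // Deprecated key without a value: shows spark.sql.shuffle.partitions instead.
    hiveContext.sql("SET mapred.reduce.tasks").collect().foreach(println)
    // Any other single key: returns key=value, or key=<undefined> when the key is not set.
    hiveContext.sql("SET spark.sql.hive.version").collect().foreach(println)

    sc.stop()
  }
}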
@@ -17,6 +17,8 @@

package org.apache.spark.sql.hive.thriftserver

import java.util.jar.Attributes.Name

import scala.collection.JavaConversions._

import java.io.IOException
@@ -29,11 +31,12 @@ import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hadoop.hive.shims.ShimLoader
import org.apache.hive.service.Service.STATE
import org.apache.hive.service.auth.HiveAuthFactory
import org.apache.hive.service.cli.CLIService
import org.apache.hive.service.cli._
import org.apache.hive.service.{AbstractService, Service, ServiceException}

import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.hive.thriftserver.ReflectionUtils._
import org.apache.spark.util.Utils

private[hive] class SparkSQLCLIService(hiveContext: HiveContext)
extends CLIService
@@ -60,6 +63,15 @@ private[hive] class SparkSQLCLIService(hiveContext: HiveContext)

initCompositeService(hiveConf)
}

override def getInfo(sessionHandle: SessionHandle, getInfoType: GetInfoType): GetInfoValue = {
getInfoType match {
case GetInfoType.CLI_SERVER_NAME => new GetInfoValue("Spark SQL")
case GetInfoType.CLI_DBMS_NAME => new GetInfoValue("Spark SQL")
case GetInfoType.CLI_DBMS_VER => new GetInfoValue(Utils.sparkVersion)
case _ => super.getInfo(sessionHandle, getInfoType)
}
}
}

private[thriftserver] trait ReflectedCompositeService { this: AbstractService =>
@@ -17,10 +17,11 @@

package org.apache.spark.sql.hive.thriftserver

import scala.collection.JavaConversions._

import org.apache.spark.scheduler.StatsReportListener
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.hive.{HiveShim, HiveContext}
import org.apache.spark.{Logging, SparkConf, SparkContext}
import scala.collection.JavaConversions._

/** A singleton object for the master program. The slaves should not access this. */
private[hive] object SparkSQLEnv extends Logging {
@@ -31,8 +32,10 @@ private[hive] object SparkSQLEnv extends Logging {

def init() {
if (hiveContext == null) {
sparkContext = new SparkContext(new SparkConf()
.setAppName(s"SparkSQL::${java.net.InetAddress.getLocalHost.getHostName}"))
val sparkConf = new SparkConf()
.setAppName(s"SparkSQL::${java.net.InetAddress.getLocalHost.getHostName}")
.set("spark.sql.hive.version", HiveShim.version)
sparkContext = new SparkContext(sparkConf)

sparkContext.addSparkListener(new StatsReportListener())
hiveContext = new HiveContext(sparkContext)
@@ -30,42 +30,95 @@ import scala.util.Try

import org.apache.hadoop.hive.conf.HiveConf.ConfVars
import org.apache.hive.jdbc.HiveDriver
import org.apache.hive.service.auth.PlainSaslHelper
import org.apache.hive.service.cli.GetInfoType
import org.apache.hive.service.cli.thrift.TCLIService.Client
import org.apache.hive.service.cli.thrift._
import org.apache.thrift.protocol.TBinaryProtocol
import org.apache.thrift.transport.TSocket
import org.scalatest.FunSuite

import org.apache.spark.Logging
import org.apache.spark.sql.catalyst.util.getTempFilePath
import org.apache.spark.sql.hive.HiveShim

/**
* Tests for the HiveThriftServer2 using JDBC.
*
* NOTE: SPARK_PREPEND_CLASSES is explicitly disabled in this test suite. Assembly jar must be
* rebuilt after changing HiveThriftServer2 related code.
[Contributor Author] This requirement should be OK for Jenkins, since Jenkins always builds the assembly jar before executing any test suites.

*/
class HiveThriftServer2Suite extends FunSuite with Logging {
Class.forName(classOf[HiveDriver].getCanonicalName)

def startThriftServerWithin(timeout: FiniteDuration = 1.minute)(f: Statement => Unit) {
def randomListeningPort = {
// Let the system to choose a random available port to avoid collision with other parallel
// builds.
val socket = new ServerSocket(0)
val port = socket.getLocalPort
socket.close()
port
}

def withJdbcStatement(serverStartTimeout: FiniteDuration = 1.minute)(f: Statement => Unit) {
val port = randomListeningPort

startThriftServer(port, serverStartTimeout) {
val jdbcUri = s"jdbc:hive2://${"localhost"}:$port/"
val user = System.getProperty("user.name")
val connection = DriverManager.getConnection(jdbcUri, user, "")
val statement = connection.createStatement()

try {
f(statement)
} finally {
statement.close()
connection.close()
}
}
}

def withCLIServiceClient(
serverStartTimeout: FiniteDuration = 1.minute)(
f: ThriftCLIServiceClient => Unit) {
val port = randomListeningPort

startThriftServer(port) {
// Transport creation logics below mimics HiveConnection.createBinaryTransport
val rawTransport = new TSocket("localhost", port)
val user = System.getProperty("user.name")
val transport = PlainSaslHelper.getPlainTransport(user, "anonymous", rawTransport)
val protocol = new TBinaryProtocol(transport)
val client = new ThriftCLIServiceClient(new Client(protocol))

transport.open()

try {
f(client)
} finally {
transport.close()
}
}
}

def startThriftServer(
port: Int,
serverStartTimeout: FiniteDuration = 1.minute)(
f: => Unit) {
val startScript = "../../sbin/start-thriftserver.sh".split("/").mkString(File.separator)
val stopScript = "../../sbin/stop-thriftserver.sh".split("/").mkString(File.separator)

val warehousePath = getTempFilePath("warehouse")
val metastorePath = getTempFilePath("metastore")
val metastoreJdbcUri = s"jdbc:derby:;databaseName=$metastorePath;create=true"
val listeningHost = "localhost"
val listeningPort = {
// Let the system to choose a random available port to avoid collision with other parallel
// builds.
val socket = new ServerSocket(0)
val port = socket.getLocalPort
socket.close()
port
}

val command =
s"""$startScript
| --master local
| --hiveconf hive.root.logger=INFO,console
| --hiveconf ${ConfVars.METASTORECONNECTURLKEY}=$metastoreJdbcUri
| --hiveconf ${ConfVars.METASTOREWAREHOUSE}=$warehousePath
| --hiveconf ${ConfVars.HIVE_SERVER2_THRIFT_BIND_HOST}=$listeningHost
| --hiveconf ${ConfVars.HIVE_SERVER2_THRIFT_PORT}=$listeningPort
| --hiveconf ${ConfVars.HIVE_SERVER2_THRIFT_BIND_HOST}=${"localhost"}
| --hiveconf ${ConfVars.HIVE_SERVER2_THRIFT_PORT}=$port
""".stripMargin.split("\\s+").toSeq

val serverRunning = Promise[Unit]()
@@ -92,31 +145,25 @@ class HiveThriftServer2Suite extends FunSuite with Logging {
}
}

// Resets SPARK_TESTING to avoid loading Log4J configurations in testing class paths
Process(command, None, "SPARK_TESTING" -> "0").run(ProcessLogger(
val env = Seq(
// Resets SPARK_TESTING to avoid loading Log4J configurations in testing class paths
"SPARK_TESTING" -> "0",
// Prevents loading classes out of the assembly jar. Otherwise Utils.sparkVersion can't read
// proper version information from the jar manifest.
"SPARK_PREPEND_CLASSES" -> "")

Process(command, None, env: _*).run(ProcessLogger(
captureThriftServerOutput("stdout"),
captureThriftServerOutput("stderr")))

val jdbcUri = s"jdbc:hive2://$listeningHost:$listeningPort/"
val user = System.getProperty("user.name")

try {
Await.result(serverRunning.future, timeout)

val connection = DriverManager.getConnection(jdbcUri, user, "")
val statement = connection.createStatement()

try {
f(statement)
} finally {
statement.close()
connection.close()
}
Await.result(serverRunning.future, serverStartTimeout)
f
} catch {
case cause: Exception =>
cause match {
case _: TimeoutException =>
logError(s"Failed to start Hive Thrift server within $timeout", cause)
logError(s"Failed to start Hive Thrift server within $serverStartTimeout", cause)
case _ =>
}
logError(
@@ -125,8 +172,8 @@ class HiveThriftServer2Suite extends FunSuite with Logging {
|HiveThriftServer2Suite failure output
|=====================================
|HiveThriftServer2 command line: ${command.mkString(" ")}
|JDBC URI: $jdbcUri
|User: $user
|Binding port: $port
|System user: ${System.getProperty("user.name")}
|
|${buffer.mkString("\n")}
|=========================================
@@ -146,7 +193,7 @@ class HiveThriftServer2Suite extends FunSuite with Logging {
}

test("Test JDBC query execution") {
startThriftServerWithin() { statement =>
withJdbcStatement() { statement =>
val dataFilePath =
Thread.currentThread().getContextClassLoader.getResource("data/files/small_kv.txt")

@@ -168,7 +215,7 @@ class HiveThriftServer2Suite extends FunSuite with Logging {
}

test("SPARK-3004 regression: result set containing NULL") {
startThriftServerWithin() { statement =>
withJdbcStatement() { statement =>
val dataFilePath =
Thread.currentThread().getContextClassLoader.getResource(
"data/files/small_kv_with_null.txt")
@@ -191,4 +238,33 @@ class HiveThriftServer2Suite extends FunSuite with Logging {
assert(!resultSet.next())
}
}

test("GetInfo Thrift API") {
withCLIServiceClient() { client =>
val user = System.getProperty("user.name")
val sessionHandle = client.openSession(user, "")

assertResult("Spark SQL", "Wrong GetInfo(CLI_DBMS_NAME) result") {
client.getInfo(sessionHandle, GetInfoType.CLI_DBMS_NAME).getStringValue
}

assertResult("Spark SQL", "Wrong GetInfo(CLI_SERVER_NAME) result") {
client.getInfo(sessionHandle, GetInfoType.CLI_SERVER_NAME).getStringValue
}

assertResult(true, "Spark version shouldn't be \"Unknown\"") {
val version = client.getInfo(sessionHandle, GetInfoType.CLI_DBMS_VER).getStringValue
logInfo(s"Spark version: $version")
version != "Unknown"
}
}
}

test("Checks Hive version") {
withJdbcStatement() { statement =>
val resultSet = statement.executeQuery("SET spark.sql.hive.version")
resultSet.next()
assert(resultSet.getString(1) === s"spark.sql.hive.version=${HiveShim.version}")
}
}
}
@@ -318,7 +318,9 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
driver.close()
HiveShim.processResults(results)
case _ =>
sessionState.out.println(tokens(0) + " " + cmd_1)
if (sessionState.out != null) {
sessionState.out.println(tokens(0) + " " + cmd_1)
}
[Contributor Author] SessionState life cycle control is rather broken and error-prone in the current code base. Working on a separate PR to fix it.

[Contributor Author] This issue is partly fixed in #2887.

Seq(proc.run(cmd_1).getResponseCode.toString)
}
} catch {