[SPARK-4037][SQL] Removes the SessionState instance created in HiveThriftServer2 #2887

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
@@ -17,11 +17,8 @@

package org.apache.spark.sql.hive.thriftserver

-import scala.collection.JavaConversions._

import org.apache.commons.logging.LogFactory
-import org.apache.hadoop.hive.conf.HiveConf
-import org.apache.hadoop.hive.ql.session.SessionState
import org.apache.hive.service.cli.thrift.ThriftBinaryCLIService
import org.apache.hive.service.server.{HiveServer2, ServerOptionsProcessor}

@@ -51,24 +48,12 @@ object HiveThriftServer2 extends Logging {

def main(args: Array[String]) {
val optionsProcessor = new ServerOptionsProcessor("HiveThriftServer2")

if (!optionsProcessor.process(args)) {
System.exit(-1)
}

-    val ss = new SessionState(new HiveConf(classOf[SessionState]))
-
-    // Set all properties specified via command line.
-    val hiveConf: HiveConf = ss.getConf
-    hiveConf.getAllProperties.toSeq.sortBy(_._1).foreach { case (k, v) =>
-      logDebug(s"HiveConf var: $k=$v")
-    }
-
-    SessionState.start(ss)
-
    logInfo("Starting SparkContext")
    SparkSQLEnv.init()
-    SessionState.start(ss)

Runtime.getRuntime.addShutdownHook(
new Thread() {
@@ -80,7 +65,7 @@ object HiveThriftServer2 extends Logging {

try {
val server = new HiveThriftServer2(SparkSQLEnv.hiveContext)
-      server.init(hiveConf)
+      server.init(SparkSQLEnv.hiveContext.hiveconf)
server.start()
logInfo("HiveThriftServer2 started")
} catch {
@@ -17,12 +17,10 @@

package org.apache.spark.sql.hive.thriftserver

-import org.apache.hadoop.hive.ql.session.SessionState
-
-import org.apache.spark.scheduler.{SplitInfo, StatsReportListener}
-import org.apache.spark.Logging
+import org.apache.spark.scheduler.StatsReportListener
import org.apache.spark.sql.hive.HiveContext
-import org.apache.spark.{SparkConf, SparkContext}
+import org.apache.spark.{Logging, SparkConf, SparkContext}
+import scala.collection.JavaConversions._

/** A singleton object for the master program. The slaves should not access this. */
private[hive] object SparkSQLEnv extends Logging {
@@ -37,14 +35,12 @@ private[hive] object SparkSQLEnv extends Logging {
.setAppName(s"SparkSQL::${java.net.InetAddress.getLocalHost.getHostName}"))

sparkContext.addSparkListener(new StatsReportListener())
-      hiveContext = new HiveContext(sparkContext) {
-        @transient override lazy val sessionState = {
-          val state = SessionState.get()
-          setConf(state.getConf.getAllProperties)
-          state
-        }
-        @transient override lazy val hiveconf = sessionState.getConf
-      }
+      hiveContext = new HiveContext(sparkContext)
+
+      if (log.isDebugEnabled) {
+        hiveContext.hiveconf.getAllProperties.toSeq.sorted.foreach { case (k, v) =>
+          logDebug(s"HiveConf var: $k=$v")
+        }
+      }
    }
  }
@@ -150,10 +150,12 @@ class HiveThriftServer2Suite extends FunSuite with Logging {
val dataFilePath =
Thread.currentThread().getContextClassLoader.getResource("data/files/small_kv.txt")

-    val queries = Seq(
-      "CREATE TABLE test(key INT, val STRING)",
-      s"LOAD DATA LOCAL INPATH '$dataFilePath' OVERWRITE INTO TABLE test",
-      "CACHE TABLE test")
+    val queries =
+      s"""SET spark.sql.shuffle.partitions=3;
+         |CREATE TABLE test(key INT, val STRING);
+         |LOAD DATA LOCAL INPATH '$dataFilePath' OVERWRITE INTO TABLE test;
+         |CACHE TABLE test;
+       """.stripMargin.split(";").map(_.trim).filter(_.nonEmpty)

Contributor Author (on the SET line):

This SET command is used as a regression test of SPARK-4037.

queries.foreach(statement.execute)
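
For context, the regression this new test exercises boils down to issuing a SET command over JDBC before any other statement. A minimal standalone sketch of that interaction, assuming a Thrift server is already listening on localhost:10000 (the suite itself starts and manages its own server; the connection details here are illustrative):

import java.sql.DriverManager

object SetCommandRegressionSketch extends App {
  // Load the HiveServer2 JDBC driver.
  Class.forName("org.apache.hive.jdbc.HiveDriver")

  val connection =
    DriverManager.getConnection("jdbc:hive2://localhost:10000/default", "user", "")
  val statement = connection.createStatement()

  // Before this fix, the SET was applied to the SessionState created in
  // HiveThriftServer2.main rather than the one backing HiveContext, so the
  // setting was silently dropped.
  statement.execute("SET spark.sql.shuffle.partitions=3")
  statement.execute("CREATE TABLE test(key INT, val STRING)")

  connection.close()
}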

44 changes: 30 additions & 14 deletions sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
@@ -224,21 +224,29 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
}

  /**
-   * SQLConf and HiveConf contracts: when the hive session is first initialized, params in
-   * HiveConf will get picked up by the SQLConf. Additionally, any properties set by
-   * set() or a SET command inside sql() will be set in the SQLConf *as well as*
-   * in the HiveConf.
+   * SQLConf and HiveConf contracts:
+   *
+   * 1. reuse existing started SessionState if any
+   * 2. when the Hive session is first initialized, params in HiveConf will get picked up by the
+   *    SQLConf. Additionally, any properties set by set() or a SET command inside sql() will be
+   *    set in the SQLConf *as well as* in the HiveConf.
   */
-  @transient lazy val hiveconf = new HiveConf(classOf[SessionState])
-  @transient protected[hive] lazy val sessionState = {
-    val ss = new SessionState(hiveconf)
-    setConf(hiveconf.getAllProperties) // Have SQLConf pick up the initial set of HiveConf.
-    SessionState.start(ss)
-    ss.err = new PrintStream(outputBuffer, true, "UTF-8")
-    ss.out = new PrintStream(outputBuffer, true, "UTF-8")
-
-    ss
-  }
+  @transient protected[hive] lazy val (hiveconf, sessionState) =
+    Option(SessionState.get())
+      .orElse {
+        val newState = new SessionState(new HiveConf(classOf[SessionState]))
+        // Only starts newly created `SessionState` instance. Any existing `SessionState` instance
+        // returned by `SessionState.get()` must be the most recently started one.
+        SessionState.start(newState)
+        Some(newState)
+      }
+      .map { state =>
+        setConf(state.getConf.getAllProperties)
+        if (state.out == null) state.out = new PrintStream(outputBuffer, true, "UTF-8")
+        if (state.err == null) state.err = new PrintStream(outputBuffer, true, "UTF-8")
+        (state.getConf, state)
+      }
+      .get
Contributor:
Should SessionState.start(state) be invoked here? Otherwise it relies on other code to initialize the state, we lose track of whether the state is initialized, and other code may call SessionState.start() multiple times, resulting in unexpected behavior. Correct me if I am wrong.

Contributor Author:
I didn't call SessionState.start(...) here because, before Hive 0.13.1 support was merged in, it was always called when necessary in HiveContext.runHive. I just noticed that this call was removed in the most recent master branch. I consider this change dangerous because the current SessionState may be switched somewhere else and cause subtle errors.

However, adding another SessionState.start(...) call here is harmless, and it does help to track when and whether the session state is properly initialized.

Contributor:
In this case, we may call SessionState.start with the same session state multiple times. From my point of view, we should check whether SessionState.get is null or not, and then decide whether we need to call SessionState.start.
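
A minimal sketch of the guarded initialization suggested above: start a fresh SessionState only when no instance is attached to the current thread (the helper name is illustrative, not from the PR):

import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hadoop.hive.ql.session.SessionState

def currentOrStartedSessionState(): SessionState =
  Option(SessionState.get()).getOrElse {
    val state = new SessionState(new HiveConf(classOf[SessionState]))
    SessionState.start(state) // started exactly once, by whoever created it
    state
  }

This is essentially what the PR's lazy val does via Option(SessionState.get()).orElse(...).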

Contributor:
We should consider initializing the session state in the constructor, maybe by forcing the realization of the lazy val. Matei just bumped into a problem where running "SHOW TABLES" as the first command throws a NullPointerException. Let's make sure that this PR fixes those sorts of problems too.

Contributor Author:
Yeah, all commands/DDLs that are directly delegated to Hive suffer from this issue, and this PR fixes it.
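
For readers unfamiliar with the trick proposed above, forcing a lazy val from the constructor moves its initialization to construction time. A self-contained toy illustration (not code from this PR):

class Service {
  lazy val state: String = {
    println("initializing state")
    "ready"
  }

  // Referencing the lazy val in the constructor body forces eager
  // initialization, so the first caller never sees an uninitialized state.
  state
}

object LazyValDemo extends App {
  new Service() // prints "initializing state" immediately
}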


override def setConf(key: String, value: String): Unit = {
super.setConf(key, value)
@@ -288,6 +296,14 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
val cmd_1: String = cmd_trimmed.substring(tokens(0).length()).trim()
val proc: CommandProcessor = HiveShim.getCommandProcessor(Array(tokens(0)), hiveconf)

+    // Makes sure the session represented by the `sessionState` field is activated. This implies
+    // Spark SQL Hive support uses a single `SessionState` for all Hive operations and breaks
+    // session isolation under multi-user scenarios (i.e. HiveThriftServer2).
+    // TODO Fix session isolation
+    if (SessionState.get() != sessionState) {
+      SessionState.start(sessionState)
+    }

Contributor (on the added if condition):

In which case will this condition be true?

Contributor Author:

For example, Spark SQL CLI uses a global CliSessionState instance, which inherits from SessionState. Also, this check can be useful for fixing the session isolation problem HiveThriftServer2 currently suffers from.
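
Background on why this check can fire at all: Hive tracks the active SessionState in a thread-local, which SessionState.get() reads and SessionState.start(...) writes. A hedged sketch of the re-activation pattern (the helper name is hypothetical):

import org.apache.hadoop.hive.ql.session.SessionState

// Pins `ours` as the current thread's active session before running a Hive
// operation, mirroring the check added in runHive above.
def withActiveSessionState[T](ours: SessionState)(body: => T): T = {
  if (SessionState.get() ne ours) {
    SessionState.start(ours)
  }
  body
}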

proc match {
case driver: Driver =>
val results = HiveShim.createDriverResultsArray