Skip to content

Commit

Permalink
[SPARK-4037][SQL] Removes the SessionState instance created in HiveTh…
Browse files Browse the repository at this point in the history
…riftServer2

`HiveThriftServer2` creates a global singleton `SessionState` instance and overrides `HiveContext` to inject the `SessionState` object. This messes up `SessionState` initialization and causes problems.

This PR replaces the global `SessionState` with `HiveContext.sessionState` to avoid the initialization conflict. Also `HiveContext` reuses existing started `SessionState` if any (this is required by `SparkSQLCLIDriver`, which uses specialized `CliSessionState`).

Author: Cheng Lian <[email protected]>

Closes apache#2887 from liancheng/spark-4037 and squashes the following commits:

8446675 [Cheng Lian] Removes redundant Driver initialization
a28fef5 [Cheng Lian] Avoid starting HiveContext.sessionState multiple times
49b1c5b [Cheng Lian] Reuses existing started SessionState if any
3cd6fab [Cheng Lian] Fixes SPARK-4037

Conflicts:
	sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLEnv.scala
	sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
  • Loading branch information
liancheng committed Nov 6, 2014
1 parent c58c1bb commit faeca62
Show file tree
Hide file tree
Showing 4 changed files with 45 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,8 @@

package org.apache.spark.sql.hive.thriftserver

import scala.collection.JavaConversions._

import org.apache.commons.logging.LogFactory
import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hadoop.hive.ql.session.SessionState
import org.apache.hive.service.cli.thrift.ThriftBinaryCLIService
import org.apache.hive.service.server.{HiveServer2, ServerOptionsProcessor}

Expand All @@ -38,24 +35,12 @@ private[hive] object HiveThriftServer2 extends Logging {

def main(args: Array[String]) {
val optionsProcessor = new ServerOptionsProcessor("HiveThriftServer2")

if (!optionsProcessor.process(args)) {
System.exit(-1)
}

val ss = new SessionState(new HiveConf(classOf[SessionState]))

// Set all properties specified via command line.
val hiveConf: HiveConf = ss.getConf
hiveConf.getAllProperties.toSeq.sortBy(_._1).foreach { case (k, v) =>
logDebug(s"HiveConf var: $k=$v")
}

SessionState.start(ss)

logInfo("Starting SparkContext")
SparkSQLEnv.init()
SessionState.start(ss)

Runtime.getRuntime.addShutdownHook(
new Thread() {
Expand All @@ -67,7 +52,7 @@ private[hive] object HiveThriftServer2 extends Logging {

try {
val server = new HiveThriftServer2(SparkSQLEnv.hiveContext)
server.init(hiveConf)
server.init(SparkSQLEnv.hiveContext.hiveconf)
server.start()
logInfo("HiveThriftServer2 started")
} catch {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,10 @@

package org.apache.spark.sql.hive.thriftserver

import org.apache.hadoop.hive.ql.session.SessionState

import org.apache.spark.scheduler.{SplitInfo, StatsReportListener}
import org.apache.spark.Logging
import org.apache.spark.scheduler.StatsReportListener
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.{Logging, SparkConf, SparkContext}
import scala.collection.JavaConversions._

/** A singleton object for the master program. The slaves should not access this. */
private[hive] object SparkSQLEnv extends Logging {
Expand All @@ -37,10 +35,12 @@ private[hive] object SparkSQLEnv extends Logging {
.setAppName(s"SparkSQL::${java.net.InetAddress.getLocalHost.getHostName}"))

sparkContext.addSparkListener(new StatsReportListener())
hiveContext = new HiveContext(sparkContext)

hiveContext = new HiveContext(sparkContext) {
@transient override lazy val sessionState = SessionState.get()
@transient override lazy val hiveconf = sessionState.getConf
if (log.isDebugEnabled) {
hiveContext.hiveconf.getAllProperties.toSeq.sorted.foreach { case (k, v) =>
logDebug(s"HiveConf var: $k=$v")
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,10 +134,12 @@ class HiveThriftServer2Suite extends FunSuite with Logging {
val dataFilePath =
Thread.currentThread().getContextClassLoader.getResource("data/files/small_kv.txt")

val queries = Seq(
"CREATE TABLE test(key INT, val STRING)",
s"LOAD DATA LOCAL INPATH '$dataFilePath' OVERWRITE INTO TABLE test",
"CACHE TABLE test")
val queries =
s"""SET spark.sql.shuffle.partitions=3;
|CREATE TABLE test(key INT, val STRING);
|LOAD DATA LOCAL INPATH '$dataFilePath' OVERWRITE INTO TABLE test;
|CACHE TABLE test;
""".stripMargin.split(";").map(_.trim).filter(_.nonEmpty)

queries.foreach(statement.execute)

Expand Down
40 changes: 30 additions & 10 deletions sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
Original file line number Diff line number Diff line change
Expand Up @@ -222,17 +222,29 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
}

/**
* SQLConf and HiveConf contracts: when the hive session is first initialized, params in
* HiveConf will get picked up by the SQLConf. Additionally, any properties set by
* set() or a SET command inside sql() will be set in the SQLConf *as well as*
* in the HiveConf.
* SQLConf and HiveConf contracts:
*
* 1. reuse existing started SessionState if any
* 2. when the Hive session is first initialized, params in HiveConf will get picked up by the
* SQLConf. Additionally, any properties set by set() or a SET command inside sql() will be
* set in the SQLConf *as well as* in the HiveConf.
*/
@transient protected[hive] lazy val hiveconf = new HiveConf(classOf[SessionState])
@transient protected[hive] lazy val sessionState = {
val ss = new SessionState(hiveconf)
setConf(hiveconf.getAllProperties) // Have SQLConf pick up the initial set of HiveConf.
ss
}
@transient protected[hive] lazy val (hiveconf, sessionState) =
Option(SessionState.get())
.orElse {
val newState = new SessionState(new HiveConf(classOf[SessionState]))
// Only starts newly created `SessionState` instance. Any existing `SessionState` instance
// returned by `SessionState.get()` must be the most recently started one.
SessionState.start(newState)
Some(newState)
}
.map { state =>
setConf(state.getConf.getAllProperties)
if (state.out == null) state.out = new PrintStream(outputBuffer, true, "UTF-8")
if (state.err == null) state.err = new PrintStream(outputBuffer, true, "UTF-8")
(state.getConf, state)
}
.get

sessionState.err = new PrintStream(outputBuffer, true, "UTF-8")
sessionState.out = new PrintStream(outputBuffer, true, "UTF-8")
Expand Down Expand Up @@ -290,6 +302,14 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {

SessionState.start(sessionState)

// Makes sure the session represented by the `sessionState` field is activated. This implies
// Spark SQL Hive support uses a single `SessionState` for all Hive operations and breaks
// session isolation under multi-user scenarios (i.e. HiveThriftServer2).
// TODO Fix session isolation
if (SessionState.get() != sessionState) {
SessionState.start(sessionState)
}

proc match {
case driver: Driver =>
driver.init()
Expand Down

0 comments on commit faeca62

Please sign in to comment.