-
Notifications
You must be signed in to change notification settings - Fork 28.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-4037][SQL] Removes the SessionState instance created in HiveThriftServer2 #2887
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -224,21 +224,29 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) { | |
} | ||
|
||
/** | ||
* SQLConf and HiveConf contracts: when the hive session is first initialized, params in | ||
* HiveConf will get picked up by the SQLConf. Additionally, any properties set by | ||
* set() or a SET command inside sql() will be set in the SQLConf *as well as* | ||
* in the HiveConf. | ||
* SQLConf and HiveConf contracts: | ||
* | ||
* 1. reuse existing started SessionState if any | ||
* 2. when the Hive session is first initialized, params in HiveConf will get picked up by the | ||
* SQLConf. Additionally, any properties set by set() or a SET command inside sql() will be | ||
* set in the SQLConf *as well as* in the HiveConf. | ||
*/ | ||
@transient lazy val hiveconf = new HiveConf(classOf[SessionState]) | ||
@transient protected[hive] lazy val sessionState = { | ||
val ss = new SessionState(hiveconf) | ||
setConf(hiveconf.getAllProperties) // Have SQLConf pick up the initial set of HiveConf. | ||
SessionState.start(ss) | ||
ss.err = new PrintStream(outputBuffer, true, "UTF-8") | ||
ss.out = new PrintStream(outputBuffer, true, "UTF-8") | ||
|
||
ss | ||
} | ||
@transient protected[hive] lazy val (hiveconf, sessionState) = | ||
Option(SessionState.get()) | ||
.orElse { | ||
val newState = new SessionState(new HiveConf(classOf[SessionState])) | ||
// Only starts newly created `SessionState` instance. Any existing `SessionState` instance | ||
// returned by `SessionState.get()` must be the most recently started one. | ||
SessionState.start(newState) | ||
Some(newState) | ||
} | ||
.map { state => | ||
setConf(state.getConf.getAllProperties) | ||
if (state.out == null) state.out = new PrintStream(outputBuffer, true, "UTF-8") | ||
if (state.err == null) state.err = new PrintStream(outputBuffer, true, "UTF-8") | ||
(state.getConf, state) | ||
} | ||
.get | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should SessionState.start(state) be invoked here? Otherwise, it relies on other code to initialize the state, and we lose the track whether the state is initialized, and other code may call SessionState.start() multiple times, resulting in unexpected behavior. Correct me if I am wrong. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I didn't call However, add another There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In this case, we may call SessionState.start with the session state multiple times. In my point of view, we should check whether SessionState.get is null or not, and then decide whether we need to call SessionState.start. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We should consider initializing the session state in the constructor, maybe by forcing the realization of the lazy val. Matei just bumped into a problem where running "SHOW TABLES" as the first command null pointers. Lets make sure that this PR fixes those sorts of problems too. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yea, all commands/DDLs that are directly delegated to Hive suffer this issue, and this PR fixes it. |
||
|
||
override def setConf(key: String, value: String): Unit = { | ||
super.setConf(key, value) | ||
|
@@ -288,6 +296,14 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) { | |
val cmd_1: String = cmd_trimmed.substring(tokens(0).length()).trim() | ||
val proc: CommandProcessor = HiveShim.getCommandProcessor(Array(tokens(0)), hiveconf) | ||
|
||
// Makes sure the session represented by the `sessionState` field is activated. This implies | ||
// Spark SQL Hive support uses a single `SessionState` for all Hive operations and breaks | ||
// session isolation under multi-user scenarios (i.e. HiveThriftServer2). | ||
// TODO Fix session isolation | ||
if (SessionState.get() != sessionState) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In which case this condition will be true? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. For example, Spark SQL CLI uses a global |
||
SessionState.start(sessionState) | ||
} | ||
|
||
proc match { | ||
case driver: Driver => | ||
val results = HiveShim.createDriverResultsArray | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This SET command is used as a regression test of SPARK-4037.