[SPARK-11624][SPARK-11972][SQL] fix commands that need hive to exec #9589

Status: Closed. Wants to merge 2 commits.
SparkSQLCLIDriver.scala
@@ -288,8 +288,11 @@ private[hive] class SparkSQLCLIDriver extends CliDriver with Logging {
       val tokens: Array[String] = cmd_trimmed.split("\\s+")
       val cmd_1: String = cmd_trimmed.substring(tokens(0).length()).trim()
       if (cmd_lower.equals("quit") ||
-        cmd_lower.equals("exit") ||
-        tokens(0).toLowerCase(Locale.ENGLISH).equals("source") ||
+        cmd_lower.equals("exit")) {
+        sessionState.close()
+        System.exit(0)
+      }
+      if (tokens(0).toLowerCase(Locale.ENGLISH).equals("source") ||

Contributor:

Do we need to change this?
Contributor:

Oh, Hive does the same thing.

         cmd_trimmed.startsWith("!") ||
         tokens(0).toLowerCase.equals("list") ||
         isRemoteMode) {
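Read together, the two fragments above split one big pass-through condition into two: "quit"/"exit" now close the CLI session state before exiting (which, per the exchange above, is what Hive's own CliDriver does), while "source", "!<shell command>", "list", and remote mode are still handed to Hive. The following is only a self-contained sketch of that routing, not the file's actual contents; the three function parameters are hypothetical stand-ins for sessionState.close()/System.exit(0), super.processCmd(cmd), and the Spark SQL path.

import java.util.Locale

// Sketch only: the real logic lives inside SparkSQLCLIDriver.processCmd.
object CliDispatchSketch {
  def route(
      cmd: String,
      isRemoteMode: Boolean,
      closeAndExit: () => Unit,          // stands in for sessionState.close(); System.exit(0)
      delegateToHive: String => Int,     // stands in for super.processCmd(cmd)
      runWithSparkSql: String => Int): Int = {
    val cmd_trimmed = cmd.trim()
    val cmd_lower = cmd_trimmed.toLowerCase(Locale.ENGLISH)
    val tokens = cmd_trimmed.split("\\s+")

    // New in this patch: quit/exit close the CLI session first, then exit.
    if (cmd_lower.equals("quit") || cmd_lower.equals("exit")) {
      closeAndExit()
    }
    // source scripts, "!<shell command>", "list", or remote mode: let Hive's CliDriver handle them.
    if (tokens(0).toLowerCase(Locale.ENGLISH).equals("source") ||
        cmd_trimmed.startsWith("!") ||
        tokens(0).toLowerCase.equals("list") ||
        isRemoteMode) {
      delegateToHive(cmd)
    } else {
      runWithSparkSql(cmd)
    }
  }
}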
CliSuite.scala
@@ -234,4 +234,9 @@ class CliSuite extends SparkFunSuite with BeforeAndAfterAll with Logging {
-> "Error in query: Table not found: nonexistent_table;"
)
}

test("SPARK-11624 Spark SQL CLI should set sessionState only once") {
runCliWithin(2.minute, Seq("-e", "!echo \"This is a test for Spark-11624\";"))(
"" -> "This is a test for Spark-11624")
}
}
sql/hive/pom.xml (6 additions, 0 deletions)
@@ -72,6 +72,12 @@
       <artifactId>protobuf-java</artifactId>
       <version>${protobuf.version}</version>
     </dependency>
+    -->
+    <dependency>
+      <groupId>${hive.group}</groupId>
+      <artifactId>hive-cli</artifactId>
+    </dependency>
+    <!--
     <dependency>
       <groupId>${hive.group}</groupId>
       <artifactId>hive-common</artifactId>
HiveClientImpl.scala
@@ -24,14 +24,14 @@ import scala.collection.JavaConverters._
 import scala.language.reflectiveCalls
 
 import org.apache.hadoop.fs.Path
+import org.apache.hadoop.hive.cli.CliSessionState
 import org.apache.hadoop.hive.conf.HiveConf
 import org.apache.hadoop.hive.metastore.{TableType => HTableType}
 import org.apache.hadoop.hive.metastore.api.{Database, FieldSchema}
 import org.apache.hadoop.hive.ql.{metadata, Driver}
 import org.apache.hadoop.hive.ql.metadata.Hive
 import org.apache.hadoop.hive.ql.processors._
 import org.apache.hadoop.hive.ql.session.SessionState
-import org.apache.hadoop.hive.shims.{HadoopShims, ShimLoader}
 import org.apache.hadoop.security.UserGroupInformation
 
 import org.apache.spark.{Logging, SparkConf, SparkException}
@@ -104,29 +104,39 @@ private[hive] class HiveClientImpl(
     }
 
     val ret = try {
-      val initialConf = new HiveConf(classOf[SessionState])
-      // HiveConf is a Hadoop Configuration, which has a field of classLoader and
-      // the initial value will be the current thread's context class loader
-      // (i.e. initClassLoader at here).
-      // We call initialConf.setClassLoader(initClassLoader) at here to make
-      // this action explicit.
-      initialConf.setClassLoader(initClassLoader)
-      config.foreach { case (k, v) =>
-        if (k.toLowerCase.contains("password")) {
-          logDebug(s"Hive Config: $k=xxx")
-        } else {
-          logDebug(s"Hive Config: $k=$v")
+      // originState will be created if not exists, will never be null
+      val originalState = SessionState.get()
+      if (originalState.isInstanceOf[CliSessionState]) {

Contributor:

What happens if you don't special case this? Why is this dependent on the type of the session state and not just on the fact that a session has already been started?
Contributor (Author):

The SessionState.get() method would create an instance of SessionState if one does not already exist.

+        // In `SparkSQLCLIDriver`, we have already started a `CliSessionState`,
+        // which contains information like configurations from command line. Later
+        // we call `SparkSQLEnv.init()` there, which would run into this part again.
+        // so we should keep `conf` and reuse the existing instance of `CliSessionState`.
+        originalState
+      } else {
+        val initialConf = new HiveConf(classOf[SessionState])
+        // HiveConf is a Hadoop Configuration, which has a field of classLoader and
+        // the initial value will be the current thread's context class loader
+        // (i.e. initClassLoader at here).
+        // We call initialConf.setClassLoader(initClassLoader) at here to make
+        // this action explicit.
+        initialConf.setClassLoader(initClassLoader)
+        config.foreach { case (k, v) =>
+          if (k.toLowerCase.contains("password")) {
+            logDebug(s"Hive Config: $k=xxx")
+          } else {
+            logDebug(s"Hive Config: $k=$v")
+          }
+          initialConf.set(k, v)
+        }
-        initialConf.set(k, v)
-      }
-      val state = new SessionState(initialConf)
-      if (clientLoader.cachedHive != null) {
-        Hive.set(clientLoader.cachedHive.asInstanceOf[Hive])
+        val state = new SessionState(initialConf)
+        if (clientLoader.cachedHive != null) {
+          Hive.set(clientLoader.cachedHive.asInstanceOf[Hive])
+        }
+        SessionState.start(state)
+        state.out = new PrintStream(outputBuffer, true, "UTF-8")
+        state.err = new PrintStream(outputBuffer, true, "UTF-8")
+        state
+      }
-      SessionState.start(state)
-      state.out = new PrintStream(outputBuffer, true, "UTF-8")
-      state.err = new PrintStream(outputBuffer, true, "UTF-8")
-      state
     } finally {
       Thread.currentThread().setContextClassLoader(original)
     }
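The heart of the change is the branch added above: when HiveClientImpl (reached via SparkSQLEnv.init() from the CLI) finds that the current thread already carries the CliSessionState started by SparkSQLCLIDriver, it reuses that state instead of building a second SessionState and losing the command-line configuration. Below is a minimal sketch of that decision, using only Hive classes already imported in the diff; the helper name chooseState is made up for illustration, and this is not the literal HiveClientImpl code.

import org.apache.hadoop.hive.cli.CliSessionState
import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hadoop.hive.ql.session.SessionState

object SessionStateReuseSketch {
  // Reuse an existing CliSessionState; otherwise fall back to the old behaviour of
  // creating and starting a fresh SessionState from the given configuration.
  def chooseState(initialConf: HiveConf): SessionState =
    SessionState.get() match {
      case cli: CliSessionState =>
        // Started by SparkSQLCLIDriver; keep it so --hiveconf and similar CLI settings survive.
        cli
      case _ =>
        // No CLI session on this thread (a null result also lands here): previous behaviour.
        val state = new SessionState(initialConf)
        SessionState.start(state)
        state
    }
}

Reusing CliSessionState is also what drives the pom.xml change above: CliSessionState lives in the hive-cli artifact, so sql/hive now needs hive-cli as a real (uncommented) dependency.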