Skip to content

Commit

Permalink
Correctly set the class loader in the conf of the state in client wra…
Browse files Browse the repository at this point in the history
…pper.
  • Loading branch information
yhuai committed Jun 18, 2015
1 parent b3378fe commit 695cd2d
Show file tree
Hide file tree
Showing 4 changed files with 36 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,8 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
logInfo(s"Initializing execution hive, version $hiveExecutionVersion")
new ClientWrapper(
version = IsolatedClientLoader.hiveVersion(hiveExecutionVersion),
config = newTemporaryConfiguration())
config = newTemporaryConfiguration(),
initClassLoader = Utils.getContextOrSparkClassLoader)
}
SessionState.setCurrentSessionState(executionHive.state)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,10 +54,12 @@ import org.apache.spark.sql.execution.QueryExecutionException
* @param version the version of hive used when picking function calls that are not compatible.
* @param config a collection of configuration options that will be added to the hive conf before
* opening the hive client.
* @param initClassLoader the classloader used when creating the `state` field of this ClientWrapper.
*/
private[hive] class ClientWrapper(
version: HiveVersion,
config: Map[String, String])
config: Map[String, String],
initClassLoader: ClassLoader)
extends ClientInterface
with Logging {

Expand Down Expand Up @@ -98,11 +100,18 @@ private[hive] class ClientWrapper(
// Create an internal session state for this ClientWrapper.
val state = {
val original = Thread.currentThread().getContextClassLoader
Thread.currentThread().setContextClassLoader(getClass.getClassLoader)
// Switch to the initClassLoader.
Thread.currentThread().setContextClassLoader(initClassLoader)
val ret = try {
val oldState = SessionState.get()
if (oldState == null) {
val initialConf = new HiveConf(classOf[SessionState])
// HiveConf is a Hadoop Configuration, which has a classLoader field whose
// initial value is the current thread's context class loader
// (i.e. initClassLoader here).
// We call initialConf.setClassLoader(initClassLoader) here to make
// this action explicit.
initialConf.setClassLoader(initClassLoader)
config.foreach { case (k, v) =>
logDebug(s"Hive Config: $k=$v")
initialConf.set(k, v)
Expand All @@ -125,20 +134,17 @@ private[hive] class ClientWrapper(
def conf: HiveConf = SessionState.get().getConf

// TODO: should be a def?
// When we create this val client, the HiveConf of it (conf) is the one associated with state.
private val client = Hive.get(conf)

/**
* Runs `f` with ThreadLocal session state and classloaders configured for this version of hive.
*/
private def withHiveState[A](f: => A): A = synchronized {
val original = Thread.currentThread().getContextClassLoader
// This setContextClassLoader is used for Hive 0.12's metastore since Hive 0.12 will not
// internally override the context class loader of the current thread with the class loader
// associated with the HiveConf in `state`.
Thread.currentThread().setContextClassLoader(getClass.getClassLoader)
// Set the thread local metastore client to the client associated with this ClientWrapper.
Hive.set(client)
// Starting from Hive 0.13.0, setCurrentSessionState will use the classLoader associated
// setCurrentSessionState will use the classLoader associated
// with the HiveConf in `state` to override the context class loader of the current
// thread.
shim.setCurrentSessionState(state)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,11 @@ import org.apache.hadoop.hive.ql.session.SessionState
*/
private[client] sealed abstract class Shim {

/**
* Set the current SessionState to the given SessionState. Also, set the context classloader of
* the current thread to the one set in the HiveConf of this given `state`.
* @param state the SessionState to set as the current SessionState.
*/
def setCurrentSessionState(state: SessionState): Unit

/**
Expand Down Expand Up @@ -159,7 +164,15 @@ private[client] class Shim_v0_12 extends Shim {
JBoolean.TYPE,
JBoolean.TYPE)

override def setCurrentSessionState(state: SessionState): Unit = startMethod.invoke(null, state)
override def setCurrentSessionState(state: SessionState): Unit = {
// Starting from Hive 0.13, setCurrentSessionState internally overrides the
// context class loader of the current thread with the class loader set in
// the conf of the SessionState. Hive 0.12 does not, so this shim sets the
// context class loader explicitly before invoking startMethod. That way
// shim.setCurrentSessionState behaves consistently across all Hive versions.
Thread.currentThread().setContextClassLoader(state.getConf.getClassLoader)
startMethod.invoke(null, state)
}

override def getDataLocation(table: Table): Option[String] =
Option(getDataLocationMethod.invoke(table)).map(_.toString())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,9 +95,8 @@ private[hive] object IsolatedClientLoader {
* @param config A set of options that will be added to the HiveConf of the constructed client.
* @param isolationOn When true, custom versions of barrier classes will be constructed. Must be
* true unless loading the version of hive that is on Spark's classloader.
* @param rootClassLoader The system root classloader.
* @param baseClassLoader The spark classloader that is used to load shared classes. Must not know
* about Hive classes.
* @param rootClassLoader The system root classloader. Must not know about Hive classes.
* @param baseClassLoader The spark classloader that is used to load shared classes.
*/
private[hive] class IsolatedClientLoader(
val version: HiveVersion,
Expand All @@ -110,8 +109,8 @@ private[hive] class IsolatedClientLoader(
val barrierPrefixes: Seq[String] = Seq.empty)
extends Logging {

// Check to make sure that the base classloader does not know about Hive.
assert(Try(baseClassLoader.loadClass("org.apache.hive.HiveConf")).isFailure)
// Check to make sure that the root classloader does not know about Hive.
assert(Try(rootClassLoader.loadClass("org.apache.hadoop.hive.conf.HiveConf")).isFailure)

/** All jars used by the hive specific classloader. */
protected def allJars = execJars.toArray
Expand Down Expand Up @@ -145,13 +144,15 @@ private[hive] class IsolatedClientLoader(
def doLoadClass(name: String, resolve: Boolean): Class[_] = {
val classFileName = name.replaceAll("\\.", "/") + ".class"
if (isBarrierClass(name) && isolationOn) {
// For barrier classes, we construct a new copy of the class.
val bytes = IOUtils.toByteArray(baseClassLoader.getResourceAsStream(classFileName))
logDebug(s"custom defining: $name - ${util.Arrays.hashCode(bytes)}")
defineClass(name, bytes, 0, bytes.length)
} else if (!isSharedClass(name)) {
logDebug(s"hive class: $name - ${getResource(classToPath(name))}")
super.loadClass(name, resolve)
} else {
// For shared classes, we delegate to baseClassLoader.
logDebug(s"shared class: $name")
baseClassLoader.loadClass(name)
}
Expand All @@ -167,7 +168,7 @@ private[hive] class IsolatedClientLoader(
classLoader
.loadClass(classOf[ClientWrapper].getName)
.getConstructors.head
.newInstance(version, config)
.newInstance(version, config, classLoader)
.asInstanceOf[ClientInterface]
} catch {
case e: InvocationTargetException =>
Expand Down

0 comments on commit 695cd2d

Please sign in to comment.