From 695cd2d1e5a487c9ccf2789b32c01228fe97e059 Mon Sep 17 00:00:00 2001 From: Yin Huai Date: Thu, 18 Jun 2015 16:48:11 -0700 Subject: [PATCH] Correctly set the class loader in the conf of the state in client wrapper. --- .../apache/spark/sql/hive/HiveContext.scala | 3 ++- .../spark/sql/hive/client/ClientWrapper.scala | 20 ++++++++++++------- .../spark/sql/hive/client/HiveShim.scala | 15 +++++++++++++- .../hive/client/IsolatedClientLoader.scala | 13 ++++++------ 4 files changed, 36 insertions(+), 15 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala index c50835dd8f11d..40d1cad8b053e 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala @@ -164,7 +164,8 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) { logInfo(s"Initializing execution hive, version $hiveExecutionVersion") new ClientWrapper( version = IsolatedClientLoader.hiveVersion(hiveExecutionVersion), - config = newTemporaryConfiguration()) + config = newTemporaryConfiguration(), + initClassLoader = Utils.getContextOrSparkClassLoader) } SessionState.setCurrentSessionState(executionHive.state) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala index 982ed63874a5f..1cc1ff93bc8e2 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala @@ -54,10 +54,12 @@ import org.apache.spark.sql.execution.QueryExecutionException * @param version the version of hive used when pick function calls that are not compatible. * @param config a collection of configuration options that will be added to the hive conf before * opening the hive client. + * @param initClassLoader the classloader used when creating the `state` field of this ClientWrapper. */ private[hive] class ClientWrapper( version: HiveVersion, - config: Map[String, String]) + config: Map[String, String], + initClassLoader: ClassLoader) extends ClientInterface with Logging { @@ -98,11 +100,18 @@ private[hive] class ClientWrapper( // Create an internal session state for this ClientWrapper. val state = { val original = Thread.currentThread().getContextClassLoader - Thread.currentThread().setContextClassLoader(getClass.getClassLoader) + // Switch to the initClassLoader. + Thread.currentThread().setContextClassLoader(initClassLoader) val ret = try { val oldState = SessionState.get() if (oldState == null) { val initialConf = new HiveConf(classOf[SessionState]) + // HiveConf is a Hadoop Configuration, which has a field of classLoader and + // the initial value will be the current thread's context class loader + // (i.e. initClassLoader at here). + // We call initialConf.setClassLoader(initClassLoader) at here to make + // this action explicit. + initialConf.setClassLoader(initClassLoader) config.foreach { case (k, v) => logDebug(s"Hive Config: $k=$v") initialConf.set(k, v) @@ -125,6 +134,7 @@ private[hive] class ClientWrapper( def conf: HiveConf = SessionState.get().getConf // TODO: should be a def?s + // When we create this val client, the HiveConf of it (conf) is the one associated with state. private val client = Hive.get(conf) /** @@ -132,13 +142,9 @@ private[hive] class ClientWrapper( */ private def withHiveState[A](f: => A): A = synchronized { val original = Thread.currentThread().getContextClassLoader - // This setContextClassLoader is used for Hive 0.12's metastore since Hive 0.12 will not - // internally override the context class loader of the current thread with the class loader - // associated with the HiveConf in `state`. - Thread.currentThread().setContextClassLoader(getClass.getClassLoader) // Set the thread local metastore client to the client associated with this ClientWrapper. Hive.set(client) - // Starting from Hive 0.13.0, setCurrentSessionState will use the classLoader associated + // setCurrentSessionState will use the classLoader associated // with the HiveConf in `state` to override the context class loader of the current // thread. shim.setCurrentSessionState(state) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala index 40c167926c8d6..6ab208fb98a0f 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala @@ -43,6 +43,11 @@ import org.apache.hadoop.hive.ql.session.SessionState */ private[client] sealed abstract class Shim { + /** + * Set the current SessionState to the given SessionState. Also, set the context classloader of + * the current thread to the one set in the HiveConf of this given `state`. + * @param state + */ def setCurrentSessionState(state: SessionState): Unit /** @@ -159,7 +164,15 @@ private[client] class Shim_v0_12 extends Shim { JBoolean.TYPE, JBoolean.TYPE) - override def setCurrentSessionState(state: SessionState): Unit = startMethod.invoke(null, state) + override def setCurrentSessionState(state: SessionState): Unit = { + // Starting from Hive 0.13, setCurrentSessionState will internally override + // the context class loader of the current thread by the class loader set in + // the conf of the SessionState. So, for this Hive 0.12 shim, we add the same + // behavior. So, shim.setCurrentSessionState of all Hive versions have the + // consistent behavior. + Thread.currentThread().setContextClassLoader(state.getConf.getClassLoader) + startMethod.invoke(null, state) + } override def getDataLocation(table: Table): Option[String] = Option(getDataLocationMethod.invoke(table)).map(_.toString()) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala index 69cfc5c3c3216..0934ad5034671 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala @@ -95,9 +95,8 @@ private[hive] object IsolatedClientLoader { * @param config A set of options that will be added to the HiveConf of the constructed client. * @param isolationOn When true, custom versions of barrier classes will be constructed. Must be * true unless loading the version of hive that is on Sparks classloader. - * @param rootClassLoader The system root classloader. - * @param baseClassLoader The spark classloader that is used to load shared classes. Must not know - * about Hive classes. + * @param rootClassLoader The system root classloader. Must not know about Hive classes. + * @param baseClassLoader The spark classloader that is used to load shared classes. */ private[hive] class IsolatedClientLoader( val version: HiveVersion, @@ -110,8 +109,8 @@ private[hive] class IsolatedClientLoader( val barrierPrefixes: Seq[String] = Seq.empty) extends Logging { - // Check to make sure that the base classloader does not know about Hive. - assert(Try(baseClassLoader.loadClass("org.apache.hive.HiveConf")).isFailure) + // Check to make sure that the root classloader does not know about Hive. + assert(Try(rootClassLoader.loadClass("org.apache.hadoop.hive.conf.HiveConf")).isFailure) /** All jars used by the hive specific classloader. */ protected def allJars = execJars.toArray @@ -145,6 +144,7 @@ private[hive] class IsolatedClientLoader( def doLoadClass(name: String, resolve: Boolean): Class[_] = { val classFileName = name.replaceAll("\\.", "/") + ".class" if (isBarrierClass(name) && isolationOn) { + // For barrier classes, we construct a new copy of the class. val bytes = IOUtils.toByteArray(baseClassLoader.getResourceAsStream(classFileName)) logDebug(s"custom defining: $name - ${util.Arrays.hashCode(bytes)}") defineClass(name, bytes, 0, bytes.length) @@ -152,6 +152,7 @@ private[hive] class IsolatedClientLoader( logDebug(s"hive class: $name - ${getResource(classToPath(name))}") super.loadClass(name, resolve) } else { + // For shared classes, we delegate to baseClassLoader. logDebug(s"shared class: $name") baseClassLoader.loadClass(name) } @@ -167,7 +168,7 @@ private[hive] class IsolatedClientLoader( classLoader .loadClass(classOf[ClientWrapper].getName) .getConstructors.head - .newInstance(version, config) + .newInstance(version, config, classLoader) .asInstanceOf[ClientInterface] } catch { case e: InvocationTargetException =>