[SPARK-20126][SQL] Remove HiveSessionState
## What changes were proposed in this pull request?
Commit ea36116 moved most of the logic from the SessionState classes into an accompanying builder. This makes `HiveSessionState` redundant, so this PR removes it.

## How was this patch tested?
Existing tests.
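
For orientation, a minimal sketch of how a Hive-aware `SessionState` is obtained after this change: through `HiveSessionStateBuilder` rather than a dedicated `HiveSessionState` subclass. This is illustrative only; `SessionState` and the builders are internal/unstable APIs, so the snippet assumes it is compiled somewhere those symbols are visible (e.g. Spark's own tests), and the local Hive-enabled session is hypothetical.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.hive.HiveSessionStateBuilder

// Hypothetical Hive-enabled session used only for illustration.
val spark = SparkSession.builder()
  .master("local[*]")
  .enableHiveSupport()
  .getOrCreate()

// There is no HiveSessionState anymore: the builder assembles a plain SessionState
// whose parts (catalog, analyzer, planner, resourceLoader, ...) are Hive-aware.
val hiveAwareState = new HiveSessionStateBuilder(spark).build()

// The Hive-specific ADD JAR behavior now lives in the resource loader.
println(hiveAwareState.resourceLoader.getClass.getSimpleName) // HiveSessionResourceLoader
```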

Author: Herman van Hovell <[email protected]>

Closes #17457 from hvanhovell/SPARK-20126.
hvanhovell authored and cloud-fan committed Mar 28, 2017
1 parent 4fcc214 commit f82461f
Showing 14 changed files with 104 additions and 193 deletions.

@@ -37,7 +37,7 @@ case class AddJarCommand(path: String) extends RunnableCommand {
}

override def run(sparkSession: SparkSession): Seq[Row] = {
sparkSession.sessionState.addJar(path)
sparkSession.sessionState.resourceLoader.addJar(path)
Seq(Row(0))
}
}

@@ -63,6 +63,7 @@ private[sql] class SessionState(
val optimizer: Optimizer,
val planner: SparkPlanner,
val streamingQueryManager: StreamingQueryManager,
val resourceLoader: SessionResourceLoader,
createQueryExecution: LogicalPlan => QueryExecution,
createClone: (SparkSession, SessionState) => SessionState) {

@@ -106,27 +107,6 @@ private[sql] class SessionState(
def refreshTable(tableName: String): Unit = {
catalog.refreshTable(sqlParser.parseTableIdentifier(tableName))
}

/**
* Add a jar path to [[SparkContext]] and the classloader.
*
* Note: this method seems not access any session state, but the subclass `HiveSessionState` needs
* to add the jar to its hive client for the current session. Hence, it still needs to be in
* [[SessionState]].
*/
def addJar(path: String): Unit = {
sparkContext.addJar(path)
val uri = new Path(path).toUri
val jarURL = if (uri.getScheme == null) {
// `path` is a local file path without a URL scheme
new File(path).toURI.toURL
} else {
// `path` is a URL with a scheme
uri.toURL
}
sharedState.jarClassLoader.addURL(jarURL)
Thread.currentThread().setContextClassLoader(sharedState.jarClassLoader)
}
}

private[sql] object SessionState {
@@ -160,15 +140,36 @@ class SessionStateBuilder(
* Session shared [[FunctionResourceLoader]].
*/
@InterfaceStability.Unstable
class SessionFunctionResourceLoader(session: SparkSession) extends FunctionResourceLoader {
class SessionResourceLoader(session: SparkSession) extends FunctionResourceLoader {
override def loadResource(resource: FunctionResource): Unit = {
resource.resourceType match {
case JarResource => session.sessionState.addJar(resource.uri)
case JarResource => addJar(resource.uri)
case FileResource => session.sparkContext.addFile(resource.uri)
case ArchiveResource =>
throw new AnalysisException(
"Archive is not allowed to be loaded. If YARN mode is used, " +
"please use --archives options while calling spark-submit.")
}
}

/**
* Add a jar path to [[SparkContext]] and the classloader.
*
* Note: this method does not itself access any session state, but the Hive-aware subclass
* `HiveSessionResourceLoader` also needs to add the jar to its Hive client for the current
* session. Hence, it lives in [[SessionResourceLoader]] so the subclass can override it.
*/
def addJar(path: String): Unit = {
session.sparkContext.addJar(path)
val uri = new Path(path).toUri
val jarURL = if (uri.getScheme == null) {
// `path` is a local file path without a URL scheme
new File(path).toURI.toURL
} else {
// `path` is a URL with a scheme
uri.toURL
}
session.sharedState.jarClassLoader.addURL(jarURL)
Thread.currentThread().setContextClassLoader(session.sharedState.jarClassLoader)
}
}
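
A hedged sketch of how this consolidated loader is exercised: SQL `ADD JAR` ends up in `resourceLoader.addJar`, while resources declared with `CREATE FUNCTION ... USING ...` arrive through `loadResource`, which routes jars back to `addJar`. The session and jar path below are hypothetical, and the snippet assumes it runs where these internal APIs (`sessionState`, `SessionResourceLoader`) are visible, e.g. in Spark's own tests.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.catalog.{FunctionResource, JarResource}

val spark = SparkSession.builder().master("local[*]").getOrCreate()
val loader = spark.sessionState.resourceLoader

// Equivalent of `ADD JAR '/tmp/my-udfs.jar'`: registers the jar with the SparkContext
// and the shared jar classloader (plus the Hive client in a Hive-aware session).
loader.addJar("/tmp/my-udfs.jar") // hypothetical path

// Function resources from `CREATE FUNCTION ... USING JAR ...` take the loadResource
// entry point, which dispatches JarResource back to addJar above.
loader.loadResource(FunctionResource(JarResource, "/tmp/my-udfs.jar"))
```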

@@ -109,6 +109,11 @@ abstract class BaseSessionStateBuilder(
*/
protected lazy val sqlParser: ParserInterface = new SparkSqlParser(conf)

/**
* ResourceLoader that is used to load function resources and jars.
*/
protected lazy val resourceLoader: SessionResourceLoader = new SessionResourceLoader(session)

/**
* Catalog for managing table and database states. If there is a pre-existing catalog, the state
* of that catalog (temp tables & current database) will be copied into the new catalog.
@@ -123,7 +128,7 @@
conf,
SessionState.newHadoopConf(session.sparkContext.hadoopConfiguration, conf),
sqlParser,
new SessionFunctionResourceLoader(session))
resourceLoader)
parentState.foreach(_.catalog.copyStateTo(catalog))
catalog
}
@@ -251,6 +256,7 @@ abstract class BaseSessionStateBuilder(
optimizer,
planner,
streamingQueryManager,
resourceLoader,
createQueryExecution,
createClone)
}
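
Because `resourceLoader` is a protected lazy val that feeds both the session catalog and the final `SessionState`, a derived builder can swap in its own loader, which is exactly what the Hive builder does later in this diff. A hedged sketch of that extension point; the logging builder below is hypothetical and assumes the builder API is visible to the calling code:

```scala
import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.internal.{BaseSessionStateBuilder, SessionResourceLoader, SessionState}

// Hypothetical builder that only customizes how jars are registered.
class LoggingSessionStateBuilder(
    session: SparkSession,
    parentState: Option[SessionState] = None)
  extends BaseSessionStateBuilder(session, parentState) with Logging {

  override protected lazy val resourceLoader: SessionResourceLoader =
    new SessionResourceLoader(session) {
      override def addJar(path: String): Unit = {
        logInfo(s"Adding jar $path to this session")
        super.addJar(path)
      }
    }

  override protected def newBuilder: NewBuilder = new LoggingSessionStateBuilder(_, _)
}
```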

@@ -22,7 +22,7 @@ import java.io.PrintStream
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.internal.Logging
import org.apache.spark.sql.{SparkSession, SQLContext}
import org.apache.spark.sql.hive.{HiveSessionState, HiveUtils}
import org.apache.spark.sql.hive.{HiveExternalCatalog, HiveUtils}
import org.apache.spark.util.Utils

/** A singleton object for the master program. The slaves should not access this. */
@@ -49,10 +49,12 @@ private[hive] object SparkSQLEnv extends Logging {
sparkContext = sparkSession.sparkContext
sqlContext = sparkSession.sqlContext

val sessionState = sparkSession.sessionState.asInstanceOf[HiveSessionState]
sessionState.metadataHive.setOut(new PrintStream(System.out, true, "UTF-8"))
sessionState.metadataHive.setInfo(new PrintStream(System.err, true, "UTF-8"))
sessionState.metadataHive.setError(new PrintStream(System.err, true, "UTF-8"))
val metadataHive = sparkSession
.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog]
.client.newSession()
metadataHive.setOut(new PrintStream(System.out, true, "UTF-8"))
metadataHive.setInfo(new PrintStream(System.err, true, "UTF-8"))
metadataHive.setError(new PrintStream(System.err, true, "UTF-8"))
sparkSession.conf.set("spark.sql.hive.version", HiveUtils.hiveExecutionVersion)
}
}
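
The replacement pattern here, reused by the Hive session state builder later in this diff, is to reach the Hive client through the shared `HiveExternalCatalog` and open a dedicated client session, instead of going through the removed `HiveSessionState.metadataHive`. A hedged restatement of that lookup, assuming it runs inside the `org.apache.spark.sql.hive` package where `client` and `sharedState` are accessible:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.hive.HiveExternalCatalog
import org.apache.spark.sql.hive.client.HiveClient

val spark = SparkSession.builder().master("local[*]").enableHiveSupport().getOrCreate()

// One external catalog is shared per SparkContext; newSession() gives this
// SparkSession its own Hive client state (current database, added jars, ...).
val metadataHive: HiveClient = spark.sharedState.externalCatalog
  .asInstanceOf[HiveExternalCatalog]
  .client
  .newSession()
```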

@@ -26,7 +26,7 @@ import org.apache.hive.service.cli.session.HiveSession

import org.apache.spark.internal.Logging
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.hive.HiveSessionState
import org.apache.spark.sql.hive.HiveUtils
import org.apache.spark.sql.hive.thriftserver.{ReflectionUtils, SparkExecuteStatementOperation}

/**
@@ -49,8 +49,8 @@ private[thriftserver] class SparkSQLOperationManager()
val sqlContext = sessionToContexts.get(parentSession.getSessionHandle)
require(sqlContext != null, s"Session handle: ${parentSession.getSessionHandle} has not been" +
s" initialized or had already closed.")
val sessionState = sqlContext.sessionState.asInstanceOf[HiveSessionState]
val runInBackground = async && sessionState.hiveThriftServerAsync
val conf = sqlContext.sessionState.conf
val runInBackground = async && conf.getConf(HiveUtils.HIVE_THRIFT_SERVER_ASYNC)
val operation = new SparkExecuteStatementOperation(parentSession, statement, confOverlay,
runInBackground)(sqlContext, sessionToActivePool)
handleToOperation.put(operation.getHandle, operation)
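
The lookup above replaces the removed `HiveSessionState.hiveThriftServerAsync` helper: the flag is an ordinary `SQLConf` entry defined in `HiveUtils` and is read through `conf.getConf`. A hedged sketch of the same check (hypothetical session; `HiveUtils` and `sessionState.conf` are internal, so the snippet assumes they are visible to the calling code):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.hive.HiveUtils

val spark = SparkSession.builder().master("local[*]").enableHiveSupport().getOrCreate()
val conf = spark.sessionState.conf

// Formerly sqlContext.sessionState.asInstanceOf[HiveSessionState].hiveThriftServerAsync
val runInBackground: Boolean = conf.getConf(HiveUtils.HIVE_THRIFT_SERVER_ASYNC)

// The underlying key is a normal SQL conf and can still be toggled per session.
spark.conf.set("spark.sql.hive.thriftServer.async", "false")
```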

@@ -39,7 +39,7 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter {
private val originalLocale = Locale.getDefault
private val originalColumnBatchSize = TestHive.conf.columnBatchSize
private val originalInMemoryPartitionPruning = TestHive.conf.inMemoryPartitionPruning
private val originalConvertMetastoreOrc = TestHive.sessionState.convertMetastoreOrc
private val originalConvertMetastoreOrc = TestHive.conf.getConf(HiveUtils.CONVERT_METASTORE_ORC)
private val originalCrossJoinEnabled = TestHive.conf.crossJoinEnabled
private val originalSessionLocalTimeZone = TestHive.conf.sessionLocalTimeZone


@@ -48,10 +48,6 @@ class HiveContext private[hive](_sparkSession: SparkSession)
new HiveContext(sparkSession.newSession())
}

protected[sql] override def sessionState: HiveSessionState = {
sparkSession.sessionState.asInstanceOf[HiveSessionState]
}

/**
* Invalidate and refresh all the cached metadata of the given table. For performance reasons,
* Spark SQL or the external data source library it uses might cache certain metadata about a

@@ -44,7 +44,7 @@ import org.apache.spark.sql.types._
*/
private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Logging {
// these are def_s and not val/lazy val since the latter would introduce circular references
private def sessionState = sparkSession.sessionState.asInstanceOf[HiveSessionState]
private def sessionState = sparkSession.sessionState
private def tableRelationCache = sparkSession.sessionState.catalog.tableRelationCache
import HiveMetastoreCatalog._

@@ -281,12 +281,13 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
object ParquetConversions extends Rule[LogicalPlan] {
private def shouldConvertMetastoreParquet(relation: CatalogRelation): Boolean = {
relation.tableMeta.storage.serde.getOrElse("").toLowerCase.contains("parquet") &&
sessionState.convertMetastoreParquet
sessionState.conf.getConf(HiveUtils.CONVERT_METASTORE_PARQUET)
}

private def convertToParquetRelation(relation: CatalogRelation): LogicalRelation = {
val fileFormatClass = classOf[ParquetFileFormat]
val mergeSchema = sessionState.convertMetastoreParquetWithSchemaMerging
val mergeSchema = sessionState.conf.getConf(
HiveUtils.CONVERT_METASTORE_PARQUET_WITH_SCHEMA_MERGING)
val options = Map(ParquetOptions.MERGE_SCHEMA -> mergeSchema.toString)

convertToLogicalRelation(relation, options, fileFormatClass, "parquet")
@@ -316,7 +317,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
object OrcConversions extends Rule[LogicalPlan] {
private def shouldConvertMetastoreOrc(relation: CatalogRelation): Boolean = {
relation.tableMeta.storage.serde.getOrElse("").toLowerCase.contains("orc") &&
sessionState.convertMetastoreOrc
sessionState.conf.getConf(HiveUtils.CONVERT_METASTORE_ORC)
}

private def convertToOrcRelation(relation: CatalogRelation): LogicalRelation = {
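
The same conf-based pattern replaces the removed `convertMetastoreParquet`, `convertMetastoreParquetWithSchemaMerging` and `convertMetastoreOrc` helpers: the conversion rules read the `HiveUtils` entries directly from the session's `SQLConf`. A hedged sketch of inspecting and toggling these flags (hypothetical session; same visibility caveats as above):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.hive.HiveUtils

val spark = SparkSession.builder().master("local[*]").enableHiveSupport().getOrCreate()
val conf = spark.sessionState.conf

// Direct replacements for the removed HiveSessionState helper methods.
val convertParquet = conf.getConf(HiveUtils.CONVERT_METASTORE_PARQUET)
val mergeParquetSchemas = conf.getConf(HiveUtils.CONVERT_METASTORE_PARQUET_WITH_SCHEMA_MERGING)
val convertOrc = conf.getConf(HiveUtils.CONVERT_METASTORE_ORC)

// The keys are ordinary SQL confs and can be toggled per session.
spark.conf.set("spark.sql.hive.convertMetastoreParquet", "false")
spark.conf.set("spark.sql.hive.convertMetastoreOrc", "true")
```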
144 changes: 23 additions & 121 deletions sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
@@ -17,121 +17,24 @@

package org.apache.spark.sql.hive

import org.apache.spark.SparkContext
import org.apache.spark.annotation.{Experimental, InterfaceStability}
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.analysis.{Analyzer, FunctionRegistry}
import org.apache.spark.sql.catalyst.optimizer.Optimizer
import org.apache.spark.sql.catalyst.parser.ParserInterface
import org.apache.spark.sql.catalyst.analysis.Analyzer
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.{QueryExecution, SparkPlanner}
import org.apache.spark.sql.execution.SparkPlanner
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.hive.client.HiveClient
import org.apache.spark.sql.internal.{BaseSessionStateBuilder, SessionFunctionResourceLoader, SessionState, SharedState, SQLConf}
import org.apache.spark.sql.streaming.StreamingQueryManager

import org.apache.spark.sql.internal.{BaseSessionStateBuilder, SessionResourceLoader, SessionState}

/**
* A class that holds all session-specific state in a given [[SparkSession]] backed by Hive.
*
* @param sparkContext The [[SparkContext]].
* @param sharedState The shared state.
* @param conf SQL-specific key-value configurations.
* @param experimentalMethods The experimental methods.
* @param functionRegistry Internal catalog for managing functions registered by the user.
* @param catalog Internal catalog for managing table and database states that uses Hive client for
* interacting with the metastore.
* @param sqlParser Parser that extracts expressions, plans, table identifiers etc. from SQL texts.
* @param analyzer Logical query plan analyzer for resolving unresolved attributes and relations.
* @param optimizer Logical query plan optimizer.
* @param planner Planner that converts optimized logical plans to physical plans and that takes
* Hive-specific strategies into account.
* @param streamingQueryManager Interface to start and stop streaming queries.
* @param createQueryExecution Function used to create QueryExecution objects.
* @param createClone Function used to create clones of the session state.
* @param metadataHive The Hive metadata client.
* Entry object for creating a Hive aware [[SessionState]].
*/
private[hive] class HiveSessionState(
sparkContext: SparkContext,
sharedState: SharedState,
conf: SQLConf,
experimentalMethods: ExperimentalMethods,
functionRegistry: FunctionRegistry,
override val catalog: HiveSessionCatalog,
sqlParser: ParserInterface,
analyzer: Analyzer,
optimizer: Optimizer,
planner: SparkPlanner,
streamingQueryManager: StreamingQueryManager,
createQueryExecution: LogicalPlan => QueryExecution,
createClone: (SparkSession, SessionState) => SessionState,
val metadataHive: HiveClient)
extends SessionState(
sparkContext,
sharedState,
conf,
experimentalMethods,
functionRegistry,
catalog,
sqlParser,
analyzer,
optimizer,
planner,
streamingQueryManager,
createQueryExecution,
createClone) {

// ------------------------------------------------------
// Helper methods, partially leftover from pre-2.0 days
// ------------------------------------------------------

override def addJar(path: String): Unit = {
metadataHive.addJar(path)
super.addJar(path)
}

/**
* When true, enables an experimental feature where metastore tables that use the parquet SerDe
* are automatically converted to use the Spark SQL parquet table scan, instead of the Hive
* SerDe.
*/
def convertMetastoreParquet: Boolean = {
conf.getConf(HiveUtils.CONVERT_METASTORE_PARQUET)
}

/**
* When true, also tries to merge possibly different but compatible Parquet schemas in different
* Parquet data files.
*
* This configuration is only effective when "spark.sql.hive.convertMetastoreParquet" is true.
*/
def convertMetastoreParquetWithSchemaMerging: Boolean = {
conf.getConf(HiveUtils.CONVERT_METASTORE_PARQUET_WITH_SCHEMA_MERGING)
}

/**
* When true, enables an experimental feature where metastore tables that use the Orc SerDe
* are automatically converted to use the Spark SQL ORC table scan, instead of the Hive
* SerDe.
*/
def convertMetastoreOrc: Boolean = {
conf.getConf(HiveUtils.CONVERT_METASTORE_ORC)
}

/**
* When true, Hive Thrift server will execute SQL queries asynchronously using a thread pool."
*/
def hiveThriftServerAsync: Boolean = {
conf.getConf(HiveUtils.HIVE_THRIFT_SERVER_ASYNC)
}
}

private[hive] object HiveSessionState {
/**
* Create a new [[HiveSessionState]] for the given session.
* Create a new Hive aware [[SessionState]] for the given session.
*/
def apply(session: SparkSession): HiveSessionState = {
def apply(session: SparkSession): SessionState = {
new HiveSessionStateBuilder(session).build()
}
}
@@ -147,6 +50,14 @@ class HiveSessionStateBuilder(session: SparkSession, parentState: Option[Session
private def externalCatalog: HiveExternalCatalog =
session.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog]

/**
* Create a Hive aware resource loader.
*/
override protected lazy val resourceLoader: HiveSessionResourceLoader = {
val client: HiveClient = externalCatalog.client.newSession()
new HiveSessionResourceLoader(session, client)
}

/**
* Create a [[HiveSessionCatalog]].
*/
@@ -159,7 +70,7 @@ class HiveSessionStateBuilder(session: SparkSession, parentState: Option[Session
conf,
SessionState.newHadoopConf(session.sparkContext.hadoopConfiguration, conf),
sqlParser,
new SessionFunctionResourceLoader(session))
resourceLoader)
parentState.foreach(_.catalog.copyStateTo(catalog))
catalog
}
@@ -217,23 +128,14 @@ class HiveSessionStateBuilder(session: SparkSession, parentState: Option[Session
}

override protected def newBuilder: NewBuilder = new HiveSessionStateBuilder(_, _)
}

override def build(): HiveSessionState = {
val metadataHive: HiveClient = externalCatalog.client.newSession()
new HiveSessionState(
session.sparkContext,
session.sharedState,
conf,
experimentalMethods,
functionRegistry,
catalog,
sqlParser,
analyzer,
optimizer,
planner,
streamingQueryManager,
createQueryExecution,
createClone,
metadataHive)
class HiveSessionResourceLoader(
session: SparkSession,
client: HiveClient)
extends SessionResourceLoader(session) {
override def addJar(path: String): Unit = {
client.addJar(path)
super.addJar(path)
}
}
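
To tie the pieces together, a hedged end-to-end sketch of the path this class serves: a SQL `ADD JAR` runs `AddJarCommand`, which calls `sessionState.resourceLoader.addJar`, and in a Hive-enabled session that loader is this `HiveSessionResourceLoader`, so the jar reaches the per-session Hive client before the shared classloader logic in the superclass. The jar path and UDF class below are hypothetical.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[*]")
  .enableHiveSupport()
  .getOrCreate()

// AddJarCommand -> sessionState.resourceLoader.addJar: the Hive client, the
// SparkContext, and the shared jar classloader all see the jar.
spark.sql("ADD JAR /tmp/example-hive-udfs.jar")

// CREATE FUNCTION ... USING JAR goes through SessionResourceLoader.loadResource,
// which routes the jar resource to the same addJar.
spark.sql(
  """CREATE TEMPORARY FUNCTION example_upper AS 'com.example.ExampleUpperUDF'
    |USING JAR '/tmp/example-hive-udfs.jar'""".stripMargin)
```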
