From f82461fc1197f6055d9cf972d82260b178e10a7c Mon Sep 17 00:00:00 2001 From: Herman van Hovell Date: Tue, 28 Mar 2017 23:14:31 +0800 Subject: [PATCH] [SPARK-20126][SQL] Remove HiveSessionState ## What changes were proposed in this pull request? Commit https://github.com/apache/spark/commit/ea361165e1ddce4d8aa0242ae3e878d7b39f1de2 moved most of the logic from the SessionState classes into an accompanying builder. This makes the existence of the `HiveSessionState` redundant. This PR removes the `HiveSessionState`. ## How was this patch tested? Existing tests. Author: Herman van Hovell Closes #17457 from hvanhovell/SPARK-20126. --- .../sql/execution/command/resources.scala | 2 +- .../spark/sql/internal/SessionState.scala | 47 +++--- .../sql/internal/sessionStateBuilders.scala | 8 +- .../sql/hive/thriftserver/SparkSQLEnv.scala | 12 +- .../server/SparkSQLOperationManager.scala | 6 +- .../execution/HiveCompatibilitySuite.scala | 2 +- .../apache/spark/sql/hive/HiveContext.scala | 4 - .../spark/sql/hive/HiveMetastoreCatalog.scala | 9 +- .../spark/sql/hive/HiveSessionState.scala | 144 +++--------------- .../apache/spark/sql/hive/test/TestHive.scala | 23 ++- .../sql/hive/HiveMetastoreCatalogSuite.scala | 6 +- .../sql/hive/MetastoreDataSourcesSuite.scala | 7 +- .../sql/hive/execution/HiveDDLSuite.scala | 6 +- .../apache/spark/sql/hive/parquetSuites.scala | 21 ++- 14 files changed, 104 insertions(+), 193 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/resources.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/resources.scala index 20b08946675d0..2e859cf1ef253 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/resources.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/resources.scala @@ -37,7 +37,7 @@ case class AddJarCommand(path: String) extends RunnableCommand { } override def run(sparkSession: SparkSession): Seq[Row] = { - sparkSession.sessionState.addJar(path) + sparkSession.sessionState.resourceLoader.addJar(path) Seq(Row(0)) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala index b5b0bb0bfc401..c6241d923d7b3 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala @@ -63,6 +63,7 @@ private[sql] class SessionState( val optimizer: Optimizer, val planner: SparkPlanner, val streamingQueryManager: StreamingQueryManager, + val resourceLoader: SessionResourceLoader, createQueryExecution: LogicalPlan => QueryExecution, createClone: (SparkSession, SessionState) => SessionState) { @@ -106,27 +107,6 @@ private[sql] class SessionState( def refreshTable(tableName: String): Unit = { catalog.refreshTable(sqlParser.parseTableIdentifier(tableName)) } - - /** - * Add a jar path to [[SparkContext]] and the classloader. - * - * Note: this method seems not access any session state, but the subclass `HiveSessionState` needs - * to add the jar to its hive client for the current session. Hence, it still needs to be in - * [[SessionState]]. 
- */ - def addJar(path: String): Unit = { - sparkContext.addJar(path) - val uri = new Path(path).toUri - val jarURL = if (uri.getScheme == null) { - // `path` is a local file path without a URL scheme - new File(path).toURI.toURL - } else { - // `path` is a URL with a scheme - uri.toURL - } - sharedState.jarClassLoader.addURL(jarURL) - Thread.currentThread().setContextClassLoader(sharedState.jarClassLoader) - } } private[sql] object SessionState { @@ -160,10 +140,10 @@ class SessionStateBuilder( * Session shared [[FunctionResourceLoader]]. */ @InterfaceStability.Unstable -class SessionFunctionResourceLoader(session: SparkSession) extends FunctionResourceLoader { +class SessionResourceLoader(session: SparkSession) extends FunctionResourceLoader { override def loadResource(resource: FunctionResource): Unit = { resource.resourceType match { - case JarResource => session.sessionState.addJar(resource.uri) + case JarResource => addJar(resource.uri) case FileResource => session.sparkContext.addFile(resource.uri) case ArchiveResource => throw new AnalysisException( @@ -171,4 +151,25 @@ class SessionFunctionResourceLoader(session: SparkSession) extends FunctionResou "please use --archives options while calling spark-submit.") } } + + /** + * Add a jar path to [[SparkContext]] and the classloader. + * + * Note: this method seems not access any session state, but the subclass `HiveSessionState` needs + * to add the jar to its hive client for the current session. Hence, it still needs to be in + * [[SessionState]]. + */ + def addJar(path: String): Unit = { + session.sparkContext.addJar(path) + val uri = new Path(path).toUri + val jarURL = if (uri.getScheme == null) { + // `path` is a local file path without a URL scheme + new File(path).toURI.toURL + } else { + // `path` is a URL with a scheme + uri.toURL + } + session.sharedState.jarClassLoader.addURL(jarURL) + Thread.currentThread().setContextClassLoader(session.sharedState.jarClassLoader) + } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/sessionStateBuilders.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/sessionStateBuilders.scala index 6b5559adb1db4..b8f645fdee85a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/sessionStateBuilders.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/sessionStateBuilders.scala @@ -109,6 +109,11 @@ abstract class BaseSessionStateBuilder( */ protected lazy val sqlParser: ParserInterface = new SparkSqlParser(conf) + /** + * ResourceLoader that is used to load function resources and jars. + */ + protected lazy val resourceLoader: SessionResourceLoader = new SessionResourceLoader(session) + /** * Catalog for managing table and database states. If there is a pre-existing catalog, the state * of that catalog (temp tables & current database) will be copied into the new catalog. 
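The `resourceLoader` hook added above is what lets a derived builder swap in its own loader — the same mechanism `HiveSessionStateBuilder` uses later in this patch to plug in `HiveSessionResourceLoader`. Below is a minimal sketch of that pattern against the builder API as it stands after this change; `ExtendedResourceLoader` and `ExtendedSessionStateBuilder` are illustrative names, not part of Spark.

```scala
// Sketch only: a builder that substitutes its own resource loader via the new
// `resourceLoader` hook. The Extended* names are illustrative.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.internal.{BaseSessionStateBuilder, SessionResourceLoader, SessionState}

class ExtendedResourceLoader(session: SparkSession) extends SessionResourceLoader(session) {
  override def addJar(path: String): Unit = {
    // Hook point for extra side effects before the default behaviour, which adds
    // the jar to the SparkContext and the shared jar classloader.
    super.addJar(path)
  }
}

class ExtendedSessionStateBuilder(
    session: SparkSession,
    parentState: Option[SessionState] = None)
  extends BaseSessionStateBuilder(session, parentState) {

  // The catalog created by the base builder picks this loader up, so both UDF jar
  // resources and ADD JAR flow through ExtendedResourceLoader.addJar.
  override protected lazy val resourceLoader: SessionResourceLoader =
    new ExtendedResourceLoader(session)

  override protected def newBuilder: NewBuilder = new ExtendedSessionStateBuilder(_, _)
}
```

`new ExtendedSessionStateBuilder(session).build()` then produces a plain `SessionState` carrying the custom loader, which is the same route `HiveSessionState.apply` takes further down in this patch.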
@@ -123,7 +128,7 @@ abstract class BaseSessionStateBuilder( conf, SessionState.newHadoopConf(session.sparkContext.hadoopConfiguration, conf), sqlParser, - new SessionFunctionResourceLoader(session)) + resourceLoader) parentState.foreach(_.catalog.copyStateTo(catalog)) catalog } @@ -251,6 +256,7 @@ abstract class BaseSessionStateBuilder( optimizer, planner, streamingQueryManager, + resourceLoader, createQueryExecution, createClone) } diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLEnv.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLEnv.scala index c0b299411e94a..01c4eb131a564 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLEnv.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLEnv.scala @@ -22,7 +22,7 @@ import java.io.PrintStream import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.internal.Logging import org.apache.spark.sql.{SparkSession, SQLContext} -import org.apache.spark.sql.hive.{HiveSessionState, HiveUtils} +import org.apache.spark.sql.hive.{HiveExternalCatalog, HiveUtils} import org.apache.spark.util.Utils /** A singleton object for the master program. The slaves should not access this. */ @@ -49,10 +49,12 @@ private[hive] object SparkSQLEnv extends Logging { sparkContext = sparkSession.sparkContext sqlContext = sparkSession.sqlContext - val sessionState = sparkSession.sessionState.asInstanceOf[HiveSessionState] - sessionState.metadataHive.setOut(new PrintStream(System.out, true, "UTF-8")) - sessionState.metadataHive.setInfo(new PrintStream(System.err, true, "UTF-8")) - sessionState.metadataHive.setError(new PrintStream(System.err, true, "UTF-8")) + val metadataHive = sparkSession + .sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog] + .client.newSession() + metadataHive.setOut(new PrintStream(System.out, true, "UTF-8")) + metadataHive.setInfo(new PrintStream(System.err, true, "UTF-8")) + metadataHive.setError(new PrintStream(System.err, true, "UTF-8")) sparkSession.conf.set("spark.sql.hive.version", HiveUtils.hiveExecutionVersion) } } diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala index 49ab664009341..a0e5012633f5e 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala @@ -26,7 +26,7 @@ import org.apache.hive.service.cli.session.HiveSession import org.apache.spark.internal.Logging import org.apache.spark.sql.SQLContext -import org.apache.spark.sql.hive.HiveSessionState +import org.apache.spark.sql.hive.HiveUtils import org.apache.spark.sql.hive.thriftserver.{ReflectionUtils, SparkExecuteStatementOperation} /** @@ -49,8 +49,8 @@ private[thriftserver] class SparkSQLOperationManager() val sqlContext = sessionToContexts.get(parentSession.getSessionHandle) require(sqlContext != null, s"Session handle: ${parentSession.getSessionHandle} has not been" + s" initialized or had already closed.") - val sessionState = sqlContext.sessionState.asInstanceOf[HiveSessionState] - val runInBackground = async && sessionState.hiveThriftServerAsync + val conf = sqlContext.sessionState.conf + val 
runInBackground = async && conf.getConf(HiveUtils.HIVE_THRIFT_SERVER_ASYNC) val operation = new SparkExecuteStatementOperation(parentSession, statement, confOverlay, runInBackground)(sqlContext, sessionToActivePool) handleToOperation.put(operation.getHandle, operation) diff --git a/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala b/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala index f78660f7c14b6..0a53aaca404e6 100644 --- a/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala +++ b/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala @@ -39,7 +39,7 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter { private val originalLocale = Locale.getDefault private val originalColumnBatchSize = TestHive.conf.columnBatchSize private val originalInMemoryPartitionPruning = TestHive.conf.inMemoryPartitionPruning - private val originalConvertMetastoreOrc = TestHive.sessionState.convertMetastoreOrc + private val originalConvertMetastoreOrc = TestHive.conf.getConf(HiveUtils.CONVERT_METASTORE_ORC) private val originalCrossJoinEnabled = TestHive.conf.crossJoinEnabled private val originalSessionLocalTimeZone = TestHive.conf.sessionLocalTimeZone diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala index 5393c57c9a28f..02a5117f005e8 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala @@ -48,10 +48,6 @@ class HiveContext private[hive](_sparkSession: SparkSession) new HiveContext(sparkSession.newSession()) } - protected[sql] override def sessionState: HiveSessionState = { - sparkSession.sessionState.asInstanceOf[HiveSessionState] - } - /** * Invalidate and refresh all the cached the metadata of the given table. 
For performance reasons, * Spark SQL or the external data source library it uses might cache certain metadata about a diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala index 2e060ab9f6801..305bd007c93f7 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala @@ -44,7 +44,7 @@ import org.apache.spark.sql.types._ */ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Logging { // these are def_s and not val/lazy val since the latter would introduce circular references - private def sessionState = sparkSession.sessionState.asInstanceOf[HiveSessionState] + private def sessionState = sparkSession.sessionState private def tableRelationCache = sparkSession.sessionState.catalog.tableRelationCache import HiveMetastoreCatalog._ @@ -281,12 +281,13 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log object ParquetConversions extends Rule[LogicalPlan] { private def shouldConvertMetastoreParquet(relation: CatalogRelation): Boolean = { relation.tableMeta.storage.serde.getOrElse("").toLowerCase.contains("parquet") && - sessionState.convertMetastoreParquet + sessionState.conf.getConf(HiveUtils.CONVERT_METASTORE_PARQUET) } private def convertToParquetRelation(relation: CatalogRelation): LogicalRelation = { val fileFormatClass = classOf[ParquetFileFormat] - val mergeSchema = sessionState.convertMetastoreParquetWithSchemaMerging + val mergeSchema = sessionState.conf.getConf( + HiveUtils.CONVERT_METASTORE_PARQUET_WITH_SCHEMA_MERGING) val options = Map(ParquetOptions.MERGE_SCHEMA -> mergeSchema.toString) convertToLogicalRelation(relation, options, fileFormatClass, "parquet") @@ -316,7 +317,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log object OrcConversions extends Rule[LogicalPlan] { private def shouldConvertMetastoreOrc(relation: CatalogRelation): Boolean = { relation.tableMeta.storage.serde.getOrElse("").toLowerCase.contains("orc") && - sessionState.convertMetastoreOrc + sessionState.conf.getConf(HiveUtils.CONVERT_METASTORE_ORC) } private def convertToOrcRelation(relation: CatalogRelation): LogicalRelation = { diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala index 49ff8478f1ae2..f49e6bb418644 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala @@ -17,121 +17,24 @@ package org.apache.spark.sql.hive -import org.apache.spark.SparkContext import org.apache.spark.annotation.{Experimental, InterfaceStability} import org.apache.spark.sql._ -import org.apache.spark.sql.catalyst.analysis.{Analyzer, FunctionRegistry} -import org.apache.spark.sql.catalyst.optimizer.Optimizer -import org.apache.spark.sql.catalyst.parser.ParserInterface +import org.apache.spark.sql.catalyst.analysis.Analyzer import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.catalyst.rules.Rule -import org.apache.spark.sql.execution.{QueryExecution, SparkPlanner} +import org.apache.spark.sql.execution.SparkPlanner import org.apache.spark.sql.execution.datasources._ import org.apache.spark.sql.hive.client.HiveClient -import 
org.apache.spark.sql.internal.{BaseSessionStateBuilder, SessionFunctionResourceLoader, SessionState, SharedState, SQLConf} -import org.apache.spark.sql.streaming.StreamingQueryManager - +import org.apache.spark.sql.internal.{BaseSessionStateBuilder, SessionResourceLoader, SessionState} /** - * A class that holds all session-specific state in a given [[SparkSession]] backed by Hive. - * - * @param sparkContext The [[SparkContext]]. - * @param sharedState The shared state. - * @param conf SQL-specific key-value configurations. - * @param experimentalMethods The experimental methods. - * @param functionRegistry Internal catalog for managing functions registered by the user. - * @param catalog Internal catalog for managing table and database states that uses Hive client for - * interacting with the metastore. - * @param sqlParser Parser that extracts expressions, plans, table identifiers etc. from SQL texts. - * @param analyzer Logical query plan analyzer for resolving unresolved attributes and relations. - * @param optimizer Logical query plan optimizer. - * @param planner Planner that converts optimized logical plans to physical plans and that takes - * Hive-specific strategies into account. - * @param streamingQueryManager Interface to start and stop streaming queries. - * @param createQueryExecution Function used to create QueryExecution objects. - * @param createClone Function used to create clones of the session state. - * @param metadataHive The Hive metadata client. + * Entry object for creating a Hive aware [[SessionState]]. */ -private[hive] class HiveSessionState( - sparkContext: SparkContext, - sharedState: SharedState, - conf: SQLConf, - experimentalMethods: ExperimentalMethods, - functionRegistry: FunctionRegistry, - override val catalog: HiveSessionCatalog, - sqlParser: ParserInterface, - analyzer: Analyzer, - optimizer: Optimizer, - planner: SparkPlanner, - streamingQueryManager: StreamingQueryManager, - createQueryExecution: LogicalPlan => QueryExecution, - createClone: (SparkSession, SessionState) => SessionState, - val metadataHive: HiveClient) - extends SessionState( - sparkContext, - sharedState, - conf, - experimentalMethods, - functionRegistry, - catalog, - sqlParser, - analyzer, - optimizer, - planner, - streamingQueryManager, - createQueryExecution, - createClone) { - - // ------------------------------------------------------ - // Helper methods, partially leftover from pre-2.0 days - // ------------------------------------------------------ - - override def addJar(path: String): Unit = { - metadataHive.addJar(path) - super.addJar(path) - } - - /** - * When true, enables an experimental feature where metastore tables that use the parquet SerDe - * are automatically converted to use the Spark SQL parquet table scan, instead of the Hive - * SerDe. - */ - def convertMetastoreParquet: Boolean = { - conf.getConf(HiveUtils.CONVERT_METASTORE_PARQUET) - } - - /** - * When true, also tries to merge possibly different but compatible Parquet schemas in different - * Parquet data files. - * - * This configuration is only effective when "spark.sql.hive.convertMetastoreParquet" is true. - */ - def convertMetastoreParquetWithSchemaMerging: Boolean = { - conf.getConf(HiveUtils.CONVERT_METASTORE_PARQUET_WITH_SCHEMA_MERGING) - } - - /** - * When true, enables an experimental feature where metastore tables that use the Orc SerDe - * are automatically converted to use the Spark SQL ORC table scan, instead of the Hive - * SerDe. 
- */ - def convertMetastoreOrc: Boolean = { - conf.getConf(HiveUtils.CONVERT_METASTORE_ORC) - } - - /** - * When true, Hive Thrift server will execute SQL queries asynchronously using a thread pool." - */ - def hiveThriftServerAsync: Boolean = { - conf.getConf(HiveUtils.HIVE_THRIFT_SERVER_ASYNC) - } -} - private[hive] object HiveSessionState { /** - * Create a new [[HiveSessionState]] for the given session. + * Create a new Hive aware [[SessionState]]. for the given session. */ - def apply(session: SparkSession): HiveSessionState = { + def apply(session: SparkSession): SessionState = { new HiveSessionStateBuilder(session).build() } } @@ -147,6 +50,14 @@ class HiveSessionStateBuilder(session: SparkSession, parentState: Option[Session private def externalCatalog: HiveExternalCatalog = session.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog] + /** + * Create a Hive aware resource loader. + */ + override protected lazy val resourceLoader: HiveSessionResourceLoader = { + val client: HiveClient = externalCatalog.client.newSession() + new HiveSessionResourceLoader(session, client) + } + /** * Create a [[HiveSessionCatalog]]. */ @@ -159,7 +70,7 @@ class HiveSessionStateBuilder(session: SparkSession, parentState: Option[Session conf, SessionState.newHadoopConf(session.sparkContext.hadoopConfiguration, conf), sqlParser, - new SessionFunctionResourceLoader(session)) + resourceLoader) parentState.foreach(_.catalog.copyStateTo(catalog)) catalog } @@ -217,23 +128,14 @@ class HiveSessionStateBuilder(session: SparkSession, parentState: Option[Session } override protected def newBuilder: NewBuilder = new HiveSessionStateBuilder(_, _) +} - override def build(): HiveSessionState = { - val metadataHive: HiveClient = externalCatalog.client.newSession() - new HiveSessionState( - session.sparkContext, - session.sharedState, - conf, - experimentalMethods, - functionRegistry, - catalog, - sqlParser, - analyzer, - optimizer, - planner, - streamingQueryManager, - createQueryExecution, - createClone, - metadataHive) +class HiveSessionResourceLoader( + session: SparkSession, + client: HiveClient) + extends SessionResourceLoader(session) { + override def addJar(path: String): Unit = { + client.addJar(path) + super.addJar(path) } } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala index 32ca69605ef4d..0bcf219922764 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala @@ -34,7 +34,6 @@ import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.internal.Logging import org.apache.spark.sql.{SparkSession, SQLContext} import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation -import org.apache.spark.sql.catalyst.catalog.ExternalCatalog import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.execution.QueryExecution import org.apache.spark.sql.execution.command.CacheTableCommand @@ -81,7 +80,7 @@ private[hive] class TestHiveSharedState( hiveClient: Option[HiveClient] = None) extends SharedState(sc) { - override lazy val externalCatalog: ExternalCatalog = { + override lazy val externalCatalog: TestHiveExternalCatalog = { new TestHiveExternalCatalog( sc.conf, sc.hadoopConfiguration, @@ -123,8 +122,6 @@ class TestHiveContext( new TestHiveContext(sparkSession.newSession()) } - override def sessionState: HiveSessionState = sparkSession.sessionState - def 
setCacheTables(c: Boolean): Unit = { sparkSession.setCacheTables(c) } @@ -155,7 +152,7 @@ class TestHiveContext( private[hive] class TestHiveSparkSession( @transient private val sc: SparkContext, @transient private val existingSharedState: Option[TestHiveSharedState], - @transient private val parentSessionState: Option[HiveSessionState], + @transient private val parentSessionState: Option[SessionState], private val loadTestTables: Boolean) extends SparkSession(sc) with Logging { self => @@ -195,10 +192,12 @@ private[hive] class TestHiveSparkSession( } @transient - override lazy val sessionState: HiveSessionState = { + override lazy val sessionState: SessionState = { new TestHiveSessionStateBuilder(this, parentSessionState).build() } + lazy val metadataHive: HiveClient = sharedState.externalCatalog.client.newSession() + override def newSession(): TestHiveSparkSession = { new TestHiveSparkSession(sc, Some(sharedState), None, loadTestTables) } @@ -492,7 +491,7 @@ private[hive] class TestHiveSparkSession( sessionState.catalog.clearTempTables() sessionState.catalog.tableRelationCache.invalidateAll() - sessionState.metadataHive.reset() + metadataHive.reset() FunctionRegistry.getFunctionNames.asScala.filterNot(originalUDFs.contains(_)). foreach { udfName => FunctionRegistry.unregisterTemporaryUDF(udfName) } @@ -509,14 +508,14 @@ private[hive] class TestHiveSparkSession( sessionState.conf.setConfString("fs.defaultFS", new File(".").toURI.toString) // It is important that we RESET first as broken hooks that might have been set could break // other sql exec here. - sessionState.metadataHive.runSqlHive("RESET") + metadataHive.runSqlHive("RESET") // For some reason, RESET does not reset the following variables... // https://issues.apache.org/jira/browse/HIVE-9004 - sessionState.metadataHive.runSqlHive("set hive.table.parameters.default=") - sessionState.metadataHive.runSqlHive("set datanucleus.cache.collections=true") - sessionState.metadataHive.runSqlHive("set datanucleus.cache.collections.lazy=true") + metadataHive.runSqlHive("set hive.table.parameters.default=") + metadataHive.runSqlHive("set datanucleus.cache.collections=true") + metadataHive.runSqlHive("set datanucleus.cache.collections.lazy=true") // Lots of tests fail if we do not change the partition whitelist from the default. 
- sessionState.metadataHive.runSqlHive("set hive.metastore.partition.name.whitelist.pattern=.*") + metadataHive.runSqlHive("set hive.metastore.partition.name.whitelist.pattern=.*") sessionState.catalog.setCurrentDatabase("default") } catch { diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala index 079358b29a191..d8fd68b63d1eb 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala @@ -115,7 +115,7 @@ class DataSourceWithHiveMetastoreCatalogSuite assert(columns.map(_.dataType) === Seq(DecimalType(10, 3), StringType)) checkAnswer(table("t"), testDF) - assert(sessionState.metadataHive.runSqlHive("SELECT * FROM t") === Seq("1.1\t1", "2.1\t2")) + assert(sparkSession.metadataHive.runSqlHive("SELECT * FROM t") === Seq("1.1\t1", "2.1\t2")) } } @@ -147,7 +147,7 @@ class DataSourceWithHiveMetastoreCatalogSuite assert(columns.map(_.dataType) === Seq(DecimalType(10, 3), StringType)) checkAnswer(table("t"), testDF) - assert(sessionState.metadataHive.runSqlHive("SELECT * FROM t") === + assert(sparkSession.metadataHive.runSqlHive("SELECT * FROM t") === Seq("1.1\t1", "2.1\t2")) } } @@ -176,7 +176,7 @@ class DataSourceWithHiveMetastoreCatalogSuite assert(columns.map(_.dataType) === Seq(IntegerType, StringType)) checkAnswer(table("t"), Row(1, "val_1")) - assert(sessionState.metadataHive.runSqlHive("SELECT * FROM t") === Seq("1\tval_1")) + assert(sparkSession.metadataHive.runSqlHive("SELECT * FROM t") === Seq("1\tval_1")) } } } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala index f02b7218d6eee..55e02acfa4ce3 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala @@ -379,8 +379,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv |) """.stripMargin) - val expectedPath = - sessionState.catalog.hiveDefaultTableFilePath(TableIdentifier("ctasJsonTable")) + val expectedPath = sessionState.catalog.defaultTablePath(TableIdentifier("ctasJsonTable")) val filesystemPath = new Path(expectedPath) val fs = filesystemPath.getFileSystem(spark.sessionState.newHadoopConf()) fs.delete(filesystemPath, true) @@ -486,7 +485,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv sql("DROP TABLE savedJsonTable") intercept[AnalysisException] { read.json( - sessionState.catalog.hiveDefaultTableFilePath(TableIdentifier("savedJsonTable"))) + sessionState.catalog.defaultTablePath(TableIdentifier("savedJsonTable")).toString) } } @@ -756,7 +755,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv serde = None, compressed = false, properties = Map( - "path" -> sessionState.catalog.hiveDefaultTableFilePath(TableIdentifier(tableName))) + "path" -> sessionState.catalog.defaultTablePath(TableIdentifier(tableName)).toString) ), properties = Map( DATASOURCE_PROVIDER -> "json", diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala index 04bc79d430324..f0a995c274b64 100644 --- 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala @@ -128,11 +128,11 @@ class HiveDDLSuite dbPath: Option[String] = None): Boolean = { val expectedTablePath = if (dbPath.isEmpty) { - hiveContext.sessionState.catalog.hiveDefaultTableFilePath(tableIdentifier) + hiveContext.sessionState.catalog.defaultTablePath(tableIdentifier) } else { - new Path(new Path(dbPath.get), tableIdentifier.table).toString + new Path(new Path(dbPath.get), tableIdentifier.table) } - val filesystemPath = new Path(expectedTablePath) + val filesystemPath = new Path(expectedTablePath.toString) val fs = filesystemPath.getFileSystem(spark.sessionState.newHadoopConf()) fs.exists(filesystemPath) } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala index 81af24979d822..9fc2923bb6fd8 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala @@ -22,6 +22,7 @@ import java.io.File import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.catalog.CatalogRelation +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.execution.DataSourceScanExec import org.apache.spark.sql.execution.datasources._ import org.apache.spark.sql.hive.execution.HiveTableScanExec @@ -448,10 +449,14 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest { } } + private def getCachedDataSourceTable(id: TableIdentifier): LogicalPlan = { + sessionState.catalog.asInstanceOf[HiveSessionCatalog].getCachedDataSourceTable(id) + } + test("Caching converted data source Parquet Relations") { def checkCached(tableIdentifier: TableIdentifier): Unit = { // Converted test_parquet should be cached. - sessionState.catalog.getCachedDataSourceTable(tableIdentifier) match { + getCachedDataSourceTable(tableIdentifier) match { case null => fail("Converted test_parquet should be cached in the cache.") case LogicalRelation(_: HadoopFsRelation, _, _) => // OK case other => @@ -479,14 +484,14 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest { var tableIdentifier = TableIdentifier("test_insert_parquet", Some("default")) // First, make sure the converted test_parquet is not cached. - assert(sessionState.catalog.getCachedDataSourceTable(tableIdentifier) === null) + assert(getCachedDataSourceTable(tableIdentifier) === null) // Table lookup will make the table cached. table("test_insert_parquet") checkCached(tableIdentifier) // For insert into non-partitioned table, we will do the conversion, // so the converted test_insert_parquet should be cached. sessionState.refreshTable("test_insert_parquet") - assert(sessionState.catalog.getCachedDataSourceTable(tableIdentifier) === null) + assert(getCachedDataSourceTable(tableIdentifier) === null) sql( """ |INSERT INTO TABLE test_insert_parquet @@ -499,7 +504,7 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest { sql("select a, b from jt").collect()) // Invalidate the cache. sessionState.refreshTable("test_insert_parquet") - assert(sessionState.catalog.getCachedDataSourceTable(tableIdentifier) === null) + assert(getCachedDataSourceTable(tableIdentifier) === null) // Create a partitioned table. 
sql( @@ -517,7 +522,7 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest { """.stripMargin) tableIdentifier = TableIdentifier("test_parquet_partitioned_cache_test", Some("default")) - assert(sessionState.catalog.getCachedDataSourceTable(tableIdentifier) === null) + assert(getCachedDataSourceTable(tableIdentifier) === null) sql( """ |INSERT INTO TABLE test_parquet_partitioned_cache_test @@ -526,14 +531,14 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest { """.stripMargin) // Right now, insert into a partitioned Parquet is not supported in data source Parquet. // So, we expect it is not cached. - assert(sessionState.catalog.getCachedDataSourceTable(tableIdentifier) === null) + assert(getCachedDataSourceTable(tableIdentifier) === null) sql( """ |INSERT INTO TABLE test_parquet_partitioned_cache_test |PARTITION (`date`='2015-04-02') |select a, b from jt """.stripMargin) - assert(sessionState.catalog.getCachedDataSourceTable(tableIdentifier) === null) + assert(getCachedDataSourceTable(tableIdentifier) === null) // Make sure we can cache the partitioned table. table("test_parquet_partitioned_cache_test") @@ -549,7 +554,7 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest { """.stripMargin).collect()) sessionState.refreshTable("test_parquet_partitioned_cache_test") - assert(sessionState.catalog.getCachedDataSourceTable(tableIdentifier) === null) + assert(getCachedDataSourceTable(tableIdentifier) === null) dropTables("test_insert_parquet", "test_parquet_partitioned_cache_test") }
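Taken together, the call sites above show how each responsibility of the removed `HiveSessionState` is now reached: jars go through `SessionState.resourceLoader`, the metastore conversion flags are read straight from `SQLConf` via `HiveUtils`, and the Hive metadata client comes from the shared `HiveExternalCatalog`. The sketch below simply collects those call sites in one place; `HiveSessionStateExample` and `demo` are illustrative names, and the code assumes it is compiled inside the `org.apache.spark.sql.hive` package because `SparkSession.sessionState`, `SparkSession.sharedState` and `HiveExternalCatalog.client` are internal APIs.

```scala
// A minimal sketch mirroring the call sites introduced by this patch
// (resources.scala, HiveMetastoreCatalog, SparkSQLEnv, TestHive). Assumed to live
// in the org.apache.spark.sql.hive package because the members it touches are
// private[sql]/private[hive].
package org.apache.spark.sql.hive

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.hive.client.HiveClient

object HiveSessionStateExample { // illustrative name, not part of Spark

  def demo(spark: SparkSession, jarPath: String): Unit = {
    // Jars are added through the session's resource loader; with Hive support the
    // loader is a HiveSessionResourceLoader, which also registers the jar with its
    // per-session Hive client before delegating to the default behaviour.
    spark.sessionState.resourceLoader.addJar(jarPath)

    // The former HiveSessionState.convertMetastoreParquet helper is now a plain
    // SQLConf lookup.
    val convertParquet =
      spark.sessionState.conf.getConf(HiveUtils.CONVERT_METASTORE_PARQUET)
    println(s"convertMetastoreParquet = $convertParquet")

    // The former HiveSessionState.metadataHive client is obtained from the shared
    // HiveExternalCatalog instead of the session state.
    val metadataHive: HiveClient = spark.sharedState.externalCatalog
      .asInstanceOf[HiveExternalCatalog]
      .client.newSession()
    metadataHive.runSqlHive("RESET")
  }
}
```

Because all of this now hangs off the generic `SessionState`, the thrift server and the Hive test harness no longer need to downcast to a `HiveSessionState`; the Hive-specific wiring is confined to `HiveSessionStateBuilder` and `HiveSessionResourceLoader`.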