From ea361165e1ddce4d8aa0242ae3e878d7b39f1de2 Mon Sep 17 00:00:00 2001
From: Herman van Hovell
Date: Tue, 28 Mar 2017 10:07:24 +0800
Subject: [PATCH] [SPARK-20100][SQL] Refactor SessionState initialization

## What changes were proposed in this pull request?
The current SessionState initialization code path is quite complex: part of the creation is done in the SessionState companion objects, part is done inside the SessionState class itself, and part is done by passing functions around.

This PR refactors this code path and consolidates SessionState initialization into a builder class. The SessionState itself no longer does any initialization and just becomes a placeholder for the various Spark SQL internals. This also lays the groundwork for two future improvements:

1. This provides a starting point for removing the `HiveSessionState`. Removing the `HiveSessionState` would also require us to move resource loading into a separate class, and to (re)move metadata hive.
2. This makes it easier to customize the SparkSession. Currently you will need to create a custom version of the builder; I have added hooks to facilitate this. A future step will be to create a semi-stable API on top of this.

## How was this patch tested?
Existing tests.

Author: Herman van Hovell

Closes #17433 from hvanhovell/SPARK-20100.

---
 .../sql/catalyst/catalog/SessionCatalog.scala |  46 +--
 .../sql/catalyst/optimizer/Optimizer.scala    |  16 +-
 .../catalog/SessionCatalogSuite.scala         |  22 +-
 .../spark/sql/execution/SparkOptimizer.scala  |  12 +-
 .../spark/sql/execution/SparkPlanner.scala    |  11 +-
 .../streaming/IncrementalExecution.scala      |  23 +-
 .../spark/sql/internal/SessionState.scala     | 180 +++-------
 .../sql/internal/sessionStateBuilders.scala   | 279 ++++++++++++++++++
 .../spark/sql/test/TestSQLContext.scala       |  23 +-
 .../spark/sql/hive/HiveSessionCatalog.scala   |  76 +----
 .../spark/sql/hive/HiveSessionState.scala     | 259 +++++++---------
 .../apache/spark/sql/hive/test/TestHive.scala |  60 ++--
 .../sql/hive/HiveSessionCatalogSuite.scala    | 112 -------
 13 files changed, 547 insertions(+), 572 deletions(-)
 create mode 100644 sql/core/src/main/scala/org/apache/spark/sql/internal/sessionStateBuilders.scala
 delete mode 100644 sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSessionCatalogSuite.scala

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index a469d12451643..72ab075408899 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -54,7 +54,8 @@ class SessionCatalog(
     functionRegistry: FunctionRegistry,
     conf: CatalystConf,
     hadoopConf: Configuration,
-    parser: ParserInterface) extends Logging {
+    parser: ParserInterface,
+    functionResourceLoader: FunctionResourceLoader) extends Logging {
   import SessionCatalog._
   import CatalogTypes.TablePartitionSpec

@@ -69,8 +70,8 @@ class SessionCatalog(
       functionRegistry,
       conf,
       new Configuration(),
-      CatalystSqlParser)
-    functionResourceLoader = DummyFunctionResourceLoader
+      CatalystSqlParser,
+      DummyFunctionResourceLoader)
   }

   // For testing only.
@@ -90,9 +91,7 @@ class SessionCatalog(
   // check whether the temporary table or function exists, then, if not, operate on
   // the corresponding item in the current database.
@GuardedBy("this") - protected var currentDb = formatDatabaseName(DEFAULT_DATABASE) - - @volatile var functionResourceLoader: FunctionResourceLoader = _ + protected var currentDb: String = formatDatabaseName(DEFAULT_DATABASE) /** * Checks if the given name conforms the Hive standard ("[a-zA-z_0-9]+"), @@ -1059,9 +1058,6 @@ class SessionCatalog( * by a tuple (resource type, resource uri). */ def loadFunctionResources(resources: Seq[FunctionResource]): Unit = { - if (functionResourceLoader == null) { - throw new IllegalStateException("functionResourceLoader has not yet been initialized") - } resources.foreach(functionResourceLoader.loadResource) } @@ -1259,28 +1255,16 @@ class SessionCatalog( } /** - * Create a new [[SessionCatalog]] with the provided parameters. `externalCatalog` and - * `globalTempViewManager` are `inherited`, while `currentDb` and `tempTables` are copied. + * Copy the current state of the catalog to another catalog. + * + * This function is synchronized on this [[SessionCatalog]] (the source) to make sure the copied + * state is consistent. The target [[SessionCatalog]] is not synchronized, and should not be + * because the target [[SessionCatalog]] should not be published at this point. The caller must + * synchronize on the target if this assumption does not hold. */ - def newSessionCatalogWith( - conf: CatalystConf, - hadoopConf: Configuration, - functionRegistry: FunctionRegistry, - parser: ParserInterface): SessionCatalog = { - val catalog = new SessionCatalog( - externalCatalog, - globalTempViewManager, - functionRegistry, - conf, - hadoopConf, - parser) - - synchronized { - catalog.currentDb = currentDb - // copy over temporary tables - tempTables.foreach(kv => catalog.tempTables.put(kv._1, kv._2)) - } - - catalog + private[sql] def copyStateTo(target: SessionCatalog): Unit = synchronized { + target.currentDb = currentDb + // copy over temporary tables + tempTables.foreach(kv => target.tempTables.put(kv._1, kv._2)) } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index dbe3ded4bbf15..dbf479d215134 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -17,20 +17,14 @@ package org.apache.spark.sql.catalyst.optimizer -import scala.annotation.tailrec -import scala.collection.immutable.HashSet import scala.collection.mutable -import scala.collection.mutable.ArrayBuffer -import org.apache.spark.api.java.function.FilterFunction import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.{CatalystConf, SimpleCatalystConf} import org.apache.spark.sql.catalyst.analysis._ import org.apache.spark.sql.catalyst.catalog.{InMemoryCatalog, SessionCatalog} import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.aggregate._ -import org.apache.spark.sql.catalyst.expressions.Literal.{FalseLiteral, TrueLiteral} -import org.apache.spark.sql.catalyst.planning.ExtractFiltersAndInnerJoins import org.apache.spark.sql.catalyst.plans._ import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.rules._ @@ -79,7 +73,7 @@ abstract class Optimizer(sessionCatalog: SessionCatalog, conf: CatalystConf) Batch("Aggregate", fixedPoint, RemoveLiteralFromGroupExpressions, RemoveRepetitionFromGroupExpressions) :: - Batch("Operator 
Optimizations", fixedPoint, + Batch("Operator Optimizations", fixedPoint, Seq( // Operator push down PushProjectionThroughUnion, ReorderJoin(conf), @@ -117,7 +111,8 @@ abstract class Optimizer(sessionCatalog: SessionCatalog, conf: CatalystConf) RemoveRedundantProject, SimplifyCreateStructOps, SimplifyCreateArrayOps, - SimplifyCreateMapOps) :: + SimplifyCreateMapOps) ++ + extendedOperatorOptimizationRules: _*) :: Batch("Check Cartesian Products", Once, CheckCartesianProducts(conf)) :: Batch("Join Reorder", Once, @@ -146,6 +141,11 @@ abstract class Optimizer(sessionCatalog: SessionCatalog, conf: CatalystConf) s.withNewPlan(newPlan) } } + + /** + * Override to provide additional rules for the operator optimization batch. + */ + def extendedOperatorOptimizationRules: Seq[Rule[LogicalPlan]] = Nil } /** diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala index ca4ce1c11707a..56bca73a8857a 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala @@ -17,8 +17,6 @@ package org.apache.spark.sql.catalyst.catalog -import org.apache.hadoop.conf.Configuration - import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.{FunctionIdentifier, SimpleCatalystConf, TableIdentifier} import org.apache.spark.sql.catalyst.analysis._ @@ -1331,17 +1329,15 @@ abstract class SessionCatalogSuite extends PlanTest { } } - test("clone SessionCatalog - temp views") { + test("copy SessionCatalog state - temp views") { withEmptyCatalog { original => val tempTable1 = Range(1, 10, 1, 10) original.createTempView("copytest1", tempTable1, overrideIfExists = false) // check if tables copied over - val clone = original.newSessionCatalogWith( - SimpleCatalystConf(caseSensitiveAnalysis = true), - new Configuration(), - new SimpleFunctionRegistry, - CatalystSqlParser) + val clone = new SessionCatalog(original.externalCatalog) + original.copyStateTo(clone) + assert(original ne clone) assert(clone.getTempView("copytest1") == Some(tempTable1)) @@ -1355,7 +1351,7 @@ abstract class SessionCatalogSuite extends PlanTest { } } - test("clone SessionCatalog - current db") { + test("copy SessionCatalog state - current db") { withEmptyCatalog { original => val db1 = "db1" val db2 = "db2" @@ -1368,11 +1364,9 @@ abstract class SessionCatalogSuite extends PlanTest { original.setCurrentDatabase(db1) // check if current db copied over - val clone = original.newSessionCatalogWith( - SimpleCatalystConf(caseSensitiveAnalysis = true), - new Configuration(), - new SimpleFunctionRegistry, - CatalystSqlParser) + val clone = new SessionCatalog(original.externalCatalog) + original.copyStateTo(clone) + assert(original ne clone) assert(clone.getCurrentDatabase == db1) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala index 981728331d361..2cdfb7a7828c9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala @@ -30,9 +30,17 @@ class SparkOptimizer( experimentalMethods: ExperimentalMethods) extends Optimizer(catalog, conf) { - override def batches: Seq[Batch] = super.batches :+ + override def batches: Seq[Batch] = 
(super.batches :+ Batch("Optimize Metadata Only Query", Once, OptimizeMetadataOnlyQuery(catalog, conf)) :+ Batch("Extract Python UDF from Aggregate", Once, ExtractPythonUDFFromAggregate) :+ - Batch("Prune File Source Table Partitions", Once, PruneFileSourcePartitions) :+ + Batch("Prune File Source Table Partitions", Once, PruneFileSourcePartitions)) ++ + postHocOptimizationBatches :+ Batch("User Provided Optimizers", fixedPoint, experimentalMethods.extraOptimizations: _*) + + /** + * Optimization batches that are executed after the regular optimization batches, but before the + * batch executing the [[ExperimentalMethods]] optimizer rules. This hook can be used to add + * custom optimizer batches to the Spark optimizer. + */ + def postHocOptimizationBatches: Seq[Batch] = Nil } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanner.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanner.scala index 678241656c011..6566502bd8a8a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanner.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanner.scala @@ -27,13 +27,14 @@ import org.apache.spark.sql.internal.SQLConf class SparkPlanner( val sparkContext: SparkContext, val conf: SQLConf, - val extraStrategies: Seq[Strategy]) + val experimentalMethods: ExperimentalMethods) extends SparkStrategies { def numPartitions: Int = conf.numShufflePartitions def strategies: Seq[Strategy] = - extraStrategies ++ ( + experimentalMethods.extraStrategies ++ + extraPlanningStrategies ++ ( FileSourceStrategy :: DataSourceStrategy :: SpecialLimits :: @@ -42,6 +43,12 @@ class SparkPlanner( InMemoryScans :: BasicOperators :: Nil) + /** + * Override to add extra planning strategies to the planner. These strategies are tried after + * the strategies defined in [[ExperimentalMethods]], and before the regular strategies. + */ + def extraPlanningStrategies: Seq[Strategy] = Nil + override protected def collectPlaceholders(plan: SparkPlan): Seq[(SparkPlan, LogicalPlan)] = { plan.collect { case placeholder @ PlanLater(logicalPlan) => placeholder -> logicalPlan diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala index 0f0e4a91f8cc7..622e049630db2 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala @@ -20,8 +20,8 @@ package org.apache.spark.sql.execution.streaming import java.util.concurrent.atomic.AtomicInteger import org.apache.spark.internal.Logging -import org.apache.spark.sql.catalyst.expressions.{CurrentBatchTimestamp, Literal} -import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.{SparkSession, Strategy} +import org.apache.spark.sql.catalyst.expressions.CurrentBatchTimestamp import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.execution.{QueryExecution, SparkPlan, SparkPlanner, UnaryExecNode} @@ -40,20 +40,17 @@ class IncrementalExecution( offsetSeqMetadata: OffsetSeqMetadata) extends QueryExecution(sparkSession, logicalPlan) with Logging { - // TODO: make this always part of planning. 
- val streamingExtraStrategies = - sparkSession.sessionState.planner.StatefulAggregationStrategy +: - sparkSession.sessionState.planner.FlatMapGroupsWithStateStrategy +: - sparkSession.sessionState.planner.StreamingRelationStrategy +: - sparkSession.sessionState.planner.StreamingDeduplicationStrategy +: - sparkSession.sessionState.experimentalMethods.extraStrategies - // Modified planner with stateful operations. - override def planner: SparkPlanner = - new SparkPlanner( + override val planner: SparkPlanner = new SparkPlanner( sparkSession.sparkContext, sparkSession.sessionState.conf, - streamingExtraStrategies) + sparkSession.sessionState.experimentalMethods) { + override def extraPlanningStrategies: Seq[Strategy] = + StatefulAggregationStrategy :: + FlatMapGroupsWithStateStrategy :: + StreamingRelationStrategy :: + StreamingDeduplicationStrategy :: Nil + } /** * See [SPARK-18339] diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala index ce80604bd3657..b5b0bb0bfc401 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala @@ -22,22 +22,21 @@ import java.io.File import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path -import org.apache.spark.{SparkConf, SparkContext} +import org.apache.spark.SparkContext +import org.apache.spark.annotation.{Experimental, InterfaceStability} import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.analysis.{Analyzer, FunctionRegistry} import org.apache.spark.sql.catalyst.catalog._ import org.apache.spark.sql.catalyst.optimizer.Optimizer import org.apache.spark.sql.catalyst.parser.ParserInterface import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan -import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.execution._ -import org.apache.spark.sql.execution.datasources._ import org.apache.spark.sql.streaming.StreamingQueryManager import org.apache.spark.sql.util.ExecutionListenerManager - /** * A class that holds all session-specific state in a given [[SparkSession]]. + * * @param sparkContext The [[SparkContext]]. * @param sharedState The shared state. * @param conf SQL-specific key-value configurations. @@ -46,9 +45,11 @@ import org.apache.spark.sql.util.ExecutionListenerManager * @param catalog Internal catalog for managing table and database states. * @param sqlParser Parser that extracts expressions, plans, table identifiers etc. from SQL texts. * @param analyzer Logical query plan analyzer for resolving unresolved attributes and relations. - * @param streamingQueryManager Interface to start and stop - * [[org.apache.spark.sql.streaming.StreamingQuery]]s. - * @param queryExecutionCreator Lambda to create a [[QueryExecution]] from a [[LogicalPlan]] + * @param optimizer Logical query plan optimizer. + * @param planner Planner that converts optimized logical plans to physical plans + * @param streamingQueryManager Interface to start and stop streaming queries. + * @param createQueryExecution Function used to create QueryExecution objects. + * @param createClone Function used to create clones of the session state. 
*/ private[sql] class SessionState( sparkContext: SparkContext, @@ -59,8 +60,11 @@ private[sql] class SessionState( val catalog: SessionCatalog, val sqlParser: ParserInterface, val analyzer: Analyzer, + val optimizer: Optimizer, + val planner: SparkPlanner, val streamingQueryManager: StreamingQueryManager, - val queryExecutionCreator: LogicalPlan => QueryExecution) { + createQueryExecution: LogicalPlan => QueryExecution, + createClone: (SparkSession, SessionState) => SessionState) { def newHadoopConf(): Configuration = SessionState.newHadoopConf( sparkContext.hadoopConfiguration, @@ -76,41 +80,12 @@ private[sql] class SessionState( hadoopConf } - /** - * A class for loading resources specified by a function. - */ - val functionResourceLoader: FunctionResourceLoader = { - new FunctionResourceLoader { - override def loadResource(resource: FunctionResource): Unit = { - resource.resourceType match { - case JarResource => addJar(resource.uri) - case FileResource => sparkContext.addFile(resource.uri) - case ArchiveResource => - throw new AnalysisException( - "Archive is not allowed to be loaded. If YARN mode is used, " + - "please use --archives options while calling spark-submit.") - } - } - } - } - /** * Interface exposed to the user for registering user-defined functions. * Note that the user-defined functions must be deterministic. */ val udf: UDFRegistration = new UDFRegistration(functionRegistry) - /** - * Logical query plan optimizer. - */ - val optimizer: Optimizer = new SparkOptimizer(catalog, conf, experimentalMethods) - - /** - * Planner that converts optimized logical plans to physical plans. - */ - def planner: SparkPlanner = - new SparkPlanner(sparkContext, conf, experimentalMethods.extraStrategies) - /** * An interface to register custom [[org.apache.spark.sql.util.QueryExecutionListener]]s * that listen for execution metrics. 
@@ -120,38 +95,13 @@ private[sql] class SessionState( /** * Get an identical copy of the `SessionState` and associate it with the given `SparkSession` */ - def clone(newSparkSession: SparkSession): SessionState = { - val sparkContext = newSparkSession.sparkContext - val confCopy = conf.clone() - val functionRegistryCopy = functionRegistry.clone() - val sqlParser: ParserInterface = new SparkSqlParser(confCopy) - val catalogCopy = catalog.newSessionCatalogWith( - confCopy, - SessionState.newHadoopConf(sparkContext.hadoopConfiguration, confCopy), - functionRegistryCopy, - sqlParser) - val queryExecutionCreator = (plan: LogicalPlan) => new QueryExecution(newSparkSession, plan) - - SessionState.mergeSparkConf(confCopy, sparkContext.getConf) - - new SessionState( - sparkContext, - newSparkSession.sharedState, - confCopy, - experimentalMethods.clone(), - functionRegistryCopy, - catalogCopy, - sqlParser, - SessionState.createAnalyzer(newSparkSession, catalogCopy, confCopy), - new StreamingQueryManager(newSparkSession), - queryExecutionCreator) - } + def clone(newSparkSession: SparkSession): SessionState = createClone(newSparkSession, this) // ------------------------------------------------------ // Helper methods, partially leftover from pre-2.0 days // ------------------------------------------------------ - def executePlan(plan: LogicalPlan): QueryExecution = queryExecutionCreator(plan) + def executePlan(plan: LogicalPlan): QueryExecution = createQueryExecution(plan) def refreshTable(tableName: String): Unit = { catalog.refreshTable(sqlParser.parseTableIdentifier(tableName)) @@ -179,53 +129,12 @@ private[sql] class SessionState( } } - private[sql] object SessionState { - - def apply(sparkSession: SparkSession): SessionState = { - apply(sparkSession, new SQLConf) - } - - def apply(sparkSession: SparkSession, sqlConf: SQLConf): SessionState = { - val sparkContext = sparkSession.sparkContext - - // Automatically extract all entries and put them in our SQLConf - mergeSparkConf(sqlConf, sparkContext.getConf) - - val functionRegistry = FunctionRegistry.builtin.clone() - - val sqlParser: ParserInterface = new SparkSqlParser(sqlConf) - - val catalog = new SessionCatalog( - sparkSession.sharedState.externalCatalog, - sparkSession.sharedState.globalTempViewManager, - functionRegistry, - sqlConf, - newHadoopConf(sparkContext.hadoopConfiguration, sqlConf), - sqlParser) - - val analyzer: Analyzer = createAnalyzer(sparkSession, catalog, sqlConf) - - val streamingQueryManager: StreamingQueryManager = new StreamingQueryManager(sparkSession) - - val queryExecutionCreator = (plan: LogicalPlan) => new QueryExecution(sparkSession, plan) - - val sessionState = new SessionState( - sparkContext, - sparkSession.sharedState, - sqlConf, - new ExperimentalMethods, - functionRegistry, - catalog, - sqlParser, - analyzer, - streamingQueryManager, - queryExecutionCreator) - // functionResourceLoader needs to access SessionState.addJar, so it cannot be created before - // creating SessionState. Setting `catalog.functionResourceLoader` here is safe since the caller - // cannot use SessionCatalog before we return SessionState. - catalog.functionResourceLoader = sessionState.functionResourceLoader - sessionState + /** + * Create a new [[SessionState]] for the given session. 
+ */ + def apply(session: SparkSession): SessionState = { + new SessionStateBuilder(session).build() } def newHadoopConf(hadoopConf: Configuration, sqlConf: SQLConf): Configuration = { @@ -233,34 +142,33 @@ private[sql] object SessionState { sqlConf.getAllConfs.foreach { case (k, v) => if (v ne null) newHadoopConf.set(k, v) } newHadoopConf } +} - /** - * Create an logical query plan `Analyzer` with rules specific to a non-Hive `SessionState`. - */ - private def createAnalyzer( - sparkSession: SparkSession, - catalog: SessionCatalog, - sqlConf: SQLConf): Analyzer = { - new Analyzer(catalog, sqlConf) { - override val extendedResolutionRules: Seq[Rule[LogicalPlan]] = - new FindDataSourceTable(sparkSession) :: - new ResolveSQLOnFile(sparkSession) :: Nil - - override val postHocResolutionRules: Seq[Rule[LogicalPlan]] = - PreprocessTableCreation(sparkSession) :: - PreprocessTableInsertion(sqlConf) :: - DataSourceAnalysis(sqlConf) :: Nil - - override val extendedCheckRules = Seq(PreWriteCheck, HiveOnlyCheck) - } - } +/** + * Concrete implementation of a [[SessionStateBuilder]]. + */ +@Experimental +@InterfaceStability.Unstable +class SessionStateBuilder( + session: SparkSession, + parentState: Option[SessionState] = None) + extends BaseSessionStateBuilder(session, parentState) { + override protected def newBuilder: NewBuilder = new SessionStateBuilder(_, _) +} - /** - * Extract entries from `SparkConf` and put them in the `SQLConf` - */ - def mergeSparkConf(sqlConf: SQLConf, sparkConf: SparkConf): Unit = { - sparkConf.getAll.foreach { case (k, v) => - sqlConf.setConfString(k, v) +/** + * Session shared [[FunctionResourceLoader]]. + */ +@InterfaceStability.Unstable +class SessionFunctionResourceLoader(session: SparkSession) extends FunctionResourceLoader { + override def loadResource(resource: FunctionResource): Unit = { + resource.resourceType match { + case JarResource => session.sessionState.addJar(resource.uri) + case FileResource => session.sparkContext.addFile(resource.uri) + case ArchiveResource => + throw new AnalysisException( + "Archive is not allowed to be loaded. If YARN mode is used, " + + "please use --archives options while calling spark-submit.") } } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/sessionStateBuilders.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/sessionStateBuilders.scala new file mode 100644 index 0000000000000..6b5559adb1db4 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/sessionStateBuilders.scala @@ -0,0 +1,279 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.sql.internal + +import org.apache.spark.SparkConf +import org.apache.spark.annotation.{Experimental, InterfaceStability} +import org.apache.spark.sql.{ExperimentalMethods, SparkSession, Strategy} +import org.apache.spark.sql.catalyst.analysis.{Analyzer, FunctionRegistry} +import org.apache.spark.sql.catalyst.catalog.SessionCatalog +import org.apache.spark.sql.catalyst.optimizer.Optimizer +import org.apache.spark.sql.catalyst.parser.ParserInterface +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.execution.{QueryExecution, SparkOptimizer, SparkPlanner, SparkSqlParser} +import org.apache.spark.sql.execution.datasources._ +import org.apache.spark.sql.streaming.StreamingQueryManager + +/** + * Builder class that coordinates construction of a new [[SessionState]]. + * + * The builder explicitly defines all components needed by the session state, and creates a session + * state when `build` is called. Components should only be initialized once. This is not a problem + * for most components as they are only used in the `build` function. However some components + * (`conf`, `catalog`, `functionRegistry`, `experimentalMethods` & `sqlParser`) are as dependencies + * for other components and are shared as a result. These components are defined as lazy vals to + * make sure the component is created only once. + * + * A developer can modify the builder by providing custom versions of components, or by using the + * hooks provided for the analyzer, optimizer & planner. There are some dependencies between the + * components (they are documented per dependency), a developer should respect these when making + * modifications in order to prevent initialization problems. + * + * A parent [[SessionState]] can be used to initialize the new [[SessionState]]. The new session + * state will clone the parent sessions state's `conf`, `functionRegistry`, `experimentalMethods` + * and `catalog` fields. Note that the state is cloned when `build` is called, and not before. + */ +@Experimental +@InterfaceStability.Unstable +abstract class BaseSessionStateBuilder( + val session: SparkSession, + val parentState: Option[SessionState] = None) { + type NewBuilder = (SparkSession, Option[SessionState]) => BaseSessionStateBuilder + + /** + * Function that produces a new instance of the SessionStateBuilder. This is used by the + * [[SessionState]]'s clone functionality. Make sure to override this when implementing your own + * [[SessionStateBuilder]]. + */ + protected def newBuilder: NewBuilder + + /** + * Extract entries from `SparkConf` and put them in the `SQLConf` + */ + protected def mergeSparkConf(sqlConf: SQLConf, sparkConf: SparkConf): Unit = { + sparkConf.getAll.foreach { case (k, v) => + sqlConf.setConfString(k, v) + } + } + + /** + * SQL-specific key-value configurations. + * + * These either get cloned from a pre-existing instance or newly created. The conf is always + * merged with its [[SparkConf]]. + */ + protected lazy val conf: SQLConf = { + val conf = parentState.map(_.conf.clone()).getOrElse(new SQLConf) + mergeSparkConf(conf, session.sparkContext.conf) + conf + } + + /** + * Internal catalog managing functions registered by the user. + * + * This either gets cloned from a pre-existing version or cloned from the built-in registry. 
+ */ + protected lazy val functionRegistry: FunctionRegistry = { + parentState.map(_.functionRegistry).getOrElse(FunctionRegistry.builtin).clone() + } + + /** + * Experimental methods that can be used to define custom optimization rules and custom planning + * strategies. + * + * This either gets cloned from a pre-existing version or newly created. + */ + protected lazy val experimentalMethods: ExperimentalMethods = { + parentState.map(_.experimentalMethods.clone()).getOrElse(new ExperimentalMethods) + } + + /** + * Parser that extracts expressions, plans, table identifiers etc. from SQL texts. + * + * Note: this depends on the `conf` field. + */ + protected lazy val sqlParser: ParserInterface = new SparkSqlParser(conf) + + /** + * Catalog for managing table and database states. If there is a pre-existing catalog, the state + * of that catalog (temp tables & current database) will be copied into the new catalog. + * + * Note: this depends on the `conf`, `functionRegistry` and `sqlParser` fields. + */ + protected lazy val catalog: SessionCatalog = { + val catalog = new SessionCatalog( + session.sharedState.externalCatalog, + session.sharedState.globalTempViewManager, + functionRegistry, + conf, + SessionState.newHadoopConf(session.sparkContext.hadoopConfiguration, conf), + sqlParser, + new SessionFunctionResourceLoader(session)) + parentState.foreach(_.catalog.copyStateTo(catalog)) + catalog + } + + /** + * Logical query plan analyzer for resolving unresolved attributes and relations. + * + * Note: this depends on the `conf` and `catalog` fields. + */ + protected def analyzer: Analyzer = new Analyzer(catalog, conf) { + override val extendedResolutionRules: Seq[Rule[LogicalPlan]] = + new FindDataSourceTable(session) +: + new ResolveSQLOnFile(session) +: + customResolutionRules + + override val postHocResolutionRules: Seq[Rule[LogicalPlan]] = + PreprocessTableCreation(session) +: + PreprocessTableInsertion(conf) +: + DataSourceAnalysis(conf) +: + customPostHocResolutionRules + + override val extendedCheckRules: Seq[LogicalPlan => Unit] = + PreWriteCheck +: + HiveOnlyCheck +: + customCheckRules + } + + /** + * Custom resolution rules to add to the Analyzer. Prefer overriding this instead of creating + * your own Analyzer. + * + * Note that this may NOT depend on the `analyzer` function. + */ + protected def customResolutionRules: Seq[Rule[LogicalPlan]] = Nil + + /** + * Custom post resolution rules to add to the Analyzer. Prefer overriding this instead of + * creating your own Analyzer. + * + * Note that this may NOT depend on the `analyzer` function. + */ + protected def customPostHocResolutionRules: Seq[Rule[LogicalPlan]] = Nil + + /** + * Custom check rules to add to the Analyzer. Prefer overriding this instead of creating + * your own Analyzer. + * + * Note that this may NOT depend on the `analyzer` function. + */ + protected def customCheckRules: Seq[LogicalPlan => Unit] = Nil + + /** + * Logical query plan optimizer. + * + * Note: this depends on the `conf`, `catalog` and `experimentalMethods` fields. + */ + protected def optimizer: Optimizer = { + new SparkOptimizer(catalog, conf, experimentalMethods) { + override def extendedOperatorOptimizationRules: Seq[Rule[LogicalPlan]] = + super.extendedOperatorOptimizationRules ++ customOperatorOptimizationRules + } + } + + /** + * Custom operator optimization rules to add to the Optimizer. Prefer overriding this instead + * of creating your own Optimizer. + * + * Note that this may NOT depend on the `optimizer` function. 
+ */ + protected def customOperatorOptimizationRules: Seq[Rule[LogicalPlan]] = Nil + + /** + * Planner that converts optimized logical plans to physical plans. + * + * Note: this depends on the `conf` and `experimentalMethods` fields. + */ + protected def planner: SparkPlanner = { + new SparkPlanner(session.sparkContext, conf, experimentalMethods) { + override def extraPlanningStrategies: Seq[Strategy] = + super.extraPlanningStrategies ++ customPlanningStrategies + } + } + + /** + * Custom strategies to add to the planner. Prefer overriding this instead of creating + * your own Planner. + * + * Note that this may NOT depend on the `planner` function. + */ + protected def customPlanningStrategies: Seq[Strategy] = Nil + + /** + * Create a query execution object. + */ + protected def createQueryExecution: LogicalPlan => QueryExecution = { plan => + new QueryExecution(session, plan) + } + + /** + * Interface to start and stop streaming queries. + */ + protected def streamingQueryManager: StreamingQueryManager = new StreamingQueryManager(session) + + /** + * Function used to make clones of the session state. + */ + protected def createClone: (SparkSession, SessionState) => SessionState = { + val createBuilder = newBuilder + (session, state) => createBuilder(session, Option(state)).build() + } + + /** + * Build the [[SessionState]]. + */ + def build(): SessionState = { + new SessionState( + session.sparkContext, + session.sharedState, + conf, + experimentalMethods, + functionRegistry, + catalog, + sqlParser, + analyzer, + optimizer, + planner, + streamingQueryManager, + createQueryExecution, + createClone) + } +} + +/** + * Helper class for using SessionStateBuilders during tests. + */ +private[sql] trait WithTestConf { self: BaseSessionStateBuilder => + def overrideConfs: Map[String, String] + + override protected lazy val conf: SQLConf = { + val conf = parentState.map(_.conf.clone()).getOrElse { + new SQLConf { + clear() + override def clear(): Unit = { + super.clear() + // Make sure we start with the default test configs even after clear + overrideConfs.foreach { case (key, value) => setConfString(key, value) } + } + } + } + mergeSparkConf(conf, session.sparkContext.conf) + conf + } +} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/TestSQLContext.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/TestSQLContext.scala index 898a2fb4f329b..b01977a23890f 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/test/TestSQLContext.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/test/TestSQLContext.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql.test import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.sql.SparkSession -import org.apache.spark.sql.internal.{SessionState, SQLConf} +import org.apache.spark.sql.internal.{SessionState, SessionStateBuilder, SQLConf, WithTestConf} /** * A special [[SparkSession]] prepared for testing. 
@@ -35,16 +35,9 @@ private[sql] class TestSparkSession(sc: SparkContext) extends SparkSession(sc) { } @transient - override lazy val sessionState: SessionState = SessionState( - this, - new SQLConf { - clear() - override def clear(): Unit = { - super.clear() - // Make sure we start with the default test configs even after clear - TestSQLContext.overrideConfs.foreach { case (key, value) => setConfString(key, value) } - } - }) + override lazy val sessionState: SessionState = { + new TestSQLSessionStateBuilder(this, None).build() + } // Needed for Java tests def loadTestData(): Unit = { @@ -67,3 +60,11 @@ private[sql] object TestSQLContext { // Fewer shuffle partitions to speed up testing. SQLConf.SHUFFLE_PARTITIONS.key -> "5") } + +private[sql] class TestSQLSessionStateBuilder( + session: SparkSession, + state: Option[SessionState]) + extends SessionStateBuilder(session, state) with WithTestConf { + override def overrideConfs: Map[String, String] = TestSQLContext.overrideConfs + override def newBuilder: NewBuilder = new TestSQLSessionStateBuilder(_, _) +} diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala index 6b7599e3d3401..2cc20a791d80c 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala @@ -25,8 +25,8 @@ import org.apache.hadoop.hive.ql.exec.{UDAF, UDF} import org.apache.hadoop.hive.ql.exec.{FunctionRegistry => HiveFunctionRegistry} import org.apache.hadoop.hive.ql.udf.generic.{AbstractGenericUDAFResolver, GenericUDF, GenericUDTF} -import org.apache.spark.sql.{AnalysisException, SparkSession} -import org.apache.spark.sql.catalyst.{CatalystConf, FunctionIdentifier, TableIdentifier} +import org.apache.spark.sql.AnalysisException +import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier} import org.apache.spark.sql.catalyst.analysis.FunctionRegistry import org.apache.spark.sql.catalyst.analysis.FunctionRegistry.FunctionBuilder import org.apache.spark.sql.catalyst.catalog.{FunctionResourceLoader, GlobalTempViewManager, SessionCatalog} @@ -47,14 +47,16 @@ private[sql] class HiveSessionCatalog( functionRegistry: FunctionRegistry, conf: SQLConf, hadoopConf: Configuration, - parser: ParserInterface) + parser: ParserInterface, + functionResourceLoader: FunctionResourceLoader) extends SessionCatalog( externalCatalog, globalTempViewManager, functionRegistry, conf, hadoopConf, - parser) { + parser, + functionResourceLoader) { // ---------------------------------------------------------------- // | Methods and fields for interacting with HiveMetastoreCatalog | @@ -69,47 +71,6 @@ private[sql] class HiveSessionCatalog( metastoreCatalog.hiveDefaultTableFilePath(name) } - /** - * Create a new [[HiveSessionCatalog]] with the provided parameters. `externalCatalog` and - * `globalTempViewManager` are `inherited`, while `currentDb` and `tempTables` are copied. 
- */ - def newSessionCatalogWith( - newSparkSession: SparkSession, - conf: SQLConf, - hadoopConf: Configuration, - functionRegistry: FunctionRegistry, - parser: ParserInterface): HiveSessionCatalog = { - val catalog = HiveSessionCatalog( - newSparkSession, - functionRegistry, - conf, - hadoopConf, - parser) - - synchronized { - catalog.currentDb = currentDb - // copy over temporary tables - tempTables.foreach(kv => catalog.tempTables.put(kv._1, kv._2)) - } - - catalog - } - - /** - * The parent class [[SessionCatalog]] cannot access the [[SparkSession]] class, so we cannot add - * a [[SparkSession]] parameter to [[SessionCatalog.newSessionCatalogWith]]. However, - * [[HiveSessionCatalog]] requires a [[SparkSession]] parameter, so we can a new version of - * `newSessionCatalogWith` and disable this one. - * - * TODO Refactor HiveSessionCatalog to not use [[SparkSession]] directly. - */ - override def newSessionCatalogWith( - conf: CatalystConf, - hadoopConf: Configuration, - functionRegistry: FunctionRegistry, - parser: ParserInterface): HiveSessionCatalog = throw new UnsupportedOperationException( - "to clone HiveSessionCatalog, use the other clone method that also accepts a SparkSession") - // For testing only private[hive] def getCachedDataSourceTable(table: TableIdentifier): LogicalPlan = { val key = metastoreCatalog.getQualifiedTableName(table) @@ -250,28 +211,3 @@ private[sql] class HiveSessionCatalog( "histogram_numeric" ) } - -private[sql] object HiveSessionCatalog { - - def apply( - sparkSession: SparkSession, - functionRegistry: FunctionRegistry, - conf: SQLConf, - hadoopConf: Configuration, - parser: ParserInterface): HiveSessionCatalog = { - // Catalog for handling data source tables. TODO: This really doesn't belong here since it is - // essentially a cache for metastore tables. However, it relies on a lot of session-specific - // things so it would be a lot of work to split its functionality between HiveSessionCatalog - // and HiveCatalog. We should still do it at some point... 
- val metastoreCatalog = new HiveMetastoreCatalog(sparkSession) - - new HiveSessionCatalog( - sparkSession.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog], - sparkSession.sharedState.globalTempViewManager, - metastoreCatalog, - functionRegistry, - conf, - hadoopConf, - parser) - } -} diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala index cb8bcb8591bd6..49ff8478f1ae2 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala @@ -18,20 +18,23 @@ package org.apache.spark.sql.hive import org.apache.spark.SparkContext +import org.apache.spark.annotation.{Experimental, InterfaceStability} import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.analysis.{Analyzer, FunctionRegistry} +import org.apache.spark.sql.catalyst.optimizer.Optimizer import org.apache.spark.sql.catalyst.parser.ParserInterface import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.catalyst.rules.Rule -import org.apache.spark.sql.execution.{QueryExecution, SparkPlanner, SparkSqlParser} +import org.apache.spark.sql.execution.{QueryExecution, SparkPlanner} import org.apache.spark.sql.execution.datasources._ import org.apache.spark.sql.hive.client.HiveClient -import org.apache.spark.sql.internal.{SessionState, SharedState, SQLConf} +import org.apache.spark.sql.internal.{BaseSessionStateBuilder, SessionFunctionResourceLoader, SessionState, SharedState, SQLConf} import org.apache.spark.sql.streaming.StreamingQueryManager /** * A class that holds all session-specific state in a given [[SparkSession]] backed by Hive. + * * @param sparkContext The [[SparkContext]]. * @param sharedState The shared state. * @param conf SQL-specific key-value configurations. @@ -40,12 +43,14 @@ import org.apache.spark.sql.streaming.StreamingQueryManager * @param catalog Internal catalog for managing table and database states that uses Hive client for * interacting with the metastore. * @param sqlParser Parser that extracts expressions, plans, table identifiers etc. from SQL texts. - * @param metadataHive The Hive metadata client. * @param analyzer Logical query plan analyzer for resolving unresolved attributes and relations. - * @param streamingQueryManager Interface to start and stop - * [[org.apache.spark.sql.streaming.StreamingQuery]]s. - * @param queryExecutionCreator Lambda to create a [[QueryExecution]] from a [[LogicalPlan]] - * @param plannerCreator Lambda to create a planner that takes into account Hive-specific strategies + * @param optimizer Logical query plan optimizer. + * @param planner Planner that converts optimized logical plans to physical plans and that takes + * Hive-specific strategies into account. + * @param streamingQueryManager Interface to start and stop streaming queries. + * @param createQueryExecution Function used to create QueryExecution objects. + * @param createClone Function used to create clones of the session state. + * @param metadataHive The Hive metadata client. 
*/ private[hive] class HiveSessionState( sparkContext: SparkContext, @@ -55,11 +60,13 @@ private[hive] class HiveSessionState( functionRegistry: FunctionRegistry, override val catalog: HiveSessionCatalog, sqlParser: ParserInterface, - val metadataHive: HiveClient, analyzer: Analyzer, + optimizer: Optimizer, + planner: SparkPlanner, streamingQueryManager: StreamingQueryManager, - queryExecutionCreator: LogicalPlan => QueryExecution, - val plannerCreator: () => SparkPlanner) + createQueryExecution: LogicalPlan => QueryExecution, + createClone: (SparkSession, SessionState) => SessionState, + val metadataHive: HiveClient) extends SessionState( sparkContext, sharedState, @@ -69,14 +76,11 @@ private[hive] class HiveSessionState( catalog, sqlParser, analyzer, + optimizer, + planner, streamingQueryManager, - queryExecutionCreator) { self => - - /** - * Planner that takes into account Hive-specific strategies. - */ - override def planner: SparkPlanner = plannerCreator() - + createQueryExecution, + createClone) { // ------------------------------------------------------ // Helper methods, partially leftover from pre-2.0 days @@ -121,150 +125,115 @@ private[hive] class HiveSessionState( def hiveThriftServerAsync: Boolean = { conf.getConf(HiveUtils.HIVE_THRIFT_SERVER_ASYNC) } +} +private[hive] object HiveSessionState { /** - * Get an identical copy of the `HiveSessionState`. - * This should ideally reuse the `SessionState.clone` but cannot do so. - * Doing that will throw an exception when trying to clone the catalog. + * Create a new [[HiveSessionState]] for the given session. */ - override def clone(newSparkSession: SparkSession): HiveSessionState = { - val sparkContext = newSparkSession.sparkContext - val confCopy = conf.clone() - val functionRegistryCopy = functionRegistry.clone() - val experimentalMethodsCopy = experimentalMethods.clone() - val sqlParser: ParserInterface = new SparkSqlParser(confCopy) - val catalogCopy = catalog.newSessionCatalogWith( - newSparkSession, - confCopy, - SessionState.newHadoopConf(sparkContext.hadoopConfiguration, confCopy), - functionRegistryCopy, - sqlParser) - val queryExecutionCreator = (plan: LogicalPlan) => new QueryExecution(newSparkSession, plan) - - val hiveClient = - newSparkSession.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog].client - .newSession() - - SessionState.mergeSparkConf(confCopy, sparkContext.getConf) - - new HiveSessionState( - sparkContext, - newSparkSession.sharedState, - confCopy, - experimentalMethodsCopy, - functionRegistryCopy, - catalogCopy, - sqlParser, - hiveClient, - HiveSessionState.createAnalyzer(newSparkSession, catalogCopy, confCopy), - new StreamingQueryManager(newSparkSession), - queryExecutionCreator, - HiveSessionState.createPlannerCreator( - newSparkSession, - confCopy, - experimentalMethodsCopy)) + def apply(session: SparkSession): HiveSessionState = { + new HiveSessionStateBuilder(session).build() } - } -private[hive] object HiveSessionState { - - def apply(sparkSession: SparkSession): HiveSessionState = { - apply(sparkSession, new SQLConf) - } - - def apply(sparkSession: SparkSession, conf: SQLConf): HiveSessionState = { - val initHelper = SessionState(sparkSession, conf) - - val sparkContext = sparkSession.sparkContext - - val catalog = HiveSessionCatalog( - sparkSession, - initHelper.functionRegistry, - initHelper.conf, - SessionState.newHadoopConf(sparkContext.hadoopConfiguration, initHelper.conf), - initHelper.sqlParser) - - val metadataHive: HiveClient = - 
sparkSession.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog].client - .newSession() - - val analyzer: Analyzer = createAnalyzer(sparkSession, catalog, initHelper.conf) +/** + * Builder that produces a [[HiveSessionState]]. + */ +@Experimental +@InterfaceStability.Unstable +class HiveSessionStateBuilder(session: SparkSession, parentState: Option[SessionState] = None) + extends BaseSessionStateBuilder(session, parentState) { - val plannerCreator = createPlannerCreator( - sparkSession, - initHelper.conf, - initHelper.experimentalMethods) + private def externalCatalog: HiveExternalCatalog = + session.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog] - val hiveSessionState = new HiveSessionState( - sparkContext, - sparkSession.sharedState, - initHelper.conf, - initHelper.experimentalMethods, - initHelper.functionRegistry, - catalog, - initHelper.sqlParser, - metadataHive, - analyzer, - initHelper.streamingQueryManager, - initHelper.queryExecutionCreator, - plannerCreator) - catalog.functionResourceLoader = hiveSessionState.functionResourceLoader - hiveSessionState + /** + * Create a [[HiveSessionCatalog]]. + */ + override protected lazy val catalog: HiveSessionCatalog = { + val catalog = new HiveSessionCatalog( + externalCatalog, + session.sharedState.globalTempViewManager, + new HiveMetastoreCatalog(session), + functionRegistry, + conf, + SessionState.newHadoopConf(session.sparkContext.hadoopConfiguration, conf), + sqlParser, + new SessionFunctionResourceLoader(session)) + parentState.foreach(_.catalog.copyStateTo(catalog)) + catalog } /** - * Create an logical query plan `Analyzer` with rules specific to a `HiveSessionState`. + * A logical query plan `Analyzer` with rules specific to Hive. */ - private def createAnalyzer( - sparkSession: SparkSession, - catalog: HiveSessionCatalog, - sqlConf: SQLConf): Analyzer = { - new Analyzer(catalog, sqlConf) { - override val extendedResolutionRules: Seq[Rule[LogicalPlan]] = - new ResolveHiveSerdeTable(sparkSession) :: - new FindDataSourceTable(sparkSession) :: - new ResolveSQLOnFile(sparkSession) :: Nil - - override val postHocResolutionRules: Seq[Rule[LogicalPlan]] = - new DetermineTableStats(sparkSession) :: - catalog.ParquetConversions :: - catalog.OrcConversions :: - PreprocessTableCreation(sparkSession) :: - PreprocessTableInsertion(sqlConf) :: - DataSourceAnalysis(sqlConf) :: - HiveAnalysis :: Nil + override protected def analyzer: Analyzer = new Analyzer(catalog, conf) { + override val extendedResolutionRules: Seq[Rule[LogicalPlan]] = + new ResolveHiveSerdeTable(session) +: + new FindDataSourceTable(session) +: + new ResolveSQLOnFile(session) +: + customResolutionRules + + override val postHocResolutionRules: Seq[Rule[LogicalPlan]] = + new DetermineTableStats(session) +: + catalog.ParquetConversions +: + catalog.OrcConversions +: + PreprocessTableCreation(session) +: + PreprocessTableInsertion(conf) +: + DataSourceAnalysis(conf) +: + HiveAnalysis +: + customPostHocResolutionRules + + override val extendedCheckRules: Seq[LogicalPlan => Unit] = + PreWriteCheck +: + customCheckRules + } - override val extendedCheckRules = Seq(PreWriteCheck) + /** + * Planner that takes into account Hive-specific strategies. 
+ */ + override protected def planner: SparkPlanner = { + new SparkPlanner(session.sparkContext, conf, experimentalMethods) with HiveStrategies { + override val sparkSession: SparkSession = session + + override def extraPlanningStrategies: Seq[Strategy] = + super.extraPlanningStrategies ++ customPlanningStrategies + + override def strategies: Seq[Strategy] = { + experimentalMethods.extraStrategies ++ + extraPlanningStrategies ++ Seq( + FileSourceStrategy, + DataSourceStrategy, + SpecialLimits, + InMemoryScans, + HiveTableScans, + Scripts, + Aggregation, + JoinSelection, + BasicOperators + ) + } } } - private def createPlannerCreator( - associatedSparkSession: SparkSession, - sqlConf: SQLConf, - experimentalMethods: ExperimentalMethods): () => SparkPlanner = { - () => - new SparkPlanner( - associatedSparkSession.sparkContext, - sqlConf, - experimentalMethods.extraStrategies) - with HiveStrategies { - - override val sparkSession: SparkSession = associatedSparkSession + override protected def newBuilder: NewBuilder = new HiveSessionStateBuilder(_, _) - override def strategies: Seq[Strategy] = { - experimentalMethods.extraStrategies ++ Seq( - FileSourceStrategy, - DataSourceStrategy, - SpecialLimits, - InMemoryScans, - HiveTableScans, - Scripts, - Aggregation, - JoinSelection, - BasicOperators - ) - } - } + override def build(): HiveSessionState = { + val metadataHive: HiveClient = externalCatalog.client.newSession() + new HiveSessionState( + session.sparkContext, + session.sharedState, + conf, + experimentalMethods, + functionRegistry, + catalog, + sqlParser, + analyzer, + optimizer, + planner, + streamingQueryManager, + createQueryExecution, + createClone, + metadataHive) } } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala index b63ed76967bd9..32ca69605ef4d 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala @@ -40,7 +40,7 @@ import org.apache.spark.sql.execution.QueryExecution import org.apache.spark.sql.execution.command.CacheTableCommand import org.apache.spark.sql.hive._ import org.apache.spark.sql.hive.client.HiveClient -import org.apache.spark.sql.internal.{SessionState, SharedState, SQLConf} +import org.apache.spark.sql.internal._ import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION import org.apache.spark.util.{ShutdownHookManager, Utils} @@ -148,12 +148,14 @@ class TestHiveContext( * * @param sc SparkContext * @param existingSharedState optional [[SharedState]] + * @param parentSessionState optional parent [[SessionState]] * @param loadTestTables if true, load the test tables. They can only be loaded when running * in the JVM, i.e when calling from Python this flag has to be false. 
*/ private[hive] class TestHiveSparkSession( @transient private val sc: SparkContext, @transient private val existingSharedState: Option[TestHiveSharedState], + @transient private val parentSessionState: Option[HiveSessionState], private val loadTestTables: Boolean) extends SparkSession(sc) with Logging { self => @@ -161,6 +163,7 @@ private[hive] class TestHiveSparkSession( this( sc, existingSharedState = None, + parentSessionState = None, loadTestTables) } @@ -168,6 +171,7 @@ private[hive] class TestHiveSparkSession( this( sc, existingSharedState = Some(new TestHiveSharedState(sc, Some(hiveClient))), + parentSessionState = None, loadTestTables) } @@ -192,36 +196,21 @@ private[hive] class TestHiveSparkSession( @transient override lazy val sessionState: HiveSessionState = { - val testConf = - new SQLConf { - clear() - override def caseSensitiveAnalysis: Boolean = getConf(SQLConf.CASE_SENSITIVE, false) - override def clear(): Unit = { - super.clear() - TestHiveContext.overrideConfs.foreach { case (k, v) => setConfString(k, v) } - } - } - val queryExecutionCreator = (plan: LogicalPlan) => new TestHiveQueryExecution(this, plan) - val initHelper = HiveSessionState(this, testConf) - SessionState.mergeSparkConf(testConf, sparkContext.getConf) - - new HiveSessionState( - sparkContext, - sharedState, - testConf, - initHelper.experimentalMethods, - initHelper.functionRegistry, - initHelper.catalog, - initHelper.sqlParser, - initHelper.metadataHive, - initHelper.analyzer, - initHelper.streamingQueryManager, - queryExecutionCreator, - initHelper.plannerCreator) + new TestHiveSessionStateBuilder(this, parentSessionState).build() } override def newSession(): TestHiveSparkSession = { - new TestHiveSparkSession(sc, Some(sharedState), loadTestTables) + new TestHiveSparkSession(sc, Some(sharedState), None, loadTestTables) + } + + override def cloneSession(): SparkSession = { + val result = new TestHiveSparkSession( + sparkContext, + Some(sharedState), + Some(sessionState), + loadTestTables) + result.sessionState // force copy of SessionState + result } private var cacheTables: Boolean = false @@ -595,3 +584,18 @@ private[hive] object TestHiveContext { } } + +private[sql] class TestHiveSessionStateBuilder( + session: SparkSession, + state: Option[SessionState]) + extends HiveSessionStateBuilder(session, state) + with WithTestConf { + + override def overrideConfs: Map[String, String] = TestHiveContext.overrideConfs + + override def createQueryExecution: (LogicalPlan) => QueryExecution = { plan => + new TestHiveQueryExecution(session.asInstanceOf[TestHiveSparkSession], plan) + } + + override protected def newBuilder: NewBuilder = new TestHiveSessionStateBuilder(_, _) +} diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSessionCatalogSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSessionCatalogSuite.scala deleted file mode 100644 index 3b0f59b15916c..0000000000000 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSessionCatalogSuite.scala +++ /dev/null @@ -1,112 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.hive - -import java.net.URI - -import org.apache.hadoop.conf.Configuration - -import org.apache.spark.sql.catalyst.TableIdentifier -import org.apache.spark.sql.catalyst.analysis.SimpleFunctionRegistry -import org.apache.spark.sql.catalyst.catalog.CatalogDatabase -import org.apache.spark.sql.catalyst.parser.CatalystSqlParser -import org.apache.spark.sql.catalyst.plans.logical.Range -import org.apache.spark.sql.hive.test.TestHiveSingleton -import org.apache.spark.sql.internal.SQLConf -import org.apache.spark.util.Utils - -class HiveSessionCatalogSuite extends TestHiveSingleton { - - test("clone HiveSessionCatalog") { - val original = spark.sessionState.catalog.asInstanceOf[HiveSessionCatalog] - - val tempTableName1 = "copytest1" - val tempTableName2 = "copytest2" - try { - val tempTable1 = Range(1, 10, 1, 10) - original.createTempView(tempTableName1, tempTable1, overrideIfExists = false) - - // check if tables copied over - val clone = original.newSessionCatalogWith( - spark, - new SQLConf, - new Configuration(), - new SimpleFunctionRegistry, - CatalystSqlParser) - assert(original ne clone) - assert(clone.getTempView(tempTableName1) == Some(tempTable1)) - - // check if clone and original independent - clone.dropTable(TableIdentifier(tempTableName1), ignoreIfNotExists = false, purge = false) - assert(original.getTempView(tempTableName1) == Some(tempTable1)) - - val tempTable2 = Range(1, 20, 2, 10) - original.createTempView(tempTableName2, tempTable2, overrideIfExists = false) - assert(clone.getTempView(tempTableName2).isEmpty) - } finally { - // Drop the created temp views from the global singleton HiveSession. - original.dropTable(TableIdentifier(tempTableName1), ignoreIfNotExists = true, purge = true) - original.dropTable(TableIdentifier(tempTableName2), ignoreIfNotExists = true, purge = true) - } - } - - test("clone SessionCatalog - current db") { - val original = spark.sessionState.catalog.asInstanceOf[HiveSessionCatalog] - val originalCurrentDatabase = original.getCurrentDatabase - val db1 = "db1" - val db2 = "db2" - val db3 = "db3" - try { - original.createDatabase(newDb(db1), ignoreIfExists = true) - original.createDatabase(newDb(db2), ignoreIfExists = true) - original.createDatabase(newDb(db3), ignoreIfExists = true) - - original.setCurrentDatabase(db1) - - // check if tables copied over - val clone = original.newSessionCatalogWith( - spark, - new SQLConf, - new Configuration(), - new SimpleFunctionRegistry, - CatalystSqlParser) - - // check if current db copied over - assert(original ne clone) - assert(clone.getCurrentDatabase == db1) - - // check if clone and original independent - clone.setCurrentDatabase(db2) - assert(original.getCurrentDatabase == db1) - original.setCurrentDatabase(db3) - assert(clone.getCurrentDatabase == db2) - } finally { - // Drop the created databases from the global singleton HiveSession. 
- original.dropDatabase(db1, ignoreIfNotExists = true, cascade = true) - original.dropDatabase(db2, ignoreIfNotExists = true, cascade = true) - original.dropDatabase(db3, ignoreIfNotExists = true, cascade = true) - original.setCurrentDatabase(originalCurrentDatabase) - } - } - - def newUriForDatabase(): URI = new URI(Utils.createTempDir().toURI.toString.stripSuffix("/")) - - def newDb(name: String): CatalogDatabase = { - CatalogDatabase(name, name + " description", newUriForDatabase(), Map.empty) - } -}
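
As a rough illustration of point 2 of the description, the sketch below shows how a downstream project might plug into the new extension hooks (`customOperatorOptimizationRules`, `customPlanningStrategies`, `newBuilder`) introduced by this patch. The names `MyRule`, `MyStrategy`, `MySessionStateBuilder` and the enclosing package are hypothetical, and the example assumes it is compiled in a sub-package of `org.apache.spark.sql` because `SessionState` is still `private[sql]` at this point; there is not yet a public way to make `SparkSession` pick up a custom builder.

```scala
package org.apache.spark.sql.custom  // hypothetical; needs access to the private[sql] SessionState

import org.apache.spark.sql.{SparkSession, Strategy}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.internal.{SessionState, SessionStateBuilder}

// Hypothetical no-op optimizer rule, standing in for a real rewrite.
object MyRule extends Rule[LogicalPlan] {
  override def apply(plan: LogicalPlan): LogicalPlan = plan
}

// Hypothetical planning strategy that defers to the built-in strategies.
object MyStrategy extends Strategy {
  override def apply(plan: LogicalPlan): Seq[SparkPlan] = Nil
}

// Custom builder wired into the hooks added by this patch.
class MySessionStateBuilder(
    session: SparkSession,
    parentState: Option[SessionState] = None)
  extends SessionStateBuilder(session, parentState) {

  // Injected into the "Operator Optimizations" batch via
  // Optimizer.extendedOperatorOptimizationRules.
  override protected def customOperatorOptimizationRules: Seq[Rule[LogicalPlan]] =
    MyRule :: Nil

  // Tried by SparkPlanner.extraPlanningStrategies, after the ExperimentalMethods
  // strategies and before the regular ones.
  override protected def customPlanningStrategies: Seq[Strategy] =
    MyStrategy :: Nil

  // Ensures SessionState.clone() reproduces this builder.
  override protected def newBuilder: NewBuilder = new MySessionStateBuilder(_, _)
}

object MySessionStateBuilderExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local").appName("builder-demo").getOrCreate()
    // Build a SessionState directly from the custom builder.
    val state: SessionState = new MySessionStateBuilder(spark).build()
    println(state.optimizer.getClass)
    spark.stop()
  }
}
```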