Refactor SessionState initialization.
hvanhovell committed Mar 26, 2017
1 parent 93bb0b9 commit 711582f
Showing 12 changed files with 490 additions and 550 deletions.
SessionCatalog.scala
@@ -54,7 +54,8 @@ class SessionCatalog(
     functionRegistry: FunctionRegistry,
     conf: CatalystConf,
     hadoopConf: Configuration,
-    parser: ParserInterface) extends Logging {
+    parser: ParserInterface,
+    functionResourceLoader: FunctionResourceLoader) extends Logging {
   import SessionCatalog._
   import CatalogTypes.TablePartitionSpec

@@ -69,8 +70,8 @@ class SessionCatalog(
       functionRegistry,
       conf,
       new Configuration(),
-      CatalystSqlParser)
-    functionResourceLoader = DummyFunctionResourceLoader
+      CatalystSqlParser,
+      DummyFunctionResourceLoader)
   }
 
   // For testing only.
@@ -90,9 +91,7 @@ class SessionCatalog(
   // check whether the temporary table or function exists, then, if not, operate on
   // the corresponding item in the current database.
   @GuardedBy("this")
-  protected var currentDb = formatDatabaseName(DEFAULT_DATABASE)
-
-  @volatile var functionResourceLoader: FunctionResourceLoader = _
+  protected var currentDb: String = formatDatabaseName(DEFAULT_DATABASE)
 
   /**
    * Checks if the given name conforms the Hive standard ("[a-zA-z_0-9]+"),
@@ -1059,9 +1058,6 @@ class SessionCatalog(
    * by a tuple (resource type, resource uri).
    */
  def loadFunctionResources(resources: Seq[FunctionResource]): Unit = {
-    if (functionResourceLoader == null) {
-      throw new IllegalStateException("functionResourceLoader has not yet been initialized")
-    }
    resources.foreach(functionResourceLoader.loadResource)
  }
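
With functionResourceLoader now injected through the SessionCatalog constructor (see the signature change above), the loader is set before any function resource can be loaded, so the lazy-initialization null check becomes dead code.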

@@ -1259,28 +1255,13 @@ class SessionCatalog(
   }
 
   /**
-   * Create a new [[SessionCatalog]] with the provided parameters. `externalCatalog` and
-   * `globalTempViewManager` are `inherited`, while `currentDb` and `tempTables` are copied.
+   * Copy the current state of the catalog to another catalog.
    */
-  def newSessionCatalogWith(
-      conf: CatalystConf,
-      hadoopConf: Configuration,
-      functionRegistry: FunctionRegistry,
-      parser: ParserInterface): SessionCatalog = {
-    val catalog = new SessionCatalog(
-      externalCatalog,
-      globalTempViewManager,
-      functionRegistry,
-      conf,
-      hadoopConf,
-      parser)
-
+  private[sql] def copyStateTo(target: SessionCatalog): Unit = {
     synchronized {
-      catalog.currentDb = currentDb
+      target.currentDb = currentDb
       // copy over temporary tables
-      tempTables.foreach(kv => catalog.tempTables.put(kv._1, kv._2))
+      tempTables.foreach(kv => target.tempTables.put(kv._1, kv._2))
     }
-
-    catalog
   }
 }
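
A minimal sketch of the resulting API, mirroring the updated SessionCatalogSuite tests further down; it assumes an ExternalCatalog value named externalCatalog is in scope and that database "db1" already exists:

// Per-session mutable state (current database, temp views) is now copied
// explicitly between catalogs rather than rebuilt from loose parameters.
val original = new SessionCatalog(externalCatalog)  // the "for testing only" one-arg constructor
original.setCurrentDatabase("db1")                  // assumes "db1" exists in externalCatalog

val clone = new SessionCatalog(externalCatalog)
original.copyStateTo(clone)                         // clone now reports "db1" and sees original's temp views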
Optimizer.scala
@@ -17,20 +17,14 @@

package org.apache.spark.sql.catalyst.optimizer

import scala.annotation.tailrec
import scala.collection.immutable.HashSet
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer

import org.apache.spark.api.java.function.FilterFunction
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.{CatalystConf, SimpleCatalystConf}
import org.apache.spark.sql.catalyst.analysis._
import org.apache.spark.sql.catalyst.catalog.{InMemoryCatalog, SessionCatalog}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.aggregate._
import org.apache.spark.sql.catalyst.expressions.Literal.{FalseLiteral, TrueLiteral}
import org.apache.spark.sql.catalyst.planning.ExtractFiltersAndInnerJoins
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules._
@@ -79,7 +73,7 @@ abstract class Optimizer(sessionCatalog: SessionCatalog, conf: CatalystConf)
     Batch("Aggregate", fixedPoint,
       RemoveLiteralFromGroupExpressions,
       RemoveRepetitionFromGroupExpressions) ::
-    Batch("Operator Optimizations", fixedPoint,
+    Batch("Operator Optimizations", fixedPoint, Seq(
       // Operator push down
       PushProjectionThroughUnion,
       ReorderJoin(conf),
@@ -117,7 +111,8 @@ abstract class Optimizer(sessionCatalog: SessionCatalog, conf: CatalystConf)
       RemoveRedundantProject,
       SimplifyCreateStructOps,
       SimplifyCreateArrayOps,
-      SimplifyCreateMapOps) ::
+      SimplifyCreateMapOps) ++
+      extendedOperatorOptimizationRules: _*) ::
     Batch("Check Cartesian Products", Once,
       CheckCartesianProducts(conf)) ::
     Batch("Join Reorder", Once,
@@ -146,6 +141,11 @@ abstract class Optimizer(sessionCatalog: SessionCatalog, conf: CatalystConf)
       s.withNewPlan(newPlan)
     }
   }
+
+  /**
+   * Override to provide additional rules for the operator optimization batch.
+   */
+  def extendedOperatorOptimizationRules: Seq[Rule[LogicalPlan]] = Nil
 }
 
 /**
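
The new extendedOperatorOptimizationRules hook lets a subclass contribute rules to the "Operator Optimizations" fixed-point batch without redefining batches wholesale. A hedged sketch; MyNoopRule and MyOptimizer are placeholders, not part of this commit:

import org.apache.spark.sql.catalyst.CatalystConf
import org.apache.spark.sql.catalyst.catalog.SessionCatalog
import org.apache.spark.sql.catalyst.optimizer.Optimizer
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule

// Placeholder rule that leaves the plan unchanged.
object MyNoopRule extends Rule[LogicalPlan] {
  override def apply(plan: LogicalPlan): LogicalPlan = plan
}

class MyOptimizer(catalog: SessionCatalog, conf: CatalystConf)
    extends Optimizer(catalog, conf) {
  // These rules join the built-in operator optimizations and run to fixed point with them.
  override def extendedOperatorOptimizationRules: Seq[Rule[LogicalPlan]] = MyNoopRule :: Nil
}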
SessionCatalogSuite.scala
@@ -17,8 +17,6 @@
 
 package org.apache.spark.sql.catalyst.catalog
 
-import org.apache.hadoop.conf.Configuration
-
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.{FunctionIdentifier, SimpleCatalystConf, TableIdentifier}
 import org.apache.spark.sql.catalyst.analysis._
@@ -1331,17 +1329,15 @@ abstract class SessionCatalogSuite extends PlanTest
     }
   }
 
-  test("clone SessionCatalog - temp views") {
+  test("copy SessionCatalog state - temp views") {
     withEmptyCatalog { original =>
       val tempTable1 = Range(1, 10, 1, 10)
       original.createTempView("copytest1", tempTable1, overrideIfExists = false)
 
       // check if tables copied over
-      val clone = original.newSessionCatalogWith(
-        SimpleCatalystConf(caseSensitiveAnalysis = true),
-        new Configuration(),
-        new SimpleFunctionRegistry,
-        CatalystSqlParser)
+      val clone = new SessionCatalog(original.externalCatalog)
+      original.copyStateTo(clone)
+
       assert(original ne clone)
       assert(clone.getTempView("copytest1") == Some(tempTable1))

@@ -1355,7 +1351,7 @@ abstract class SessionCatalogSuite extends PlanTest
     }
   }
 
-  test("clone SessionCatalog - current db") {
+  test("copy SessionCatalog state - current db") {
     withEmptyCatalog { original =>
       val db1 = "db1"
       val db2 = "db2"
@@ -1368,11 +1364,9 @@ abstract class SessionCatalogSuite extends PlanTest
       original.setCurrentDatabase(db1)
 
       // check if current db copied over
-      val clone = original.newSessionCatalogWith(
-        SimpleCatalystConf(caseSensitiveAnalysis = true),
-        new Configuration(),
-        new SimpleFunctionRegistry,
-        CatalystSqlParser)
+      val clone = new SessionCatalog(original.externalCatalog)
+      original.copyStateTo(clone)
+
       assert(original ne clone)
       assert(clone.getCurrentDatabase == db1)

SparkOptimizer.scala
@@ -30,9 +30,17 @@ class SparkOptimizer(
     experimentalMethods: ExperimentalMethods)
   extends Optimizer(catalog, conf) {
 
-  override def batches: Seq[Batch] = super.batches :+
+  override def batches: Seq[Batch] = (super.batches :+
     Batch("Optimize Metadata Only Query", Once, OptimizeMetadataOnlyQuery(catalog, conf)) :+
     Batch("Extract Python UDF from Aggregate", Once, ExtractPythonUDFFromAggregate) :+
-    Batch("Prune File Source Table Partitions", Once, PruneFileSourcePartitions) :+
+    Batch("Prune File Source Table Partitions", Once, PruneFileSourcePartitions)) ++
+    postHocOptimizationBatches :+
     Batch("User Provided Optimizers", fixedPoint, experimentalMethods.extraOptimizations: _*)
+
+  /**
+   * Optimization batches that are executed after the regular optimization batches, but before the
+   * batch executing the [[ExperimentalMethods]] optimizer rules. This hook can be used to add
+   * custom optimizer batches to the Spark optimizer.
+   */
+  def postHocOptimizationBatches: Seq[Batch] = Nil
 }
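
A hedged sketch of the post-hoc hook, assuming the constructor shown above; batches supplied here run once after the built-in batches and before "User Provided Optimizers" (MyNoopRule is the placeholder rule from the earlier sketch):

import org.apache.spark.sql.ExperimentalMethods
import org.apache.spark.sql.catalyst.catalog.SessionCatalog
import org.apache.spark.sql.execution.SparkOptimizer
import org.apache.spark.sql.internal.SQLConf

class MySparkOptimizer(
    catalog: SessionCatalog,
    conf: SQLConf,
    experimentalMethods: ExperimentalMethods)
  extends SparkOptimizer(catalog, conf, experimentalMethods) {

  // Batch and Once are inherited (protected) from RuleExecutor.
  override def postHocOptimizationBatches: Seq[Batch] =
    Batch("Post-Hoc Rewrites", Once, MyNoopRule) :: Nil
}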
SparkPlanner.scala
@@ -27,13 +27,14 @@ import org.apache.spark.sql.internal.SQLConf
 class SparkPlanner(
     val sparkContext: SparkContext,
     val conf: SQLConf,
-    val extraStrategies: Seq[Strategy])
+    val experimentalMethods: ExperimentalMethods)
   extends SparkStrategies {
 
   def numPartitions: Int = conf.numShufflePartitions
 
   def strategies: Seq[Strategy] =
-    extraStrategies ++ (
+    experimentalMethods.extraStrategies ++
+      extraPlanningStrategies ++ (
       FileSourceStrategy ::
       DataSourceStrategy ::
       SpecialLimits ::
@@ -42,6 +43,8 @@ class SparkPlanner(
       InMemoryScans ::
       BasicOperators :: Nil)
 
+  def extraPlanningStrategies: Seq[Strategy] = Nil
+
   override protected def collectPlaceholders(plan: SparkPlan): Seq[(SparkPlan, LogicalPlan)] = {
     plan.collect {
       case placeholder @ PlanLater(logicalPlan) => placeholder -> logicalPlan
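
extraPlanningStrategies gives subclasses a seam for injecting planning strategies ahead of the built-in ones; the IncrementalExecution change below is the in-tree use. A hedged standalone sketch, assuming sparkContext, conf, and experimentalMethods values are in scope; MyStrategy is a placeholder:

import org.apache.spark.sql.Strategy
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.{SparkPlan, SparkPlanner}

// Placeholder strategy: returning Nil defers to the strategies that follow.
object MyStrategy extends Strategy {
  override def apply(plan: LogicalPlan): Seq[SparkPlan] = Nil
}

val planner = new SparkPlanner(sparkContext, conf, experimentalMethods) {
  override def extraPlanningStrategies: Seq[Strategy] = MyStrategy :: Nil
}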
IncrementalExecution.scala
@@ -20,8 +20,8 @@ package org.apache.spark.sql.execution.streaming
 import java.util.concurrent.atomic.AtomicInteger
 
 import org.apache.spark.internal.Logging
-import org.apache.spark.sql.catalyst.expressions.{CurrentBatchTimestamp, Literal}
-import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.{SparkSession, Strategy}
+import org.apache.spark.sql.catalyst.expressions.CurrentBatchTimestamp
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.execution.{QueryExecution, SparkPlan, SparkPlanner, UnaryExecNode}
@@ -40,20 +40,17 @@ class IncrementalExecution(
     offsetSeqMetadata: OffsetSeqMetadata)
   extends QueryExecution(sparkSession, logicalPlan) with Logging {
 
-  // TODO: make this always part of planning.
-  val streamingExtraStrategies =
-    sparkSession.sessionState.planner.StatefulAggregationStrategy +:
-    sparkSession.sessionState.planner.FlatMapGroupsWithStateStrategy +:
-    sparkSession.sessionState.planner.StreamingRelationStrategy +:
-    sparkSession.sessionState.planner.StreamingDeduplicationStrategy +:
-    sparkSession.sessionState.experimentalMethods.extraStrategies
-
   // Modified planner with stateful operations.
-  override def planner: SparkPlanner =
-    new SparkPlanner(
+  override val planner: SparkPlanner = new SparkPlanner(
       sparkSession.sparkContext,
       sparkSession.sessionState.conf,
-      streamingExtraStrategies)
+      sparkSession.sessionState.experimentalMethods) {
+    override def extraPlanningStrategies: Seq[Strategy] =
+      StatefulAggregationStrategy ::
+      FlatMapGroupsWithStateStrategy ::
+      StreamingRelationStrategy ::
+      StreamingDeduplicationStrategy :: Nil
+  }
 
   /**
    * See [SPARK-18339]
[The remaining 6 of the 12 changed files did not load in this view.]