Commit 4c5ed96

hvanhovell committed Mar 27, 2017
1 parent ecf7998 commit 4c5ed96
Showing 5 changed files with 288 additions and 260 deletions.
@@ -22,21 +22,18 @@ import java.io.File
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext
import org.apache.spark.annotation.{Experimental, InterfaceStability}
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.analysis.{Analyzer, FunctionRegistry}
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.optimizer.Optimizer
import org.apache.spark.sql.catalyst.parser.ParserInterface
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.streaming.StreamingQueryManager
import org.apache.spark.sql.util.ExecutionListenerManager


/**
* A class that holds all session-specific state in a given [[SparkSession]].
*
@@ -147,233 +144,6 @@ private[sql] object SessionState {
}
}

/**
* Builder class that coordinates construction of a new [[SessionState]].
*
* The builder explicitly defines all components needed by the session state, and creates a session
* state when `build` is called. Components should only be initialized once. This is not a problem
* for most components as they are only used in the `build` function. However, some components
* (`conf`, `catalog`, `functionRegistry`, `experimentalMethods` & `sqlParser`) are used as
* dependencies by other components and are shared as a result. These components are defined as
* lazy vals to make sure each of them is created only once.
*
* A developer can modify the builder by providing custom versions of components, or by using the
* hooks provided for the analyzer, optimizer & planner. There are some dependencies between the
* components (these are documented per dependency); a developer should respect them when making
* modifications in order to prevent initialization problems.
*
* A parent [[SessionState]] can be used to initialize the new [[SessionState]]. The new session
* state will clone the parent session state's `conf`, `functionRegistry`, `experimentalMethods`
* and `catalog` fields. Note that the state is cloned when `build` is called, and not before.
*/
@Experimental
@InterfaceStability.Unstable
abstract class BaseSessionStateBuilder(
val session: SparkSession,
val parentState: Option[SessionState] = None) {
type NewBuilder = (SparkSession, Option[SessionState]) => BaseSessionStateBuilder

/**
* Extract entries from `SparkConf` and put them in the `SQLConf`
*/
protected def mergeSparkConf(sqlConf: SQLConf, sparkConf: SparkConf): Unit = {
sparkConf.getAll.foreach { case (k, v) =>
sqlConf.setConfString(k, v)
}
}

/**
* SQL-specific key-value configurations.
*
* This is either cloned from a pre-existing instance or newly created. The conf is always
* merged with the [[SparkConf]] of the underlying [[SparkContext]].
*/
protected lazy val conf: SQLConf = {
val conf = parentState.map(_.conf.clone()).getOrElse(new SQLConf)
mergeSparkConf(conf, session.sparkContext.conf)
conf
}

/**
* Internal catalog managing functions registered by the user.
*
* This is cloned either from a pre-existing version or from the built-in registry.
*/
protected lazy val functionRegistry: FunctionRegistry = {
parentState.map(_.functionRegistry).getOrElse(FunctionRegistry.builtin).clone()
}

/**
* Experimental methods that can be used to define custom optimization rules and custom planning
* strategies.
*
* This is either cloned from a pre-existing version or newly created.
*/
protected lazy val experimentalMethods: ExperimentalMethods = {
parentState.map(_.experimentalMethods.clone()).getOrElse(new ExperimentalMethods)
}

/**
* Parser that extracts expressions, plans, table identifiers etc. from SQL texts.
*
* Note: this depends on the `conf` field.
*/
protected lazy val sqlParser: ParserInterface = new SparkSqlParser(conf)

/**
* Catalog for managing table and database states. If there is a pre-existing catalog, the state
* of that catalog (temp tables & current database) will be copied into the new catalog.
*
* Note: this depends on the `conf`, `functionRegistry` and `sqlParser` fields.
*/
protected lazy val catalog: SessionCatalog = {
val catalog = new SessionCatalog(
session.sharedState.externalCatalog,
session.sharedState.globalTempViewManager,
functionRegistry,
conf,
SessionState.newHadoopConf(session.sparkContext.hadoopConfiguration, conf),
sqlParser,
new SessionFunctionResourceLoader(session))
parentState.foreach(_.catalog.copyStateTo(catalog))
catalog
}

/**
* Logical query plan analyzer for resolving unresolved attributes and relations.
*
* Note: this depends on the `conf` and `catalog` fields.
*/
protected def analyzer: Analyzer = new Analyzer(catalog, conf) {
override val extendedResolutionRules: Seq[Rule[LogicalPlan]] =
new FindDataSourceTable(session) +:
new ResolveSQLOnFile(session) +:
customResolutionRules

override val postHocResolutionRules: Seq[Rule[LogicalPlan]] =
PreprocessTableCreation(session) +:
PreprocessTableInsertion(conf) +:
DataSourceAnalysis(conf) +:
customPostHocResolutionRules

override val extendedCheckRules: Seq[LogicalPlan => Unit] =
PreWriteCheck +:
HiveOnlyCheck +:
customCheckRules
}

/**
* Custom resolution rules to add to the Analyzer. Prefer overriding this instead of creating
* your own Analyzer.
*
* Note that this may NOT depend on the `analyzer` function.
*/
protected def customResolutionRules: Seq[Rule[LogicalPlan]] = Nil

/**
* Custom post resolution rules to add to the Analyzer. Prefer overriding this instead of
* creating your own Analyzer.
*
* Note that this may NOT depend on the `analyzer` function.
*/
protected def customPostHocResolutionRules: Seq[Rule[LogicalPlan]] = Nil

/**
* Custom check rules to add to the Analyzer. Prefer overriding this instead of creating
* your own Analyzer.
*
* Note that this may NOT depend on the `analyzer` function.
*/
protected def customCheckRules: Seq[LogicalPlan => Unit] = Nil

/**
* Logical query plan optimizer.
*
* Note: this depends on the `conf`, `catalog` and `experimentalMethods` fields.
*/
protected def optimizer: Optimizer = {
new SparkOptimizer(catalog, conf, experimentalMethods) {
override def extendedOperatorOptimizationRules: Seq[Rule[LogicalPlan]] =
super.extendedOperatorOptimizationRules ++ customOperatorOptimizationRules
}
}

/**
* Custom operator optimization rules to add to the Optimizer. Prefer overriding this instead
* of creating your own Optimizer.
*
* Note that this may NOT depend on the `optimizer` function.
*/
protected def customOperatorOptimizationRules: Seq[Rule[LogicalPlan]] = Nil

/**
* Planner that converts optimized logical plans to physical plans.
*
* Note: this depends on the `conf` and `experimentalMethods` fields.
*/
protected def planner: SparkPlanner = {
new SparkPlanner(session.sparkContext, conf, experimentalMethods) {
override def extraPlanningStrategies: Seq[Strategy] =
super.extraPlanningStrategies ++ customPlanningStrategies
}
}

/**
* Custom strategies to add to the planner. Prefer overriding this instead of creating
* your own Planner.
*
* Note that this may NOT depend on the `planner` function.
*/
protected def customPlanningStrategies: Seq[Strategy] = Nil

/**
* Create a query execution object.
*/
protected def createQueryExecution: LogicalPlan => QueryExecution = { plan =>
new QueryExecution(session, plan)
}

/**
* Interface to start and stop streaming queries.
*/
protected def streamingQueryManager: StreamingQueryManager = new StreamingQueryManager(session)

/**
* Function that produces a new instance of the SessionStateBuilder. This is used by the
* [[SessionState]]'s clone functionality. Make sure to override this when implementing your own
* [[SessionStateBuilder]].
*/
protected def newBuilder: NewBuilder

/**
* Function used to make clones of the session state.
*/
protected def createClone: (SparkSession, SessionState) => SessionState = {
val createBuilder = newBuilder
(session, state) => createBuilder(session, Option(state)).build()
}

/**
* Build the [[SessionState]].
*/
def build(): SessionState = {
new SessionState(
session.sparkContext,
session.sharedState,
conf,
experimentalMethods,
functionRegistry,
catalog,
sqlParser,
analyzer,
optimizer,
planner,
streamingQueryManager,
createQueryExecution,
createClone)
}
}
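
// ---------------------------------------------------------------------------
// Illustrative sketch, not part of this commit: one way a developer might use
// the `custom*` hooks above instead of replacing the whole Analyzer or
// SparkPlanner. `MyResolutionRule`, `MyStrategy` and `MySessionStateBuilder`
// are hypothetical names; the sketch assumes the imports already present at
// the top of this file.
// ---------------------------------------------------------------------------
object MyResolutionRule extends Rule[LogicalPlan] {
  // No-op placeholder for a real resolution rule.
  override def apply(plan: LogicalPlan): LogicalPlan = plan
}

object MyStrategy extends Strategy {
  // Placeholder strategy that never matches, deferring to the other strategies.
  override def apply(plan: LogicalPlan): Seq[SparkPlan] = Nil
}

class MySessionStateBuilder(
    session: SparkSession,
    parentState: Option[SessionState] = None)
  extends BaseSessionStateBuilder(session, parentState) {

  // Hook into analysis without overriding `analyzer` itself.
  override protected def customResolutionRules: Seq[Rule[LogicalPlan]] =
    MyResolutionRule :: Nil

  // Hook into planning without overriding `planner` itself.
  override protected def customPlanningStrategies: Seq[Strategy] =
    MyStrategy :: Nil

  // Required: `createClone` uses this function to rebuild the same builder
  // type when the session state is cloned.
  override protected def newBuilder: NewBuilder = new MySessionStateBuilder(_, _)
}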

/**
* Concrete implementation of a [[BaseSessionStateBuilder]].
*/
@@ -386,28 +156,6 @@ class SessionStateBuilder(
override protected def newBuilder: NewBuilder = new SessionStateBuilder(_, _)
}

/**
* Helper trait for using SessionStateBuilders during tests.
*/
private[sql] trait WithTestConf { self: BaseSessionStateBuilder =>
def overrideConfs: Map[String, String]

override protected lazy val conf: SQLConf = {
val conf = parentState.map(_.conf.clone()).getOrElse {
new SQLConf {
clear()
override def clear(): Unit = {
super.clear()
// Make sure we start with the default test configs even after clear
overrideConfs.foreach { case (key, value) => setConfString(key, value) }
}
}
}
mergeSparkConf(conf, session.sparkContext.conf)
conf
}
}
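
// Illustrative sketch, not part of this commit: a test-only builder that mixes
// in WithTestConf so a freshly created (or cleared) SQLConf always starts from
// a fixed set of test settings. `MyTestSessionStateBuilder` and the chosen
// config value are hypothetical; `spark.sql.shuffle.partitions` is only an
// example key.
private[sql] class MyTestSessionStateBuilder(
    session: SparkSession,
    parentState: Option[SessionState] = None)
  extends BaseSessionStateBuilder(session, parentState)
  with WithTestConf {

  // Re-applied after every `clear()` on the test SQLConf (see `conf` above).
  override def overrideConfs: Map[String, String] =
    Map("spark.sql.shuffle.partitions" -> "5")

  override protected def newBuilder: NewBuilder = new MyTestSessionStateBuilder(_, _)
}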

/**
* Session shared [[FunctionResourceLoader]].
*/
