Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-20100][SQL] Refactor SessionState initialization #17433

Closed
wants to merge 3 commits into from

Conversation

hvanhovell
Copy link
Contributor

What changes were proposed in this pull request?

The current SessionState initialization code path is quite complex. A part of the creation is done in the SessionState companion objects, a part of the creation is one inside the SessionState class, and a part is done by passing functions.

This PR refactors this code path, and consolidates SessionState initialization into a builder class. This SessionState will not do any initialization and just becomes a place holder for the various Spark SQL internals. This also lays the ground work for two future improvements:

  1. This provides us with a start for removing the HiveSessionState. Removing the HiveSessionState would also require us to move resource loading into a separate class, and to (re)move metadata hive.
  2. This makes it easier to customize the Spark Session. Currently you will need to create a custom version of the builder. I have added hooks to facilitate this. A future step will be to create a semi stable API on top of this.

How was this patch tested?

Existing tests.

@hvanhovell
Copy link
Contributor Author

cc @cloud-fan

@hvanhovell
Copy link
Contributor Author

cc @kunalkhamar

@SparkQA
Copy link

SparkQA commented Mar 26, 2017

Test build #75234 has finished for PR 17433 at commit 711582f.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • abstract class BaseSessionStateBuilder(
  • class SessionStateBuilder(
  • class SessionFunctionResourceLoader(session: SparkSession) extends FunctionResourceLoader
  • class HiveSessionStateBuilder(session: SparkSession, parentState: Option[SessionState] = None)

@@ -42,6 +43,8 @@ class SparkPlanner(
InMemoryScans ::
BasicOperators :: Nil)

def extraPlanningStrategies: Seq[Strategy] = Nil
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about adding comments for this func too?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

* [[SessionState]]'s clone functionality. Make sure to override this when implementing your own
* [[SessionStateBuilder]].
*/
protected def newBuilder: NewBuilder
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shall we make it a method instead of returning a function?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: newBuilder -> newBuilder()

Copy link
Contributor Author

@hvanhovell hvanhovell Mar 27, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If I make this a method, then it captures the entire builder and as a result the parent SessionState; the latter can cause some issues for garbage collection if we use cloneSession a lot and have relatively short lived sessions.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we move this to be the first def in BaseSessionStateBuilder, for readability? A future api user would see it next to type NewBuilder and also realize they need to provide an implementation quicker.

@InterfaceStability.Unstable
class SessionStateBuilder(
session: SparkSession,
state: Option[SessionState] = None)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Keep the original name parentState?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

sparkConf.getAll.foreach { case (k, v) =>
sqlConf.setConfString(k, v)
/**
* Session based [[FunctionResourceLoader]].
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, this is not session based. So far, the resource loader is session shared.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, let me change this.

BTW: my first follow-up will be to move SessionState.addJar into the function resource loader, and have a Hive aware resource loader.

/**
* Build the [[SessionState]].
*/
def build: SessionState = {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: build -> build()

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, build does have a side effect. Done.

/**
* Function used to make clones of the session state.
*/
protected def createClone: (SparkSession, SessionState) => SessionState = {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does that make sense to mark createClone final?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It does, however I would like to keep the builder as open as possible.

/**
* Helper class for using SessionStateBuilders during tests.
*/
private[sql] trait WithTestConf { self: BaseSessionStateBuilder =>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if it's only used in test, shall we move this class to test?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is used in TestHive which is part of main instead of test (no idea why that is btw).

/**
* Logical query plan analyzer for resolving unresolved attributes and relations.
*
* Note: this depends on the `conf` and `catalog` field.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: field -> fields

Conceptually, yes, but we also expose SQLContext to the external data source APIs for relation resolution.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah this also depends on the SessionState and its sqlContext wrapper. The main goal of these notes were to document the dependencies within the builder.

*/
@Experimental
@InterfaceStability.Unstable
abstract class BaseSessionStateBuilder(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am wondering if we need to create a separate file for this class?

Copy link
Contributor

@kunalkhamar kunalkhamar Mar 27, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Moving to separate file may be a good idea.

hadoopConf,
parser)

private[sql] def copyStateTo(target: SessionCatalog): Unit = {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about changing it to copying the state from a source SessionCatalog to the current one?

private[sql] def copyState(source: SessionCatalog): Unit

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So we have to synchronize on the source catalog (the target catalog is thread-safe since it is not visible to the outside world yet), and I would like to keep the locking internal to the source catalog. I will add some explanation.

@@ -146,6 +141,11 @@ abstract class Optimizer(sessionCatalog: SessionCatalog, conf: CatalystConf)
s.withNewPlan(newPlan)
}
}

/**
* Override to provide additional rules for the operator optimization batch.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure whether we need to split the batch Operator Optimizations to smaller independent batches or move some rules out of this batch in the future. If so, the location of this rule becomes unstable. We might need to explain it in the comment.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Anything in catalyst can be changed between spark versions. This hook included.

@SparkQA
Copy link

SparkQA commented Mar 27, 2017

Test build #75260 has finished for PR 17433 at commit ecf7998.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@gatorsmile
Copy link
Member

Looks good to me. It looks much cleaner after this PR! Thank you!

It sounds like this PR also partially resolves the JIRA: https://issues.apache.org/jira/browse/SPARK-18127

Copy link
Contributor

@kunalkhamar kunalkhamar left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looking close!

/**
* Internal catalog managing functions registered by the user.
*
* This either gets cloned from a pre-existing version or cloned from the build-in registry.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

super nit: built-in registry

*/
@Experimental
@InterfaceStability.Unstable
abstract class BaseSessionStateBuilder(
Copy link
Contributor

@kunalkhamar kunalkhamar Mar 27, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Moving to separate file may be a good idea.

* @param queryExecutionCreator Lambda to create a [[QueryExecution]] from a [[LogicalPlan]]
* @param plannerCreator Lambda to create a planner that takes into account Hive-specific strategies
* @param optimizer Logical query plan optimizer.
* @param planner Planner that converts optimized logical plans to physical plans
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: the comment used to include "planner takes into account Hive-specific strategies", lets add that back for completeness?

}

/**
* Create an logical query plan `Analyzer` with rules specific to a `HiveSessionState`.
* An logical query plan `Analyzer` with rules specific to Hive.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

super nit: "A logical ..."

* [[SessionState]]'s clone functionality. Make sure to override this when implementing your own
* [[SessionStateBuilder]].
*/
protected def newBuilder: NewBuilder
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we move this to be the first def in BaseSessionStateBuilder, for readability? A future api user would see it next to type NewBuilder and also realize they need to provide an implementation quicker.

initHelper.streamingQueryManager,
queryExecutionCreator,
initHelper.plannerCreator)
new TestHiveSessionStateBuilder(this, parentSessionState).build
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

.build()

}
})
override lazy val sessionState: SessionState = {
new TestSQLSessionStateBuilder(this, None).build
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

.build()

newSparkSession,
confCopy,
experimentalMethodsCopy))
def apply(session: SparkSession): SessionState = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

: HiveSessionState = {

@@ -19,7 +19,7 @@ package org.apache.spark.sql.test

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.internal.{SessionState, SQLConf}
import org.apache.spark.sql.internal._
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

import org.apache.spark.sql.internal.{SessionState, SessionStateBuilder, SQLConf, WithTestConf}

import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.hive.client.HiveClient
import org.apache.spark.sql.internal.{SessionState, SharedState, SQLConf}
import org.apache.spark.sql.internal._
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

import org.apache.spark.sql.internal.{BaseSessionStateBuilder, SessionFunctionResourceLoader, SessionState, SharedState, SQLConf}

@SparkQA
Copy link

SparkQA commented Mar 28, 2017

Test build #75282 has finished for PR 17433 at commit 4c5ed96.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • abstract class BaseSessionStateBuilder(

@cloud-fan
Copy link
Contributor

LGTM, merging to master!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants