-
Notifications
You must be signed in to change notification settings - Fork 28.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-20100][SQL] Refactor SessionState initialization #17433
Conversation
cc @cloud-fan |
cc @kunalkhamar |
Test build #75234 has finished for PR 17433 at commit
|
@@ -42,6 +43,8 @@ class SparkPlanner( | |||
InMemoryScans :: | |||
BasicOperators :: Nil) | |||
|
|||
def extraPlanningStrategies: Seq[Strategy] = Nil |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How about adding comments for this func too?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
* [[SessionState]]'s clone functionality. Make sure to override this when implementing your own | ||
* [[SessionStateBuilder]]. | ||
*/ | ||
protected def newBuilder: NewBuilder |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
shall we make it a method instead of returning a function?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: newBuilder
-> newBuilder()
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If I make this a method, then it captures the entire builder and as a result the parent SessionState
; the latter can cause some issues for garbage collection if we use cloneSession
a lot and have relatively short lived sessions.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we move this to be the first def in BaseSessionStateBuilder
, for readability? A future api user would see it next to type NewBuilder
and also realize they need to provide an implementation quicker.
@InterfaceStability.Unstable | ||
class SessionStateBuilder( | ||
session: SparkSession, | ||
state: Option[SessionState] = None) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Keep the original name parentState
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
sparkConf.getAll.foreach { case (k, v) => | ||
sqlConf.setConfString(k, v) | ||
/** | ||
* Session based [[FunctionResourceLoader]]. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actually, this is not session based. So far, the resource loader is session shared.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok, let me change this.
BTW: my first follow-up will be to move SessionState.addJar
into the function resource loader, and have a Hive aware resource loader.
/** | ||
* Build the [[SessionState]]. | ||
*/ | ||
def build: SessionState = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: build
-> build()
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, build does have a side effect. Done.
/** | ||
* Function used to make clones of the session state. | ||
*/ | ||
protected def createClone: (SparkSession, SessionState) => SessionState = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does that make sense to mark createClone
final?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It does, however I would like to keep the builder as open as possible.
/** | ||
* Helper class for using SessionStateBuilders during tests. | ||
*/ | ||
private[sql] trait WithTestConf { self: BaseSessionStateBuilder => |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if it's only used in test, shall we move this class to test?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is used in TestHive
which is part of main
instead of test
(no idea why that is btw).
/** | ||
* Logical query plan analyzer for resolving unresolved attributes and relations. | ||
* | ||
* Note: this depends on the `conf` and `catalog` field. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: field
-> fields
Conceptually, yes, but we also expose SQLContext
to the external data source APIs for relation resolution.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah this also depends on the SessionState
and its sqlContext
wrapper. The main goal of these notes were to document the dependencies within the builder.
*/ | ||
@Experimental | ||
@InterfaceStability.Unstable | ||
abstract class BaseSessionStateBuilder( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am wondering if we need to create a separate file for this class?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Moving to separate file may be a good idea.
hadoopConf, | ||
parser) | ||
|
||
private[sql] def copyStateTo(target: SessionCatalog): Unit = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How about changing it to copying the state from a source SessionCatalog to the current one?
private[sql] def copyState(source: SessionCatalog): Unit
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So we have to synchronize on the source catalog (the target catalog is thread-safe since it is not visible to the outside world yet), and I would like to keep the locking internal to the source catalog. I will add some explanation.
@@ -146,6 +141,11 @@ abstract class Optimizer(sessionCatalog: SessionCatalog, conf: CatalystConf) | |||
s.withNewPlan(newPlan) | |||
} | |||
} | |||
|
|||
/** | |||
* Override to provide additional rules for the operator optimization batch. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not sure whether we need to split the batch Operator Optimizations
to smaller independent batches or move some rules out of this batch in the future. If so, the location of this rule becomes unstable. We might need to explain it in the comment.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Anything in catalyst can be changed between spark versions. This hook included.
Test build #75260 has finished for PR 17433 at commit
|
Looks good to me. It looks much cleaner after this PR! Thank you! It sounds like this PR also partially resolves the JIRA: https://issues.apache.org/jira/browse/SPARK-18127 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looking close!
/** | ||
* Internal catalog managing functions registered by the user. | ||
* | ||
* This either gets cloned from a pre-existing version or cloned from the build-in registry. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
super nit: built-in registry
*/ | ||
@Experimental | ||
@InterfaceStability.Unstable | ||
abstract class BaseSessionStateBuilder( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Moving to separate file may be a good idea.
* @param queryExecutionCreator Lambda to create a [[QueryExecution]] from a [[LogicalPlan]] | ||
* @param plannerCreator Lambda to create a planner that takes into account Hive-specific strategies | ||
* @param optimizer Logical query plan optimizer. | ||
* @param planner Planner that converts optimized logical plans to physical plans |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: the comment used to include "planner takes into account Hive-specific strategies", lets add that back for completeness?
} | ||
|
||
/** | ||
* Create an logical query plan `Analyzer` with rules specific to a `HiveSessionState`. | ||
* An logical query plan `Analyzer` with rules specific to Hive. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
super nit: "A logical ..."
* [[SessionState]]'s clone functionality. Make sure to override this when implementing your own | ||
* [[SessionStateBuilder]]. | ||
*/ | ||
protected def newBuilder: NewBuilder |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we move this to be the first def in BaseSessionStateBuilder
, for readability? A future api user would see it next to type NewBuilder
and also realize they need to provide an implementation quicker.
initHelper.streamingQueryManager, | ||
queryExecutionCreator, | ||
initHelper.plannerCreator) | ||
new TestHiveSessionStateBuilder(this, parentSessionState).build |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
.build()
} | ||
}) | ||
override lazy val sessionState: SessionState = { | ||
new TestSQLSessionStateBuilder(this, None).build |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
.build()
newSparkSession, | ||
confCopy, | ||
experimentalMethodsCopy)) | ||
def apply(session: SparkSession): SessionState = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
: HiveSessionState = {
@@ -19,7 +19,7 @@ package org.apache.spark.sql.test | |||
|
|||
import org.apache.spark.{SparkConf, SparkContext} | |||
import org.apache.spark.sql.SparkSession | |||
import org.apache.spark.sql.internal.{SessionState, SQLConf} | |||
import org.apache.spark.sql.internal._ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
import org.apache.spark.sql.internal.{SessionState, SessionStateBuilder, SQLConf, WithTestConf}
import org.apache.spark.sql.execution.datasources._ | ||
import org.apache.spark.sql.hive.client.HiveClient | ||
import org.apache.spark.sql.internal.{SessionState, SharedState, SQLConf} | ||
import org.apache.spark.sql.internal._ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
import org.apache.spark.sql.internal.{BaseSessionStateBuilder, SessionFunctionResourceLoader, SessionState, SharedState, SQLConf}
Test build #75282 has finished for PR 17433 at commit
|
LGTM, merging to master! |
What changes were proposed in this pull request?
The current SessionState initialization code path is quite complex. A part of the creation is done in the SessionState companion objects, a part of the creation is one inside the SessionState class, and a part is done by passing functions.
This PR refactors this code path, and consolidates SessionState initialization into a builder class. This SessionState will not do any initialization and just becomes a place holder for the various Spark SQL internals. This also lays the ground work for two future improvements:
HiveSessionState
. Removing theHiveSessionState
would also require us to move resource loading into a separate class, and to (re)move metadata hive.How was this patch tested?
Existing tests.