-
Notifications
You must be signed in to change notification settings - Fork 28.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-20100][SQL] Refactor SessionState initialization #17433
Changes from 1 commit
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -17,20 +17,14 @@ | |
|
||
package org.apache.spark.sql.catalyst.optimizer | ||
|
||
import scala.annotation.tailrec | ||
import scala.collection.immutable.HashSet | ||
import scala.collection.mutable | ||
import scala.collection.mutable.ArrayBuffer | ||
|
||
import org.apache.spark.api.java.function.FilterFunction | ||
import org.apache.spark.sql.AnalysisException | ||
import org.apache.spark.sql.catalyst.{CatalystConf, SimpleCatalystConf} | ||
import org.apache.spark.sql.catalyst.analysis._ | ||
import org.apache.spark.sql.catalyst.catalog.{InMemoryCatalog, SessionCatalog} | ||
import org.apache.spark.sql.catalyst.expressions._ | ||
import org.apache.spark.sql.catalyst.expressions.aggregate._ | ||
import org.apache.spark.sql.catalyst.expressions.Literal.{FalseLiteral, TrueLiteral} | ||
import org.apache.spark.sql.catalyst.planning.ExtractFiltersAndInnerJoins | ||
import org.apache.spark.sql.catalyst.plans._ | ||
import org.apache.spark.sql.catalyst.plans.logical._ | ||
import org.apache.spark.sql.catalyst.rules._ | ||
|
@@ -79,7 +73,7 @@ abstract class Optimizer(sessionCatalog: SessionCatalog, conf: CatalystConf) | |
Batch("Aggregate", fixedPoint, | ||
RemoveLiteralFromGroupExpressions, | ||
RemoveRepetitionFromGroupExpressions) :: | ||
Batch("Operator Optimizations", fixedPoint, | ||
Batch("Operator Optimizations", fixedPoint, Seq( | ||
// Operator push down | ||
PushProjectionThroughUnion, | ||
ReorderJoin(conf), | ||
|
@@ -117,7 +111,8 @@ abstract class Optimizer(sessionCatalog: SessionCatalog, conf: CatalystConf) | |
RemoveRedundantProject, | ||
SimplifyCreateStructOps, | ||
SimplifyCreateArrayOps, | ||
SimplifyCreateMapOps) :: | ||
SimplifyCreateMapOps) ++ | ||
extendedOperatorOptimizationRules: _*) :: | ||
Batch("Check Cartesian Products", Once, | ||
CheckCartesianProducts(conf)) :: | ||
Batch("Join Reorder", Once, | ||
|
@@ -146,6 +141,11 @@ abstract class Optimizer(sessionCatalog: SessionCatalog, conf: CatalystConf) | |
s.withNewPlan(newPlan) | ||
} | ||
} | ||
|
||
/** | ||
* Override to provide additional rules for the operator optimization batch. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Not sure whether we need to split the batch There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Anything in catalyst can be changed between spark versions. This hook included. |
||
*/ | ||
def extendedOperatorOptimizationRules: Seq[Rule[LogicalPlan]] = Nil | ||
} | ||
|
||
/** | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -27,13 +27,14 @@ import org.apache.spark.sql.internal.SQLConf | |
class SparkPlanner( | ||
val sparkContext: SparkContext, | ||
val conf: SQLConf, | ||
val extraStrategies: Seq[Strategy]) | ||
val experimentalMethods: ExperimentalMethods) | ||
extends SparkStrategies { | ||
|
||
def numPartitions: Int = conf.numShufflePartitions | ||
|
||
def strategies: Seq[Strategy] = | ||
extraStrategies ++ ( | ||
experimentalMethods.extraStrategies ++ | ||
extraPlanningStrategies ++ ( | ||
FileSourceStrategy :: | ||
DataSourceStrategy :: | ||
SpecialLimits :: | ||
|
@@ -42,6 +43,8 @@ class SparkPlanner( | |
InMemoryScans :: | ||
BasicOperators :: Nil) | ||
|
||
def extraPlanningStrategies: Seq[Strategy] = Nil | ||
[review comment] How about adding comments for this func too?
[review comment] Done.
||
|
||
override protected def collectPlaceholders(plan: SparkPlan): Seq[(SparkPlan, LogicalPlan)] = { | ||
plan.collect { | ||
case placeholder @ PlanLater(logicalPlan) => placeholder -> logicalPlan | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How about changing it to copying the state from a source SessionCatalog to the current one?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So we have to synchronize on the source catalog (the target catalog is thread-safe since it is not visible to the outside world yet), and I would like to keep the locking internal to the source catalog. I will add some explanation.