[SPARK-32945][SQL] Avoid collapsing projects if reaching max allowed common exprs #29950
Conversation
```scala
@@ -766,6 +768,23 @@ object CollapseProject extends Rule[LogicalPlan] {
    })
  }
```
We could extend this to other cases like `case p @ Project(_, agg: Aggregate)`, but leave it untouched for now.
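For the record, a rough sketch of what that extension might look like, reusing this PR's helpers (hypothetical, not part of this change):

```scala
// Hypothetical sketch only: apply the same guard to Project over Aggregate,
// reusing haveCommonNonDeterministicOutput / getLargestNumOfCommonOutput
// and maxCommonExprs from this PR.
case p @ Project(_, agg: Aggregate)
    if !haveCommonNonDeterministicOutput(p.projectList, agg.aggregateExpressions) &&
      getLargestNumOfCommonOutput(p.projectList, agg.aggregateExpressions) < maxCommonExprs =>
  agg.copy(aggregateExpressions =
    buildCleanedProjectList(p.projectList, agg.aggregateExpressions))
```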
Kubernetes integration test starting
Kubernetes integration test status success
Related to #29094?
No, after a quick scan of that PR, I don't think so. That PR targets driver OOM caused by too many leaf expressions in the collapsed Project, while this diff cares about duplicated common expressions in the collapsed Project. Different problems, I think.
Perhaps the max number of common expressions is not the best metric here? Let's compare two cases: adding more … Although I must admit that in that case we might cache more values for the number of extra computations we save.
```scala
val maxCommonExprs = SQLConf.get.maxCommonExprsInCollapseProject

if (haveCommonNonDeterministicOutput(p1.projectList, p2.projectList) ||
    getLargestNumOfCommonOutput(p1.projectList, p2.projectList) >= maxCommonExprs) {
```
Perhaps this comparison should be `>` instead of `>=`, because currently the actual max value is `maxCommonExprs - 1`.
Test build #129432 has finished for PR 29950 at commit …
Yes, in that case you add to the number of redundant computations each time you add one more … The number of redundant computations is misleading, though. If we have 100 …
cc @cloud-fan @dongjoon-hyun too
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CollapseProjectSuite.scala
Kubernetes integration test starting
Kubernetes integration test status success
```scala
val query = relation.select(
    JsonToStructs(schema, options, 'json).as("struct"))
  .select(
    GetStructField('struct, 0).as("a"),
    GetStructField('struct, 1).as("b"),
    GetStructField('struct, 2).as("c")).analyze
```
When using the Dataset API, it would be very common to chain `withColumn` calls:

```scala
dataset
  .withColumn("json", ...)
  .withColumn("a", col("json").getField("a"))
  .withColumn("b", col("json").getField("b"))
  .withColumn("c", col("json").getField("c"))
```

In that case the query should look more like this:

```scala
val query = relation
  .select('json, JsonToStructs(schema, options, 'json).as("struct"))
  .select('json, 'struct, GetStructField('struct, 0).as("a"))
  .select('json, 'struct, 'a, GetStructField('struct, 1).as("b"))
  .select('json, 'struct, 'a, 'b, GetStructField('struct, 2).as("c"))
  .analyze
```

The `CollapseProject` rule uses `transformUp`. It seems that in that case we do not get the expected results from this optimization.
This seems like it can be fixed by using `transformDown` instead? It seems to me `CollapseProject` does not necessarily have to use `transformUp`, if I'm not missing anything. cc @cloud-fan @maropu
If there is a chain of projects `P1(P2(P3(P4(...))))`, then using `transformDown` will first merge `P1` and `P2` into `P12`, and then it will go to its child `P3` and merge it with `P4` into `P34`. Only on the second iteration will it merge all four of these.

In this case we want to merge `P123` and then see that we can't merge it with `P4`, because we would exceed `maxCommonExprsInCollapseProject`.
I think the correct way would be using `transformDown` in a manner similar to `recursiveRemoveSort` in #21072. So basically, when you hit the first `Project`, you collect all consecutive `Project`s until you hit the `maxCommonExprsInCollapseProject` limit, and merge them.
Hm, that sounds fine, too. Rather, it seems a top-down transformation can collapse projects in one shot, just like `RemoveRedundantProjects`?
Seems like we need to change to `transformDown` and take a recursive approach like `RemoveRedundantProjects` and `recursiveRemoveSort` for collapsing `Project`s.
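A minimal sketch of what that recursive, top-down variant could look like; `canCollapse` is a hypothetical stand-in for the non-deterministic and max-common-exprs checks in this PR:

```scala
def apply(plan: LogicalPlan): LogicalPlan = plan transformDown {
  case p: Project => recursivelyCollapse(p)
}

// Keep folding this Project into its child Project until the checks
// (hypothetical `canCollapse`) say merging would duplicate too much.
private def recursivelyCollapse(p: Project): Project = p.child match {
  case child: Project if canCollapse(p.projectList, child.projectList) =>
    recursivelyCollapse(child.copy(projectList =
      buildCleanedProjectList(p.projectList, child.projectList)))
  case _ => p
}
```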
"if merging two Project, Spark SQL will skip the merging.") | ||
.version("3.1.0") | ||
.intConf | ||
.createWithDefault(20) |
Just a question: is there a reason to choose `20`?
No, I just picked a number that seems bad enough for repeating an expression.
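For reference, a user would tune the new threshold like this (a sketch; `spark` is an active `SparkSession`, and the key is the one added by this PR):

```scala
// Lower the threshold so that an expression duplicated more than
// 3 times blocks the CollapseProject merge (illustrative value).
spark.conf.set("spark.sql.optimizer.maxCommonExprsInCollapseProject", "3")
```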
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
```scala
val maxCommonExprs = SQLConf.get.maxCommonExprsInCollapseProject

if (haveCommonNonDeterministicOutput(p1.projectList, p2.projectList) ||
  getLargestNumOfCommonOutput(p1.projectList, p2.projectList) > maxCommonExprs) {
```
indentation?
```scala
@@ -124,14 +128,34 @@ object ScanOperation extends OperationHelper with PredicateHelper {
    }.exists(!_.deterministic))
  }

  def moreThanMaxAllowedCommonOutput(
       expr: Seq[NamedExpression],
```
indentation? It seems that there is one more space here.
```scala
// do not have common non-deterministic expressions, or do not have equal to/more than
// maximum allowed common outputs.
if (!hasCommonNonDeterministic(fields, aliases)
  || !moreThanMaxAllowedCommonOutput(fields, aliases)) {
```
nit: you may want to move `||` up to line 157.
Sure, thanks.
Kubernetes integration test starting
gentle ping @dongjoon-hyun @cloud-fan
Oops. Sorry for being late, @viirya.
Retest this please.
Kubernetes integration test starting
Kubernetes integration test status success
Test build #130972 has finished for PR 29950 at commit …
retest this please
Kubernetes integration test starting
Kubernetes integration test status failure
Test build #130983 has finished for PR 29950 at commit …
.version("3.1.0") | ||
.intConf | ||
.checkValue(_ > 0, "The value of maxCommonExprsInCollapseProject must be larger than zero.") | ||
.createWithDefault(20) |
If possible, can we introduce this configuration with `Int.MaxValue` in 3.1.0 first? We can reduce it later.
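For illustration, a sketch of the conf definition under that suggestion (constant name hypothetical, doc string elided):

```scala
// Sketch: effectively unlimited by default, so 3.1.0 behavior is
// unchanged until a user explicitly lowers the limit.
val MAX_COMMON_EXPRS_IN_COLLAPSE_PROJECT =
  buildConf("spark.sql.optimizer.maxCommonExprsInCollapseProject")
    .version("3.1.0")
    .intConf
    .checkValue(_ > 0, "The value of maxCommonExprsInCollapseProject must be positive.")
    .createWithDefault(Int.MaxValue)
```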
Sure. It is safer.
+1
Kubernetes integration test starting
Kubernetes integration test status failure
Test build #131024 has finished for PR 29950 at commit …
The GitHub Action's flakiness at …
```scala
  } else {
    p2.copy(projectList = buildCleanedProjectList(p1.projectList, p2.projectList))
  }

def apply(plan: LogicalPlan): LogicalPlan = plan transformDown {
```
Is there a reason to change from `transformUp` to `transformDown`? If all the tests pass, it would be safer to keep the original one.
I found the previous comment about supporting `withColumn`. If this is designed for that, shall we add a test case for it?
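Something like the following could cover it (a sketch in the style of `CollapseProjectSuite`; the fixtures `relation`, `schema`, `options` and the exact conf key are assumptions based on this PR):

```scala
test("withColumn-style chain is not collapsed past the common-expr limit") {
  withSQLConf("spark.sql.optimizer.maxCommonExprsInCollapseProject" -> "1") {
    val query = relation
      .select('json, JsonToStructs(schema, options, 'json).as("struct"))
      .select('json, 'struct, GetStructField('struct, 0).as("a"))
      .select('json, 'struct, 'a, GetStructField('struct, 1).as("b"))
      .analyze
    // Collapsing would duplicate JsonToStructs beyond the limit,
    // so the optimizer should leave the Project chain as-is.
    comparePlans(Optimize.execute(query), query)
  }
}
```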
```scala
// If we collapse two Projects, `JsonToStructs` will be repeated three times.
val relation = LocalRelation('json.string)
val query1 = relation.select(
  JsonToStructs(schema, options, 'json).as("struct"))
```
indentation? Maybe the following is better?

```diff
- val query1 = relation.select(
-   JsonToStructs(schema, options, 'json).as("struct"))
-   .select(
+ val query1 = relation.select(JsonToStructs(schema, options, 'json).as("struct"))
+   .select(
```
"the physical planning.") | ||
.version("3.1.0") | ||
.intConf | ||
.checkValue(_ > 0, "The value of maxCommonExprsInCollapseProject must be larger than zero.") |
`larger than zero` -> `positive`.
```scala
  })
}

// Whether the largest times common outputs from lower operator used in upper operators is
```
`upper operators` -> `upper operator`?
```scala
}

// Whether the largest times common outputs from lower operator used in upper operators is
// larger than allowed.
```
`than allowed` -> `than the maximum`?
Retest this please.
Test build #131729 has finished for PR 29950 at commit …
retest this please
Test build #131780 has finished for PR 29950 at commit …
I recently generalized the subexpression elimination feature to interpreted Project and Predicate. So now both whole-stage codegen and interpreted execution support subexpression elimination, which avoids the performance issue caused by embedding common expressions when collapsing projects. That said, I think this patch is less useful now. I'm closing it.
Thank you for your decision, @viirya!
Thanks @viirya
What changes were proposed in this pull request?

This patch proposes to avoid collapsing adjacent `Project`s in the query optimizer if the combined `Project` would duplicate too many common expressions. One SQL config, `spark.sql.optimizer.maxCommonExprsInCollapseProject`, is added to set the maximum allowed number of common expressions.

Why are the changes needed?

In some edge cases, collapsing adjacent `Project`s hurts performance instead of improving it. We observed such behavior in our customer Spark jobs, where one expensive expression was duplicated many times. It is hard for an optimizer rule to decide whether to collapse two `Project`s because we don't know the cost of each expression. For now we can provide a SQL config so users can change the optimizer's behavior regarding collapsing adjacent `Project`s.

Note that normally in whole-stage codegen the Project operator will de-duplicate expressions internally, but in edge cases Spark cannot do whole-stage codegen and falls back to interpreted mode. In such cases, users can use this config to avoid duplicated expressions.
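As a concrete illustration of the edge case (a hedged sketch; the input path and schema are made up):

```scala
import org.apache.spark.sql.functions.{col, from_json}
import org.apache.spark.sql.types.{StringType, StructType}

val schema = new StructType().add("a", StringType).add("b", StringType)

// Hypothetical input; spark.read.textFile exposes a single "value" column.
val df = spark.read.textFile("/tmp/events.json")
  .select(from_json(col("value"), schema).as("struct"))
  .select(col("struct").getField("a").as("a"), col("struct").getField("b").as("b"))

// After CollapseProject the optimized plan is roughly
//   Project [from_json(value).a AS a, from_json(value).b AS b]
// i.e. the expensive from_json appears once per extracted field unless
// subexpression elimination removes the duplication at execution time.
```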
Does this PR introduce any user-facing change?

Yes. Users can change the optimizer's behavior regarding collapsing `Project`s by setting the SQL config.

How was this patch tested?

Unit test.