[SPARK-21351][SQL] Update nullability based on children's output #18576

Closed
wants to merge 3 commits
@@ -153,7 +153,9 @@ abstract class Optimizer(sessionCatalog: SessionCatalog)
RewritePredicateSubquery,
ColumnPruning,
CollapseProject,
-      RemoveRedundantProject)
+      RemoveRedundantProject) :+
+    Batch("UpdateAttributeReferences", Once,
+      UpdateNullabilityInAttributeReferences)
Contributor:

I noticed that this rule is very similar to FixNullability. Do we have to keep both of them? cc @maropu @gatorsmile

Member:

UpdateNullabilityInAttributeReferences is an optimizer rule that was added after we introduced the analyzer rule FixNullability. Do we have any end-to-end test case that shows the benefit of this optimizer rule?

Member Author:

Yea, I have the same opinion as @gatorsmile; FixNullability is related to correctness, but UpdateNullabilityInAttributeReferences is not (it's just an optimization).

@gatorsmile We haven't checked the actual performance benefit (e.g., wall time) of this rule yet. I just assume that it lets the code generator produce better code.
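
One way to check that assumption is to inspect the optimized schema and the generated code directly; a minimal sketch for spark-shell (the column name `b` and the query shape are illustrative):

```scala
import org.apache.spark.sql.execution.debug._  // adds debugCodegen() to Datasets

// Build a plan whose grouping attribute is provably non-null after
// optimization: array(id, id + 1L)[0] simplifies to the non-nullable `id`.
val df = spark.range(10)
  .selectExpr("array(id, id + 1L)[0] as b")
  .groupBy("b").count()

// If the rule fires, `b` should be reported as non-nullable here ...
df.queryExecution.optimizedPlan.schema
  .foreach(f => println(s"${f.name}: nullable=${f.nullable}"))

// ... and the generated code should contain fewer null checks on `b`.
df.debugCodegen()
```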

Contributor:

But these two rules are almost the same, except that FixNullability skips resolved plans.

Member Author:

Yea, I see... But I have no idea how to resolve the two issues (the correctness issue and the optimization issue) simultaneously in a single place. One greedy & temporary solution would be to merge the current two rules into one and then register it in both places: the analyzer and the optimizer.

}

/**
@@ -1309,3 +1311,18 @@ object RemoveRepetitionFromGroupExpressions extends Rule[LogicalPlan] {
}
}
}

/**
* Updates nullability in [[AttributeReference]]s if nullability differs between
* a non-leaf plan's expressions and its children's output.
*/
object UpdateNullabilityInAttributeReferences extends Rule[LogicalPlan] {
Contributor:

This rule itself is useful even without the filter optimization.

In general, I don't think it's useful to optimize Filter using the IsNotNull expression, as it's not a common case. I think we can update Join.output to specify nullability for the join keys, which seems easier and covers a more common case.
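
For illustration, here is a rough, hypothetical sketch of that suggestion (not Spark's actual implementation): for an inner equi-join, attributes used as join keys can be marked non-nullable in the output, since rows with null keys never match.

```scala
import org.apache.spark.sql.catalyst.expressions._

// Hypothetical helper sketching the idea; Spark's real Join.output would
// need to handle all join types and more general join conditions.
def tightenInnerJoinOutput(
    leftOutput: Seq[Attribute],
    rightOutput: Seq[Attribute],
    condition: Option[Expression]): Seq[Attribute] = {
  // Collect the attributes compared with `=` in the join condition.
  val keyAttrs = condition.toSeq.flatMap(_.collect {
    case EqualTo(l: AttributeReference, r: AttributeReference) => Seq(l, r)
  }).flatten
  val keySet = AttributeSet(keyAttrs)
  // Join keys of an inner join can never be null in the result.
  (leftOutput ++ rightOutput).map { a =>
    if (keySet.contains(a)) a.withNullability(false) else a
  }
}
```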

Member Author:

ok, I'll make this pr simpler (I'll drop some changes for filters).

Member Author:

I dropped the changes to execution.FilterExec; did you suggest we should drop the changes to logical.Filter, too? https://github.com/apache/spark/pull/18576/files#diff-72917e7b68f0311b2fb42990e0dc616dR139

I basically agree that the Join.output modification is simpler and more important, but is it okay to ignore nullability in logical.Filter? For example, in the current master, QueryPlanConstraints.inferIsNotNullConstraints appends non-nullable constraints to logical.Filter, and these constraints aren't correctly propagated to upper plan nodes. So, I think it'd be better to respect nullability in both logical.Join and logical.Filter.
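
A small probe of the Filter case being discussed, assuming a spark-shell session (the column name `a` is illustrative): after `WHERE a IS NOT NULL`, `a` is provably non-null, so the optimized plan could report it as non-nullable.

```scala
// `a` is null for odd ids, so the base schema is nullable.
val df = spark.range(10).selectExpr("if(id % 2 = 0, id, null) as a")
println(df.schema("a").nullable)  // true

// After filtering out nulls, nullability could be tightened; whether it is
// depends on the Filter nullability handling discussed above.
val filtered = df.where("a is not null")
println(filtered.queryExecution.optimizedPlan.schema("a").nullable)
```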

Contributor:

If we think about how these IsNotNull constraints are generated, they mostly come from joins. I don't think it's common for users to put IsNotNull in a filter themselves.

Member Author:

ok, I'll drop it in this pr. just a sec.

def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
case p if !p.isInstanceOf[LeafNode] =>
val nullabilityMap = AttributeMap(p.children.flatMap(_.output).map { x => x -> x.nullable })
p transformExpressions {
case ar: AttributeReference if nullabilityMap.contains(ar) =>
ar.withNullability(nullabilityMap(ar))
}
}
}
@@ -0,0 +1,57 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.catalyst.optimizer

import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.dsl.plans._
import org.apache.spark.sql.catalyst.expressions.{CreateArray, GetArrayItem}
import org.apache.spark.sql.catalyst.plans.PlanTest
import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
import org.apache.spark.sql.catalyst.rules.RuleExecutor


class UpdateNullabilityInAttributeReferencesSuite extends PlanTest {

object Optimizer extends RuleExecutor[LogicalPlan] {
val batches =
Batch("Constant Folding", FixedPoint(10),
NullPropagation,
ConstantFolding,
BooleanSimplification,
SimplifyConditionals,
SimplifyBinaryComparison,
SimplifyExtractValueOps) ::
Batch("UpdateAttributeReferences", Once,
UpdateNullabilityInAttributeReferences) :: Nil
}

test("update nullability in AttributeReference") {
val rel = LocalRelation('a.long.notNull)
// In the 'original' plan below, the Aggregate node produced by groupBy() has a
// nullable AttributeReference to `b`, because array indexing (GetArrayItem) is a
// nullable expression. After optimization, the underlying expression is rewritten
// to the non-nullable `'a`, but the AttributeReference is not updated to reflect
// this. So, we need to update the nullability via the
// `UpdateNullabilityInAttributeReferences` rule.
val original = rel
.select(GetArrayItem(CreateArray(Seq('a, 'a + 1L)), 0) as "b")
.groupBy($"b")("1")
val expected = rel.select('a as "b").groupBy($"b")("1").analyze
val optimized = Optimizer.execute(original.analyze)
comparePlans(optimized, expected)
}
}
@@ -378,15 +378,6 @@ class ComplexTypesSuite extends PlanTest with ExpressionEvalHelper {
.groupBy($"foo")("1")
checkRule(structRel, structExpected)

- // These tests must use nullable attributes from the base relation for the following reason:
- // in the 'original' plans below, the Aggregate node produced by groupBy() has a
- // nullable AttributeReference to a1, because both array indexing and map lookup are
- // nullable expressions. After optimization, the same attribute is now non-nullable,
- // but the AttributeReference is not updated to reflect this. In the 'expected' plans,
- // the grouping expressions have the same nullability as the original attribute in the
- // relation. If that attribute is non-nullable, the tests will fail as the plans will
- // compare differently, so for these tests we must use a nullable attribute. See
- // SPARK-23634.
val arrayRel = relation
.select(GetArrayItem(CreateArray(Seq('nullable_id, 'nullable_id + 1L)), 0) as "a1")
.groupBy($"a1")("1")
@@ -2055,11 +2055,6 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
expr: String,
expectedNonNullableColumns: Seq[String]): Unit = {
val dfWithFilter = df.where(s"isnotnull($expr)").selectExpr(expr)
- // In the logical plan, all the output columns of input dataframe are nullable
- dfWithFilter.queryExecution.optimizedPlan.collect {
-   case e: Filter => assert(e.output.forall(_.nullable))
- }

dfWithFilter.queryExecution.executedPlan.collect {
// When the child expression in isnotnull is null-intolerant (i.e. any null input will
// result in null output), the involved columns are converted to not nullable;