Skip to content

Commit

Permalink
[SPARK-26459][SQL] replace UpdateNullabilityInAttributeReferences wit…
Browse files Browse the repository at this point in the history
…h FixNullability

## What changes were proposed in this pull request?

This is a followup of apache#18576

The newly added rule `UpdateNullabilityInAttributeReferences` does the same thing the `FixNullability` does, we only need to keep one of them.

This PR removes `UpdateNullabilityInAttributeReferences`, and use `FixNullability` to replace it. Also rename it to `UpdateAttributeNullability`

## How was this patch tested?

existing tests

Closes apache#23390 from cloud-fan/nullable.

Authored-by: Wenchen Fan <[email protected]>
Signed-off-by: Takeshi Yamamuro <[email protected]>
  • Loading branch information
cloud-fan authored and jackylee-ch committed Feb 18, 2019
1 parent 6129fb5 commit 82285b5
Show file tree
Hide file tree
Showing 4 changed files with 65 additions and 57 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -197,8 +197,8 @@ class Analyzer(
PullOutNondeterministic),
Batch("UDF", Once,
HandleNullInputsForUDF),
Batch("FixNullability", Once,
FixNullability),
Batch("UpdateNullability", Once,
UpdateAttributeNullability),
Batch("Subquery", Once,
UpdateOuterReferences),
Batch("Cleanup", fixedPoint,
Expand Down Expand Up @@ -1821,40 +1821,6 @@ class Analyzer(
}
}

/**
* Fixes nullability of Attributes in a resolved LogicalPlan by using the nullability of
* corresponding Attributes of its children output Attributes. This step is needed because
* users can use a resolved AttributeReference in the Dataset API and outer joins
* can change the nullability of an AttribtueReference. Without the fix, a nullable column's
* nullable field can be actually set as non-nullable, which cause illegal optimization
* (e.g., NULL propagation) and wrong answers.
* See SPARK-13484 and SPARK-13801 for the concrete queries of this case.
*/
object FixNullability extends Rule[LogicalPlan] {

def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperatorsUp {
case p if !p.resolved => p // Skip unresolved nodes.
case p: LogicalPlan if p.resolved =>
val childrenOutput = p.children.flatMap(c => c.output).groupBy(_.exprId).flatMap {
case (exprId, attributes) =>
// If there are multiple Attributes having the same ExprId, we need to resolve
// the conflict of nullable field. We do not really expect this happen.
val nullable = attributes.exists(_.nullable)
attributes.map(attr => attr.withNullability(nullable))
}.toSeq
// At here, we create an AttributeMap that only compare the exprId for the lookup
// operation. So, we can find the corresponding input attribute's nullability.
val attributeMap = AttributeMap[Attribute](childrenOutput.map(attr => attr -> attr))
// For an Attribute used by the current LogicalPlan, if it is from its children,
// we fix the nullable field by using the nullability setting of the corresponding
// output Attribute from the children.
p.transformExpressions {
case attr: Attribute if attributeMap.contains(attr) =>
attr.withNullability(attributeMap(attr).nullable)
}
}
}

/**
* Extracts [[WindowExpression]]s from the projectList of a [[Project]] operator and
* aggregateExpressions of an [[Aggregate]] operator and creates individual [[Window]]
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.catalyst.analysis

import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan}
import org.apache.spark.sql.catalyst.rules.Rule

/**
* Updates nullability of Attributes in a resolved LogicalPlan by using the nullability of
* corresponding Attributes of its children output Attributes. This step is needed because
* users can use a resolved AttributeReference in the Dataset API and outer joins
* can change the nullability of an AttribtueReference. Without this rule, a nullable column's
* nullable field can be actually set as non-nullable, which cause illegal optimization
* (e.g., NULL propagation) and wrong answers.
* See SPARK-13484 and SPARK-13801 for the concrete queries of this case.
*
* This rule should be executed again at the end of optimization phase, as optimizer may change
* some expressions and their nullabilities as well. See SPARK-21351 for more details.
*/
object UpdateAttributeNullability extends Rule[LogicalPlan] {

def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperatorsUp {
// Skip unresolved nodes.
case p if !p.resolved => p
// Skip leaf node, as it has no child and no need to update nullability.
case p: LeafNode => p
case p: LogicalPlan =>
val nullabilities = p.children.flatMap(c => c.output).groupBy(_.exprId).map {
// If there are multiple Attributes having the same ExprId, we need to resolve
// the conflict of nullable field. We do not really expect this to happen.
case (exprId, attributes) => exprId -> attributes.exists(_.nullable)
}
// For an Attribute used by the current LogicalPlan, if it is from its children,
// we fix the nullable field by using the nullability setting of the corresponding
// output Attribute from the children.
p.transformExpressions {
case attr: Attribute if nullabilities.contains(attr.exprId) =>
attr.withNullability(nullabilities(attr.exprId))
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -179,8 +179,7 @@ abstract class Optimizer(sessionCatalog: SessionCatalog)
ColumnPruning,
CollapseProject,
RemoveNoopOperators) :+
Batch("UpdateAttributeReferences", Once,
UpdateNullabilityInAttributeReferences) :+
Batch("UpdateNullability", Once, UpdateAttributeNullability) :+
// This batch must be executed after the `RewriteSubquery` batch, which creates joins.
Batch("NormalizeFloatingNumbers", Once, NormalizeFloatingNumbers)
}
Expand Down Expand Up @@ -1647,18 +1646,3 @@ object RemoveRepetitionFromGroupExpressions extends Rule[LogicalPlan] {
}
}
}

/**
* Updates nullability in [[AttributeReference]]s if nullability is different between
* non-leaf plan's expressions and the children output.
*/
object UpdateNullabilityInAttributeReferences extends Rule[LogicalPlan] {
def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
case p if !p.isInstanceOf[LeafNode] =>
val nullabilityMap = AttributeMap(p.children.flatMap(_.output).map { x => x -> x.nullable })
p transformExpressions {
case ar: AttributeReference if nullabilityMap.contains(ar) =>
ar.withNullability(nullabilityMap(ar))
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.spark.sql.catalyst.optimizer

import org.apache.spark.sql.catalyst.analysis.UpdateAttributeNullability
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.dsl.plans._
import org.apache.spark.sql.catalyst.expressions.{CreateArray, GetArrayItem}
Expand All @@ -25,7 +26,7 @@ import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
import org.apache.spark.sql.catalyst.rules.RuleExecutor


class UpdateNullabilityInAttributeReferencesSuite extends PlanTest {
class UpdateAttributeNullabilityInOptimizerSuite extends PlanTest {

object Optimizer extends RuleExecutor[LogicalPlan] {
val batches =
Expand All @@ -36,8 +37,8 @@ class UpdateNullabilityInAttributeReferencesSuite extends PlanTest {
SimplifyConditionals,
SimplifyBinaryComparison,
SimplifyExtractValueOps) ::
Batch("UpdateAttributeReferences", Once,
UpdateNullabilityInAttributeReferences) :: Nil
Batch("UpdateNullability", Once,
UpdateAttributeNullability) :: Nil
}

test("update nullability in AttributeReference") {
Expand All @@ -46,7 +47,7 @@ class UpdateNullabilityInAttributeReferencesSuite extends PlanTest {
// nullable AttributeReference to `b`, because both array indexing and map lookup are
// nullable expressions. After optimization, the same attribute is now non-nullable,
// but the AttributeReference is not updated to reflect this. So, we need to update nullability
// by the `UpdateNullabilityInAttributeReferences` rule.
// by the `UpdateAttributeNullability` rule.
val original = rel
.select(GetArrayItem(CreateArray(Seq('a, 'a + 1L)), 0) as "b")
.groupBy($"b")("1")
Expand Down

0 comments on commit 82285b5

Please sign in to comment.