diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 2aa0f2117364c..a84bb7653c527 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -197,8 +197,8 @@ class Analyzer(
       PullOutNondeterministic),
     Batch("UDF", Once,
       HandleNullInputsForUDF),
-    Batch("FixNullability", Once,
-      FixNullability),
+    Batch("UpdateNullability", Once,
+      UpdateAttributeNullability),
     Batch("Subquery", Once,
       UpdateOuterReferences),
     Batch("Cleanup", fixedPoint,
@@ -1821,40 +1821,6 @@ class Analyzer(
     }
   }
 
-  /**
-   * Fixes nullability of Attributes in a resolved LogicalPlan by using the nullability of
-   * corresponding Attributes of its children output Attributes. This step is needed because
-   * users can use a resolved AttributeReference in the Dataset API and outer joins
-   * can change the nullability of an AttribtueReference. Without the fix, a nullable column's
-   * nullable field can be actually set as non-nullable, which cause illegal optimization
-   * (e.g., NULL propagation) and wrong answers.
-   * See SPARK-13484 and SPARK-13801 for the concrete queries of this case.
-   */
-  object FixNullability extends Rule[LogicalPlan] {
-
-    def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperatorsUp {
-      case p if !p.resolved => p // Skip unresolved nodes.
-      case p: LogicalPlan if p.resolved =>
-        val childrenOutput = p.children.flatMap(c => c.output).groupBy(_.exprId).flatMap {
-          case (exprId, attributes) =>
-            // If there are multiple Attributes having the same ExprId, we need to resolve
-            // the conflict of nullable field. We do not really expect this happen.
-            val nullable = attributes.exists(_.nullable)
-            attributes.map(attr => attr.withNullability(nullable))
-        }.toSeq
-        // At here, we create an AttributeMap that only compare the exprId for the lookup
-        // operation. So, we can find the corresponding input attribute's nullability.
-        val attributeMap = AttributeMap[Attribute](childrenOutput.map(attr => attr -> attr))
-        // For an Attribute used by the current LogicalPlan, if it is from its children,
-        // we fix the nullable field by using the nullability setting of the corresponding
-        // output Attribute from the children.
-        p.transformExpressions {
-          case attr: Attribute if attributeMap.contains(attr) =>
-            attr.withNullability(attributeMap(attr).nullable)
-        }
-    }
-  }
-
   /**
    * Extracts [[WindowExpression]]s from the projectList of a [[Project]] operator and
    * aggregateExpressions of an [[Aggregate]] operator and creates individual [[Window]]
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UpdateAttributeNullability.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UpdateAttributeNullability.scala
new file mode 100644
index 0000000000000..8655decdcf278
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UpdateAttributeNullability.scala
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.analysis
+
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan}
+import org.apache.spark.sql.catalyst.rules.Rule
+
+/**
+ * Updates the nullability of Attributes in a resolved LogicalPlan using the nullability of
+ * the corresponding Attributes in its children's output. This step is needed because
+ * users can use a resolved AttributeReference in the Dataset API, and outer joins
+ * can change the nullability of an AttributeReference. Without this rule, a nullable column's
+ * nullable field can actually be set to non-nullable, which causes illegal optimizations
+ * (e.g., NULL propagation) and wrong answers.
+ * See SPARK-13484 and SPARK-13801 for concrete queries that hit this case.
+ *
+ * This rule should be executed again at the end of the optimization phase, as the optimizer
+ * may change the nullability of some expressions as well. See SPARK-21351 for more details.
+ */
+object UpdateAttributeNullability extends Rule[LogicalPlan] {
+
+  def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperatorsUp {
+    // Skip unresolved nodes.
+    case p if !p.resolved => p
+    // Skip leaf nodes, as they have no children and thus no nullability to update.
+    case p: LeafNode => p
+    case p: LogicalPlan =>
+      val nullabilities = p.children.flatMap(c => c.output).groupBy(_.exprId).map {
+        // If multiple Attributes share the same ExprId, we need to resolve the conflict
+        // between their nullable fields. We do not really expect this to happen.
+        case (exprId, attributes) => exprId -> attributes.exists(_.nullable)
+      }
+      // For an Attribute used by the current LogicalPlan, if it comes from its children,
+      // we update its nullable field using the nullability of the corresponding
+      // output Attribute from the children.
+      p.transformExpressions {
+        case attr: Attribute if nullabilities.contains(attr.exprId) =>
+          attr.withNullability(nullabilities(attr.exprId))
+      }
+  }
+}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index d51dc6663d434..d92f7f860b1b8 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -179,8 +179,7 @@ abstract class Optimizer(sessionCatalog: SessionCatalog)
       ColumnPruning,
       CollapseProject,
       RemoveNoopOperators) :+
-    Batch("UpdateAttributeReferences", Once,
-      UpdateNullabilityInAttributeReferences) :+
+    Batch("UpdateNullability", Once, UpdateAttributeNullability) :+
     // This batch must be executed after the `RewriteSubquery` batch, which creates joins.
     Batch("NormalizeFloatingNumbers", Once, NormalizeFloatingNumbers)
 }
@@ -1647,18 +1646,3 @@ object RemoveRepetitionFromGroupExpressions extends Rule[LogicalPlan] {
     }
   }
 }
-
-/**
- * Updates nullability in [[AttributeReference]]s if nullability is different between
- * non-leaf plan's expressions and the children output.
- */
-object UpdateNullabilityInAttributeReferences extends Rule[LogicalPlan] {
-  def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
-    case p if !p.isInstanceOf[LeafNode] =>
-      val nullabilityMap = AttributeMap(p.children.flatMap(_.output).map { x => x -> x.nullable })
-      p transformExpressions {
-        case ar: AttributeReference if nullabilityMap.contains(ar) =>
-          ar.withNullability(nullabilityMap(ar))
-      }
-  }
-}
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/UpdateNullabilityInAttributeReferencesSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/UpdateAttributeNullabilityInOptimizerSuite.scala
similarity index 89%
rename from sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/UpdateNullabilityInAttributeReferencesSuite.scala
rename to sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/UpdateAttributeNullabilityInOptimizerSuite.scala
index 09b11f5aba2a0..6d6f799b830f3 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/UpdateNullabilityInAttributeReferencesSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/UpdateAttributeNullabilityInOptimizerSuite.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.catalyst.optimizer
 
+import org.apache.spark.sql.catalyst.analysis.UpdateAttributeNullability
 import org.apache.spark.sql.catalyst.dsl.expressions._
 import org.apache.spark.sql.catalyst.dsl.plans._
 import org.apache.spark.sql.catalyst.expressions.{CreateArray, GetArrayItem}
@@ -25,7 +26,7 @@ import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
 import org.apache.spark.sql.catalyst.rules.RuleExecutor
 
 
-class UpdateNullabilityInAttributeReferencesSuite extends PlanTest {
+class UpdateAttributeNullabilityInOptimizerSuite extends PlanTest {
 
   object Optimizer extends RuleExecutor[LogicalPlan] {
     val batches =
@@ -36,8 +37,8 @@ class UpdateNullabilityInAttributeReferencesSuite extends PlanTest {
         SimplifyConditionals,
         SimplifyBinaryComparison,
         SimplifyExtractValueOps) ::
-      Batch("UpdateAttributeReferences", Once,
-        UpdateNullabilityInAttributeReferences) :: Nil
+      Batch("UpdateNullability", Once,
+        UpdateAttributeNullability) :: Nil
   }
 
   test("update nullability in AttributeReference") {
@@ -46,7 +47,7 @@ class UpdateNullabilityInAttributeReferencesSuite extends PlanTest {
     // nullable AttributeReference to `b`, because both array indexing and map lookup are
     // nullable expressions. After optimization, the same attribute is now non-nullable,
     // but the AttributeReference is not updated to reflect this. So, we need to update nullability
-    // by the `UpdateNullabilityInAttributeReferences` rule.
+    // by the `UpdateAttributeNullability` rule.
     val original = rel
       .select(GetArrayItem(CreateArray(Seq('a, 'a + 1L)), 0) as "b")
       .groupBy($"b")("1")
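The heart of the unified rule is the merge step that resolves conflicting nullability across a plan's children. Below is a minimal, dependency-free sketch of that step; `Attr`, `mergeNullabilities`, and `NullabilityMergeSketch` are illustrative stand-ins, not Spark classes:

// A minimal sketch of the merge step performed by UpdateAttributeNullability.
// `Attr` is an illustrative stand-in for Catalyst's Attribute.
final case class Attr(exprId: Long, name: String, nullable: Boolean)

object NullabilityMergeSketch {
  // Maps each expression id to "some child copy of the attribute is nullable".
  // Taking the disjunction via exists(_.nullable) is the conservative choice:
  // if any child reports the attribute as nullable (for instance, after an
  // outer join made a column nullable), consumers above must treat it as
  // nullable too.
  def mergeNullabilities(childrenOutput: Seq[Attr]): Map[Long, Boolean] =
    childrenOutput.groupBy(_.exprId).map { case (id, attrs) =>
      id -> attrs.exists(_.nullable)
    }

  def main(args: Array[String]): Unit = {
    // Two children expose the attribute with exprId 1 with conflicting nullability.
    val childrenOutput = Seq(
      Attr(1L, "a", nullable = false),
      Attr(1L, "a", nullable = true),
      Attr(2L, "b", nullable = false))
    println(mergeNullabilities(childrenOutput)) // e.g. Map(1 -> true, 2 -> false)
  }
}

In the actual rule, `p.transformExpressions` then rewrites every `Attribute` whose `exprId` appears in this map via `withNullability`, which is the same widening that the removed `FixNullability` and `UpdateNullabilityInAttributeReferences` objects each performed separately.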