From 3a3bde00428903bf04af713e0d76e500c3e0e838 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Wed, 25 Sep 2019 00:37:46 -0700 Subject: [PATCH 1/2] Subquery should not cause NPE when eliminating subexpression. --- .../expressions/EquivalentExpressions.scala | 5 ++++- .../org/apache/spark/sql/SQLQuerySuite.scala | 17 +++++++++++++++++ 2 files changed, 21 insertions(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/EquivalentExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/EquivalentExpressions.scala index 72ff9361d8f75..5803448df2489 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/EquivalentExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/EquivalentExpressions.scala @@ -72,7 +72,10 @@ class EquivalentExpressions { val skip = expr.isInstanceOf[LeafExpression] || // `LambdaVariable` is usually used as a loop variable, which can't be evaluated ahead of the // loop. So we can't evaluate sub-expressions containing `LambdaVariable` at the beginning. - expr.find(_.isInstanceOf[LambdaVariable]).isDefined + expr.find(_.isInstanceOf[LambdaVariable]).isDefined || + // `PlanExpression` wraps query plan. To compare query plans of `PlanExpression` on executor, + // can cause unexpected error. + expr.isInstanceOf[PlanExpression[_]] // There are some special expressions that we should not recurse into all of its children. // 1. 
CodegenFallback: it's children will not be used to generate code (call eval() instead) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala index 28a027690db04..6f8733a2fbd30 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala @@ -24,6 +24,7 @@ import java.util.concurrent.atomic.AtomicBoolean import org.apache.spark.{AccumulatorSuite, SparkException} import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart} +import org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation import org.apache.spark.sql.catalyst.util.StringUtils import org.apache.spark.sql.execution.aggregate.{HashAggregateExec, SortAggregateExec} import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec @@ -3149,6 +3150,7 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession { checkAnswer(sql("select * from t1 where d > '1999-13'"), Row(result)) checkAnswer(sql("select to_timestamp('2000-01-01 01:10:00') > '1'"), Row(true)) } + sql("DROP VIEW t1") } test("SPARK-28156: self-join should not miss cached view") { @@ -3192,6 +3194,21 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession { checkAnswer(df3, Array(Row(new java.math.BigDecimal("0.100000000000000000000000100")))) } } + + test("SPARK-29239: Subquery should not cause NPE when eliminating subexpression") { + withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "false", + SQLConf.SUBQUERY_REUSE_ENABLED.key -> "false", + SQLConf.CODEGEN_FACTORY_MODE.key -> "CODEGEN_ONLY", + SQLConf.OPTIMIZER_EXCLUDED_RULES.key -> ConvertToLocalRelation.ruleName) { + withTempView("t1", "t2") { + sql("create temporary view t1 as select * from values ('val1a', 10L) as t1(t1a, t1b)") + sql("create temporary view t2 as select * from values ('val3a', 110L) as t2(t2a, t2b)") + val df = sql("SELECT min, min from (SELECT (SELECT 
min(t2b) FROM t2) min " + + "FROM t1 WHERE t1a = 'val1c')") + assert(df.collect().size == 0) + } + } + } } case class Foo(bar: Option[String]) From 75109a01011e313ede4ceedc92139ef10f972e53 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Wed, 25 Sep 2019 08:33:55 -0700 Subject: [PATCH 2/2] Only skip PlanExpression on executor. --- .../sql/catalyst/expressions/EquivalentExpressions.scala | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/EquivalentExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/EquivalentExpressions.scala index 5803448df2489..a32052ce121df 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/EquivalentExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/EquivalentExpressions.scala @@ -19,6 +19,7 @@ package org.apache.spark.sql.catalyst.expressions import scala.collection.mutable +import org.apache.spark.TaskContext import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback import org.apache.spark.sql.catalyst.expressions.objects.LambdaVariable @@ -74,8 +75,8 @@ class EquivalentExpressions { // loop. So we can't evaluate sub-expressions containing `LambdaVariable` at the beginning. expr.find(_.isInstanceOf[LambdaVariable]).isDefined || // `PlanExpression` wraps query plan. To compare query plans of `PlanExpression` on executor, - // can cause unexpected error. - expr.isInstanceOf[PlanExpression[_]] + // can cause errors such as NPE. + (expr.isInstanceOf[PlanExpression[_]] && TaskContext.get != null) // There are some special expressions that we should not recurse into all of its children. // 1. CodegenFallback: it's children will not be used to generate code (call eval() instead)