[SPARK-29239][SPARK-29221][SQL] Subquery should not cause NPE when eliminating subexpression #25925

Status: Closed (2 commits)
@@ -19,6 +19,7 @@ package org.apache.spark.sql.catalyst.expressions

import scala.collection.mutable

import org.apache.spark.TaskContext
import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
import org.apache.spark.sql.catalyst.expressions.objects.LambdaVariable

@@ -72,7 +73,10 @@ class EquivalentExpressions
val skip = expr.isInstanceOf[LeafExpression] ||
// `LambdaVariable` is usually used as a loop variable, which can't be evaluated ahead of the
// loop. So we can't evaluate sub-expressions containing `LambdaVariable` at the beginning.
expr.find(_.isInstanceOf[LambdaVariable]).isDefined
expr.find(_.isInstanceOf[LambdaVariable]).isDefined ||
// `PlanExpression` wraps a query plan. Comparing the query plans of
// `PlanExpression`s on the executor side can cause errors such as NPE.
(expr.isInstanceOf[PlanExpression[_]] && TaskContext.get != null)
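The guard added above relies on `TaskContext.get` returning null on the driver and a non-null context inside a running task. A minimal, Spark-free sketch of that pattern (`FakeTaskContext` and `skipOnExecutor` are hypothetical names, not Spark's classes):

```scala
// Hypothetical stand-in for org.apache.spark.TaskContext: a thread-local
// context that is set only while "executor-side" code runs, so `get`
// returns null on the driver, just like TaskContext.get.
object FakeTaskContext {
  private val ctx = new ThreadLocal[AnyRef]
  def get: AnyRef = ctx.get
  def runAsTask[T](body: => T): T = {
    ctx.set(new Object)
    try body finally ctx.remove()
  }
}

// Mirrors the new guard: skip subexpression elimination for a
// PlanExpression only when we are on the executor side.
def skipOnExecutor(isPlanExpression: Boolean): Boolean =
  isPlanExpression && FakeTaskContext.get != null
```

On the driver (no task context) the guard is false, so driver-side codegen can still handle the expression; inside a task it becomes true and the `PlanExpression` is skipped.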
Contributor:
Just out of curiosity, does this issue happen in the interpreted code path as well? E.g., we send a PlanExpression to the executor side, eval it, and hit an NPE.

Member:

IIUC, EquivalentExpressions is only used in the codegen mode now, e.g., GenerateUnsafeProjection uses this class in common subexpression elimination, but `InterpretedUnsafeProjection` does not eliminate common subexpressions.

Member Author:

Not sure I understand your question correctly, but the PlanExpressions of a SparkPlan are evaluated and updated with values (e.g., ExecSubqueryExpression.updateResult) before a query begins to run. The values are kept in the PlanExpression, and on the executor side, when eval of a PlanExpression is called, it simply returns the kept value. I think we do not really evaluate a PlanExpression on the executor side.
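The evaluate-once-on-the-driver, return-cached-value-on-executors behavior described here can be sketched as follows (`CachedScalarSubquery` is a hypothetical class for illustration, not Spark's `ExecSubqueryExpression`):

```scala
// Hypothetical sketch of the pattern described above: updateResult()
// runs the subquery once on the driver and caches the result; eval()
// (called on executors after serialization) only returns the cached value.
class CachedScalarSubquery(runSubquery: () => Any) extends Serializable {
  private var result: Any = null
  private var updated = false

  // Driver side, before the query starts running.
  def updateResult(): Unit = { result = runSubquery(); updated = true }

  // Executor side: never re-evaluates the wrapped plan.
  def eval(): Any = {
    require(updated, "updateResult() must be called on the driver first")
    result
  }
}
```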

Contributor:

Ah, got it. So the kept value is serialized and sent to the executor side in the interpreted code path.

Contributor:

This issue also reminds me that it's better to always do codegen on the driver side, even if whole-stage codegen is disabled. We can investigate it later.

Member Author:

Ok. Please let me know if you have some ideas later.


// There are some special expressions that we should not recurse into all of their children.
// 1. CodegenFallback: its children will not be used to generate code (call eval() instead)
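The CodegenFallback rule above can be illustrated with a toy expression tree (hypothetical types, not Spark's real `Expression` hierarchy): when collecting subexpressions for codegen, a fallback node is recorded but its children are never visited, because they are evaluated via eval() rather than generated code.

```scala
sealed trait Expr { def children: Seq[Expr] }
case object Leaf extends Expr { val children: Seq[Expr] = Nil }
case class Add(children: Seq[Expr]) extends Expr
case class Fallback(children: Seq[Expr]) extends Expr // stands in for CodegenFallback

// Collect every expression that codegen will see: stop at Fallback nodes,
// mirroring the "do not recurse into its children" rule above.
def collectForCodegen(e: Expr): List[Expr] = e match {
  case f: Fallback => List(f)
  case other      => other :: other.children.toList.flatMap(collectForCodegen)
}
```

For `Add(Seq(Leaf, Fallback(Seq(Leaf))))`, the top-level `Leaf` and the `Fallback` are collected, but the `Leaf` under the fallback is not.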
17 changes: 17 additions & 0 deletions sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -24,6 +24,7 @@ import java.util.concurrent.atomic.AtomicBoolean

import org.apache.spark.{AccumulatorSuite, SparkException}
import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart}
import org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation
import org.apache.spark.sql.catalyst.util.StringUtils
import org.apache.spark.sql.execution.aggregate.{HashAggregateExec, SortAggregateExec}
import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec
@@ -3149,6 +3150,7 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession {
checkAnswer(sql("select * from t1 where d > '1999-13'"), Row(result))
checkAnswer(sql("select to_timestamp('2000-01-01 01:10:00') > '1'"), Row(true))
}
sql("DROP VIEW t1")
}

test("SPARK-28156: self-join should not miss cached view") {
@@ -3192,6 +3194,21 @@
checkAnswer(df3, Array(Row(new java.math.BigDecimal("0.100000000000000000000000100"))))
}
}

test("SPARK-29239: Subquery should not cause NPE when eliminating subexpression") {
withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "false",
SQLConf.SUBQUERY_REUSE_ENABLED.key -> "false",
SQLConf.CODEGEN_FACTORY_MODE.key -> "CODEGEN_ONLY",
SQLConf.OPTIMIZER_EXCLUDED_RULES.key -> ConvertToLocalRelation.ruleName) {
withTempView("t1", "t2") {
sql("create temporary view t1 as select * from values ('val1a', 10L) as t1(t1a, t1b)")
sql("create temporary view t2 as select * from values ('val3a', 110L) as t2(t2a, t2b)")
val df = sql("SELECT min, min from (SELECT (SELECT min(t2b) FROM t2) min " +
"FROM t1 WHERE t1a = 'val1c')")
assert(df.collect().size == 0)
}
}
}
}

case class Foo(bar: Option[String])