From 70da86a085b61a0981c3f9fc6dbd897716472642 Mon Sep 17 00:00:00 2001
From: Kousuke Saruta
Date: Sat, 19 Dec 2020 14:10:20 -0800
Subject: [PATCH] [SPARK-33850][SQL] EXPLAIN FORMATTED doesn't show the plan
 for subqueries if AQE is enabled

### What changes were proposed in this pull request?

This PR fixes an issue where EXPLAIN FORMATTED doesn't show the plan for subqueries when AQE is enabled (the core of the fix is sketched at the end of this message).

```scala
val df = spark.range(1, 100)
df.createTempView("df")
spark.sql("SELECT (SELECT min(id) AS v FROM df)").explain("FORMATTED")

== Physical Plan ==
AdaptiveSparkPlan (3)
+- Project (2)
   +- Scan OneRowRelation (1)


(1) Scan OneRowRelation
Output: []
Arguments: ParallelCollectionRDD[0] at explain at <console>:24, OneRowRelation, UnknownPartitioning(0)

(2) Project
Output [1]: [Subquery subquery#3, [id=#20] AS scalarsubquery()#5L]
Input: []

(3) AdaptiveSparkPlan
Output [1]: [scalarsubquery()#5L]
Arguments: isFinalPlan=false
```

After this change, the plan for the subquery is shown.

```
== Physical Plan ==
* Project (2)
+- * Scan OneRowRelation (1)


(1) Scan OneRowRelation [codegen id : 1]
Output: []
Arguments: ParallelCollectionRDD[0] at explain at <console>:24, OneRowRelation, UnknownPartitioning(0)

(2) Project [codegen id : 1]
Output [1]: [Subquery scalar-subquery#3, [id=#24] AS scalarsubquery()#5L]
Input: []

===== Subqueries =====

Subquery:1 Hosting operator id = 2 Hosting Expression = Subquery scalar-subquery#3, [id=#24]
* HashAggregate (6)
+- Exchange (5)
   +- * HashAggregate (4)
      +- * Range (3)


(3) Range [codegen id : 1]
Output [1]: [id#0L]
Arguments: Range (1, 100, step=1, splits=Some(12))

(4) HashAggregate [codegen id : 1]
Input [1]: [id#0L]
Keys: []
Functions [1]: [partial_min(id#0L)]
Aggregate Attributes [1]: [min#7L]
Results [1]: [min#8L]

(5) Exchange
Input [1]: [min#8L]
Arguments: SinglePartition, ENSURE_REQUIREMENTS, [id=#20]

(6) HashAggregate [codegen id : 2]
Input [1]: [min#8L]
Keys: []
Functions [1]: [min(id#0L)]
Aggregate Attributes [1]: [min(id#0L)#4L]
Results [1]: [min(id#0L)#4L AS v#2L]
```

### Why are the changes needed?

For better debuggability.

### Does this PR introduce _any_ user-facing change?

Yes. Users can see the formatted plan for subqueries.

### How was this patch tested?

New test.

Closes #30855 from sarutak/fix-aqe-explain.
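For quick reference, the core of the change is the new `match` case added to `ExplainUtils.getSubqueries`, reproduced here from the `ExplainUtils.scala` hunk below; the explanatory comments are added in this message and are not part of the patch:

```scala
// getSubqueries walks the physical plan collecting subquery expressions.
// AdaptiveSparkPlanExec is a wrapper node, so the subquery expressions hang
// off its executedPlan; without the new case the traversal stopped at the
// wrapper and the "===== Subqueries =====" section was never emitted.
plan.foreach {
  case a: AdaptiveSparkPlanExec =>
    // New: recurse into the plan that AQE will actually execute.
    getSubqueries(a.executedPlan, subqueries)
  case p: SparkPlan =>
    // Existing: collect subquery expressions hosted by regular operators.
    p.expressions.foreach (_.collect {
      case e: PlanExpression[_] =>
        // ... (existing collection logic, unchanged; see the hunk below)
    })
}
```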
Authored-by: Kousuke Saruta
Signed-off-by: Dongjoon Hyun
---
 .../spark/sql/execution/ExplainUtils.scala    |   2 +
 .../sql-tests/results/explain-aqe.sql.out     | 263 ++++++++++++++++++
 .../org/apache/spark/sql/ExplainSuite.scala   |  22 ++
 3 files changed, 287 insertions(+)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExplainUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExplainUtils.scala
index 20e6fb6f96eaa..f47542ca59bc7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExplainUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExplainUtils.scala
@@ -218,6 +218,8 @@ object ExplainUtils extends AdaptiveSparkPlanHelper {
       plan: => QueryPlan[_],
       subqueries: ArrayBuffer[(SparkPlan, Expression, BaseSubqueryExec)]): Unit = {
     plan.foreach {
+      case a: AdaptiveSparkPlanExec =>
+        getSubqueries(a.executedPlan, subqueries)
       case p: SparkPlan =>
         p.expressions.foreach (_.collect {
           case e: PlanExpression[_] =>
diff --git a/sql/core/src/test/resources/sql-tests/results/explain-aqe.sql.out b/sql/core/src/test/resources/sql-tests/results/explain-aqe.sql.out
index 578b0a807fc52..d68989524d486 100644
--- a/sql/core/src/test/resources/sql-tests/results/explain-aqe.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/explain-aqe.sql.out
@@ -407,6 +407,101 @@ Condition : (((isnotnull(key#x) AND isnotnull(val#x)) AND (key#x = Subquery subq
 Output [2]: [key#x, val#x]
 Arguments: isFinalPlan=false
 
+===== Subqueries =====
+
+Subquery:1 Hosting operator id = 2 Hosting Expression = Subquery subquery#x, [id=#x]
+AdaptiveSparkPlan (10)
++- HashAggregate (9)
+   +- Exchange (8)
+      +- HashAggregate (7)
+         +- Project (6)
+            +- Filter (5)
+               +- Scan parquet default.explain_temp2 (4)
+
+
+(4) Scan parquet default.explain_temp2
+Output [2]: [key#x, val#x]
+Batched: true
+Location [not included in comparison]/{warehouse_dir}/explain_temp2]
+PushedFilters: [IsNotNull(key), IsNotNull(val), EqualTo(val,2)]
+ReadSchema: struct<key:int,val:int>
+
+(5) Filter
+Input [2]: [key#x, val#x]
+Condition : (((isnotnull(key#x) AND isnotnull(val#x)) AND (key#x = Subquery subquery#x, [id=#x])) AND (val#x = 2))
+
+(6) Project
+Output [1]: [key#x]
+Input [2]: [key#x, val#x]
+
+(7) HashAggregate
+Input [1]: [key#x]
+Keys: []
+Functions [1]: [partial_max(key#x)]
+Aggregate Attributes [1]: [max#x]
+Results [1]: [max#x]
+
+(8) Exchange
+Input [1]: [max#x]
+Arguments: SinglePartition, ENSURE_REQUIREMENTS, [id=#x]
+
+(9) HashAggregate
+Input [1]: [max#x]
+Keys: []
+Functions [1]: [max(key#x)]
+Aggregate Attributes [1]: [max(key#x)#x]
+Results [1]: [max(key#x)#x AS max(key)#x]
+
+(10) AdaptiveSparkPlan
+Output [1]: [max(key)#x]
+Arguments: isFinalPlan=false
+
+Subquery:2 Hosting operator id = 5 Hosting Expression = Subquery subquery#x, [id=#x]
+AdaptiveSparkPlan (17)
++- HashAggregate (16)
+   +- Exchange (15)
+      +- HashAggregate (14)
+         +- Project (13)
+            +- Filter (12)
+               +- Scan parquet default.explain_temp3 (11)
+
+
+(11) Scan parquet default.explain_temp3
+Output [2]: [key#x, val#x]
+Batched: true
+Location [not included in comparison]/{warehouse_dir}/explain_temp3]
+PushedFilters: [IsNotNull(val), GreaterThan(val,0)]
+ReadSchema: struct<key:int,val:int>
+
+(12) Filter
+Input [2]: [key#x, val#x]
+Condition : (isnotnull(val#x) AND (val#x > 0))
+
+(13) Project
+Output [1]: [key#x]
+Input [2]: [key#x, val#x]
+
+(14) HashAggregate
+Input [1]: [key#x]
+Keys: []
+Functions [1]: [partial_max(key#x)]
+Aggregate Attributes [1]: [max#x]
+Results [1]: [max#x]
+
+(15) Exchange
+Input [1]: [max#x]
+Arguments: SinglePartition, ENSURE_REQUIREMENTS, [id=#x]
+
+(16) HashAggregate
+Input [1]: [max#x]
+Keys: []
+Functions [1]: [max(key#x)]
+Aggregate Attributes [1]: [max(key#x)#x]
+Results [1]: [max(key#x)#x AS max(key)#x]
+
+(17) AdaptiveSparkPlan
+Output [1]: [max(key)#x]
+Arguments: isFinalPlan=false
 
 -- !query
 EXPLAIN FORMATTED
@@ -442,6 +537,101 @@ Condition : ((key#x = Subquery subquery#x, [id=#x]) OR (cast(key#x as double) =
 Output [2]: [key#x, val#x]
 Arguments: isFinalPlan=false
 
+===== Subqueries =====
+
+Subquery:1 Hosting operator id = 2 Hosting Expression = Subquery subquery#x, [id=#x]
+AdaptiveSparkPlan (10)
++- HashAggregate (9)
+   +- Exchange (8)
+      +- HashAggregate (7)
+         +- Project (6)
+            +- Filter (5)
+               +- Scan parquet default.explain_temp2 (4)
+
+
+(4) Scan parquet default.explain_temp2
+Output [2]: [key#x, val#x]
+Batched: true
+Location [not included in comparison]/{warehouse_dir}/explain_temp2]
+PushedFilters: [IsNotNull(val), GreaterThan(val,0)]
+ReadSchema: struct<key:int,val:int>
+
+(5) Filter
+Input [2]: [key#x, val#x]
+Condition : (isnotnull(val#x) AND (val#x > 0))
+
+(6) Project
+Output [1]: [key#x]
+Input [2]: [key#x, val#x]
+
+(7) HashAggregate
+Input [1]: [key#x]
+Keys: []
+Functions [1]: [partial_max(key#x)]
+Aggregate Attributes [1]: [max#x]
+Results [1]: [max#x]
+
+(8) Exchange
+Input [1]: [max#x]
+Arguments: SinglePartition, ENSURE_REQUIREMENTS, [id=#x]
+
+(9) HashAggregate
+Input [1]: [max#x]
+Keys: []
+Functions [1]: [max(key#x)]
+Aggregate Attributes [1]: [max(key#x)#x]
+Results [1]: [max(key#x)#x AS max(key)#x]
+
+(10) AdaptiveSparkPlan
+Output [1]: [max(key)#x]
+Arguments: isFinalPlan=false
+
+Subquery:2 Hosting operator id = 2 Hosting Expression = Subquery subquery#x, [id=#x]
+AdaptiveSparkPlan (17)
++- HashAggregate (16)
+   +- Exchange (15)
+      +- HashAggregate (14)
+         +- Project (13)
+            +- Filter (12)
+               +- Scan parquet default.explain_temp3 (11)
+
+
+(11) Scan parquet default.explain_temp3
+Output [2]: [key#x, val#x]
+Batched: true
+Location [not included in comparison]/{warehouse_dir}/explain_temp3]
+PushedFilters: [IsNotNull(val), GreaterThan(val,0)]
+ReadSchema: struct<key:int,val:int>
+
+(12) Filter
+Input [2]: [key#x, val#x]
+Condition : (isnotnull(val#x) AND (val#x > 0))
+
+(13) Project
+Output [1]: [key#x]
+Input [2]: [key#x, val#x]
+
+(14) HashAggregate
+Input [1]: [key#x]
+Keys: []
+Functions [1]: [partial_avg(cast(key#x as bigint))]
+Aggregate Attributes [2]: [sum#x, count#xL]
+Results [2]: [sum#x, count#xL]
+
+(15) Exchange
+Input [2]: [sum#x, count#xL]
+Arguments: SinglePartition, ENSURE_REQUIREMENTS, [id=#x]
+
+(16) HashAggregate
+Input [2]: [sum#x, count#xL]
+Keys: []
+Functions [1]: [avg(cast(key#x as bigint))]
+Aggregate Attributes [1]: [avg(cast(key#x as bigint))#x]
+Results [1]: [avg(cast(key#x as bigint))#x AS avg(key)#x]
+
+(17) AdaptiveSparkPlan
+Output [1]: [avg(key)#x]
+Arguments: isFinalPlan=false
 
 -- !query
 EXPLAIN FORMATTED
@@ -470,6 +660,79 @@ Input: []
 Output [1]: [(scalarsubquery() + scalarsubquery())#x]
 Arguments: isFinalPlan=false
 
+===== Subqueries =====
+
+Subquery:1 Hosting operator id = 2 Hosting Expression = Subquery subquery#x, [id=#x]
+AdaptiveSparkPlan (8)
++- HashAggregate (7)
+   +- Exchange (6)
+      +- HashAggregate (5)
+         +- Scan parquet default.explain_temp1 (4)
+
+
+(4) Scan parquet default.explain_temp1
+Output [1]: [key#x]
+Batched: true
+Location [not included in comparison]/{warehouse_dir}/explain_temp1]
+ReadSchema: struct<key:int>
+
+(5) HashAggregate
+Input [1]: [key#x]
+Keys: []
+Functions [1]: [partial_avg(cast(key#x as bigint))]
+Aggregate Attributes [2]: [sum#x, count#xL]
+Results [2]: [sum#x, count#xL]
+
+(6) Exchange
+Input [2]: [sum#x, count#xL]
+Arguments: SinglePartition, ENSURE_REQUIREMENTS, [id=#x]
+
+(7) HashAggregate
+Input [2]: [sum#x, count#xL]
+Keys: []
+Functions [1]: [avg(cast(key#x as bigint))]
+Aggregate Attributes [1]: [avg(cast(key#x as bigint))#x]
+Results [1]: [avg(cast(key#x as bigint))#x AS avg(key)#x]
+
+(8) AdaptiveSparkPlan
+Output [1]: [avg(key)#x]
+Arguments: isFinalPlan=false
+
+Subquery:2 Hosting operator id = 2 Hosting Expression = Subquery subquery#x, [id=#x]
+AdaptiveSparkPlan (13)
++- HashAggregate (12)
+   +- Exchange (11)
+      +- HashAggregate (10)
+         +- Scan parquet default.explain_temp1 (9)
+
+
+(9) Scan parquet default.explain_temp1
+Output [1]: [key#x]
+Batched: true
+Location [not included in comparison]/{warehouse_dir}/explain_temp1]
+ReadSchema: struct<key:int>
+
+(10) HashAggregate
+Input [1]: [key#x]
+Keys: []
+Functions [1]: [partial_avg(cast(key#x as bigint))]
+Aggregate Attributes [2]: [sum#x, count#xL]
+Results [2]: [sum#x, count#xL]
+
+(11) Exchange
+Input [2]: [sum#x, count#xL]
+Arguments: SinglePartition, ENSURE_REQUIREMENTS, [id=#x]
+
+(12) HashAggregate
+Input [2]: [sum#x, count#xL]
+Keys: []
+Functions [1]: [avg(cast(key#x as bigint))]
+Aggregate Attributes [1]: [avg(cast(key#x as bigint))#x]
+Results [1]: [avg(cast(key#x as bigint))#x AS avg(key)#x]
+
+(13) AdaptiveSparkPlan
+Output [1]: [avg(key)#x]
+Arguments: isFinalPlan=false
 
 -- !query
 EXPLAIN FORMATTED
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala
index 75372c5437f25..0ec57c2fcb5ad 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala
@@ -277,6 +277,28 @@ class ExplainSuite extends ExplainSuiteHelper with DisableAdaptiveExecutionSuite
     }
   }
 
+  test("SPARK-33850: explain formatted - check presence of subquery in case of AQE") {
+    withTable("df1") {
+      withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true") {
+        withTable("df1") {
+          spark.range(1, 100)
+            .write
+            .format("parquet")
+            .mode("overwrite")
+            .saveAsTable("df1")
+
+          val sqlText = "EXPLAIN FORMATTED SELECT (SELECT min(id) FROM df1) as v"
+          val expected_pattern1 =
+            "Subquery:1 Hosting operator id = 2 Hosting Expression = Subquery subquery#x"
+
+          withNormalizedExplain(sqlText) { normalizedOutput =>
+            assert(expected_pattern1.r.findAllMatchIn(normalizedOutput).length == 1)
+          }
+        }
+      }
+    }
+  }
+
   test("Support ExplainMode in Dataset.explain") {
     val df1 = Seq((1, 2), (2, 3)).toDF("k", "v1")
     val df2 = Seq((2, 3), (1, 1)).toDF("k", "v2")