[SPARK-32590][SQL] Remove fullOutput from RowDataSourceScanExec #29415
Changes from 1 commit
@@ -99,17 +99,14 @@ trait DataSourceScanExec extends LeafExecNode {

 /** Physical plan node for scanning data from a relation. */
 case class RowDataSourceScanExec(
-    fullOutput: Seq[Attribute],
-    requiredColumnsIndex: Seq[Int],
+    output: Seq[Attribute],
     filters: Set[Filter],
     handledFilters: Set[Filter],
     rdd: RDD[InternalRow],
     @transient relation: BaseRelation,
     tableIdentifier: Option[TableIdentifier])
   extends DataSourceScanExec with InputRDDCodegen {

-  def output: Seq[Attribute] = requiredColumnsIndex.map(fullOutput)
-
   override lazy val metrics =
     Map("numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"))
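The net effect of the signature change above is that the node no longer stores the relation's full attribute list plus the positions of the required columns and derives its output from them; the planner now passes the pruned attributes in directly. A small sketch of that equivalence (column names and types are made up for illustration, not taken from Spark source):

import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
import org.apache.spark.sql.types.IntegerType

// Hypothetical relation schema with columns a, b, c.
val fullOutput: Seq[Attribute] = Seq(
  AttributeReference("a", IntegerType)(),
  AttributeReference("b", IntegerType)(),
  AttributeReference("c", IntegerType)())

// Old shape: keep fullOutput plus the indices of the required columns, and let
// the scan node compute its output as requiredColumnsIndex.map(fullOutput).
val requiredColumnsIndex = Seq(0, 2)
val derivedOutput: Seq[Attribute] = requiredColumnsIndex.map(fullOutput)   // attributes a and c

// New shape: the planner computes the pruned attributes itself and passes them
// as the `output` constructor argument, so the two fields above become redundant.
val output: Seq[Attribute] = derivedOutput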
@@ -143,7 +140,6 @@ case class RowDataSourceScanExec(
   // Don't care about `rdd` and `tableIdentifier` when canonicalizing.
   override def doCanonicalize(): SparkPlan =
     copy(
-      fullOutput.map(QueryPlan.normalizeExpressions(_, fullOutput)),
       rdd = null,
       tableIdentifier = None)
 }

Review thread on the removed normalization line:

Reviewer: Don't we need to normalize the new output here?

Reviewer: We may need to add …

Author: Sorry, I didn't know that we need to use the normalized exprIds in the canonicalized plan. If we do, then we probably can't remove fullOutput from RowDataSourceScanExec, because using the normalized pruned output would cause a problem. For example, in https://github.com/apache/spark/blob/master/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/RowDataSourceStrategySuite.scala#L68, normalizing the pruned output will give … Then in df1.union(df2), it takes … The union result will be … instead of …

Reviewer: Yea, that's why I propose to add …

Author: @cloud-fan I added …
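To make the concern in this thread concrete: QueryPlan.normalizeExpressions rewrites an attribute's exprId to its position in the given attribute list. Normalized against the relation's fullOutput, two scans that read different columns keep different ids; normalized against their own pruned output, both end up with ids 0 and 1. A minimal sketch, with made-up attributes standing in for the a/b/c columns of the linked test:

import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
import org.apache.spark.sql.catalyst.plans.QueryPlan
import org.apache.spark.sql.types.IntegerType

// Hypothetical attributes for columns a, b, c of the relation.
val a = AttributeReference("a", IntegerType)()
val b = AttributeReference("b", IntegerType)()
val c = AttributeReference("c", IntegerType)()
val fullOutput: Seq[Attribute] = Seq(a, b, c)

// Two pruned scans: one reads (a, b), the other (a, c).
val pruned1: Seq[Attribute] = Seq(a, b)
val pruned2: Seq[Attribute] = Seq(a, c)

// Normalizing against fullOutput preserves which relation columns were read:
// ids (0, 1) for the first scan and (0, 2) for the second.
val keptDistinct = pruned2.map(QueryPlan.normalizeExpressions(_, fullOutput))

// Normalizing against the pruned output itself gives ids (0, 1) in both cases,
// so the two scans become indistinguishable by their canonicalized attributes.
val collapsed1 = pruned1.map(QueryPlan.normalizeExpressions(_, pruned1))
val collapsed2 = pruned2.map(QueryPlan.normalizeExpressions(_, pruned2))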
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can you find out the PR that added it? I can't quite remember why we have it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It was introduced in #18600 for plan equality comparison.
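For context, "plan equality comparison" here is the canonicalized-plan check behind QueryPlan.sameResult, which exchange and subquery reuse rely on. A minimal sketch of comparing two DataFrames' physical plans this way (the helper name is made up):

import org.apache.spark.sql.DataFrame

// Two physical plans are treated as interchangeable when their canonicalized
// forms are equal; sameResult performs exactly that comparison.
def haveSamePhysicalPlan(df1: DataFrame, df2: DataFrame): Boolean =
  df1.queryExecution.executedPlan.sameResult(df2.queryExecution.executedPlan)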
Author: I manually printed out the two canonicalized plans for df1 and df2 in https://github.com/apache/spark/blob/master/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/RowDataSourceStrategySuite.scala#L68 to check my change.

Before my change:
*(2) HashAggregate(keys=[none#0], functions=[min(none#0)], output=[none#0, #0])
+- Exchange hashpartitioning(none#0, 5), true, [id=#25]
   +- *(1) HashAggregate(keys=[none#0], functions=[partial_min(none#1)], output=[none#0, none#4])
      +- *(1) Scan JDBCRelation(TEST.INTTYPES) [numPartitions=1] [none#0,none#1] PushedFilters: [], ReadSchema: struct<none:int,none:int>

*(2) HashAggregate(keys=[none#0], functions=[min(none#0)], output=[none#0, #0])
+- Exchange hashpartitioning(none#0, 5), true, [id=#52]
   +- *(1) HashAggregate(keys=[none#0], functions=[partial_min(none#1)], output=[none#0, none#4])
      +- *(1) Scan JDBCRelation(TEST.INTTYPES) [numPartitions=1] [none#0,none#2] PushedFilters: [], ReadSchema: struct<none:int,none:int>
After my change:
*(2) HashAggregate(keys=[none#0], functions=[min(none#0)], output=[none#0, #0])
+- Exchange hashpartitioning(none#0, 5), true, [id=#25]
   +- *(1) HashAggregate(keys=[none#0], functions=[partial_min(none#1)], output=[none#0, none#4])
      +- *(1) Scan JDBCRelation(TEST.INTTYPES) [numPartitions=1] [A#0,B#1] PushedFilters: [], ReadSchema: struct<A:int,B:int>

*(2) HashAggregate(keys=[none#0], functions=[min(none#0)], output=[none#0, #0])
+- Exchange hashpartitioning(none#0, 5), true, [id=#52]
   +- *(1) HashAggregate(keys=[none#0], functions=[partial_min(none#1)], output=[none#0, none#4])
      +- *(1) Scan JDBCRelation(TEST.INTTYPES) [numPartitions=1] [A#0,C#2] PushedFilters: [], ReadSchema: struct<A:int,C:int>
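For anyone who wants to reproduce this kind of check, a rough sketch follows; the JDBC URL, connection properties, and column names are assumptions modeled on RowDataSourceStrategySuite rather than copied from it:

import java.util.Properties
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[1]").appName("canonicalized-plans").getOrCreate()

// Assumed in-memory H2 table TEST.INTTYPES(a, b, c), as set up in the suite.
val url = "jdbc:h2:mem:testdb0"
val props = new Properties()

val df1 = spark.read.jdbc(url, "TEST.INTTYPES", props).groupBy("a").min("b")
val df2 = spark.read.jdbc(url, "TEST.INTTYPES", props).groupBy("a").min("c")

// Print the canonicalized physical plans, mirroring the manual check above.
println(df1.queryExecution.executedPlan.canonicalized)
println(df2.queryExecution.executedPlan.canonicalized)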