From 9558823930d01814e98d50925c700bed73240815 Mon Sep 17 00:00:00 2001
From: Huaxin Gao
Date: Wed, 12 Aug 2020 00:13:37 -0700
Subject: [PATCH 1/2] [SPARK-32590][SQL] Remove fullOutput from RowDataSourceScanExec

---
 .../apache/spark/sql/execution/DataSourceScanExec.scala | 6 +-----
 .../sql/execution/datasources/DataSourceStrategy.scala  | 7 ++-----
 .../execution/datasources/v2/DataSourceV2Strategy.scala | 3 ---
 3 files changed, 3 insertions(+), 13 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
index bef9f4b46c628..ad919368750fe 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
@@ -99,8 +99,7 @@ trait DataSourceScanExec extends LeafExecNode {
 
 /** Physical plan node for scanning data from a relation. */
 case class RowDataSourceScanExec(
-    fullOutput: Seq[Attribute],
-    requiredColumnsIndex: Seq[Int],
+    output: Seq[Attribute],
     filters: Set[Filter],
     handledFilters: Set[Filter],
     rdd: RDD[InternalRow],
@@ -108,8 +107,6 @@ case class RowDataSourceScanExec(
     tableIdentifier: Option[TableIdentifier])
   extends DataSourceScanExec with InputRDDCodegen {
 
-  def output: Seq[Attribute] = requiredColumnsIndex.map(fullOutput)
-
   override lazy val metrics =
     Map("numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"))
 
@@ -143,7 +140,6 @@ case class RowDataSourceScanExec(
   // Don't care about `rdd` and `tableIdentifier` when canonicalizing.
   override def doCanonicalize(): SparkPlan =
     copy(
-      fullOutput.map(QueryPlan.normalizeExpressions(_, fullOutput)),
       rdd = null,
       tableIdentifier = None)
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index ada04c2382b72..252ec85440ef1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -307,7 +307,6 @@ case class DataSourceStrategy(conf: SQLConf) extends Strategy with Logging with
       case l @ LogicalRelation(baseRelation: TableScan, _, _, _) =>
         RowDataSourceScanExec(
           l.output,
-          l.output.indices,
           Set.empty,
           Set.empty,
           toCatalystRDD(l, baseRelation.buildScan()),
@@ -379,8 +378,7 @@ case class DataSourceStrategy(conf: SQLConf) extends Strategy with Logging with
         .map(relation.attributeMap)
 
       val scan = RowDataSourceScanExec(
-        relation.output,
-        requestedColumns.map(relation.output.indexOf),
+        requestedColumns,
         pushedFilters.toSet,
         handledFilters,
         scanBuilder(requestedColumns, candidatePredicates, pushedFilters),
@@ -401,8 +399,7 @@ case class DataSourceStrategy(conf: SQLConf) extends Strategy with Logging with
         (projectSet ++ filterSet -- handledSet).map(relation.attributeMap).toSeq
 
       val scan = RowDataSourceScanExec(
-        relation.output,
-        requestedColumns.map(relation.output.indexOf),
+        requestedColumns,
         pushedFilters.toSet,
         handledFilters,
         scanBuilder(requestedColumns, candidatePredicates, pushedFilters),
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
index f289a867e5ec0..8269048463860 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
@@ -64,11 +64,8 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
       }
       val rdd = v1Relation.buildScan()
       val unsafeRowRDD = DataSourceStrategy.toCatalystRDD(v1Relation, output, rdd)
-      val originalOutputNames = relation.table.schema().map(_.name)
-      val requiredColumnsIndex = output.map(_.name).map(originalOutputNames.indexOf)
       val dsScan = RowDataSourceScanExec(
         output,
-        requiredColumnsIndex,
         translated.toSet,
         pushed.toSet,
         unsafeRowRDD,

From bd58665201f0af06cd8e14855c09b756695e435e Mon Sep 17 00:00:00 2001
From: Huaxin Gao
Date: Thu, 13 Aug 2020 09:20:28 -0700
Subject: [PATCH 2/2] add requiredSchema

---
 .../org/apache/spark/sql/execution/DataSourceScanExec.scala  | 4 +++-
 .../spark/sql/execution/datasources/DataSourceStrategy.scala | 3 +++
 .../sql/execution/datasources/v2/DataSourceV2Strategy.scala  | 1 +
 3 files changed, 7 insertions(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
index ad919368750fe..56125ec071121 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
@@ -100,6 +100,7 @@ trait DataSourceScanExec extends LeafExecNode {
 /** Physical plan node for scanning data from a relation. */
 case class RowDataSourceScanExec(
     output: Seq[Attribute],
+    requiredSchema: StructType,
     filters: Set[Filter],
     handledFilters: Set[Filter],
     rdd: RDD[InternalRow],
@@ -133,13 +134,14 @@ case class RowDataSourceScanExec(
       if (handledFilters.contains(filter)) s"*$filter" else s"$filter"
     }
     Map(
-      "ReadSchema" -> output.toStructType.catalogString,
+      "ReadSchema" -> requiredSchema.catalogString,
       "PushedFilters" -> markedFilters.mkString("[", ", ", "]"))
   }
 
   // Don't care about `rdd` and `tableIdentifier` when canonicalizing.
   override def doCanonicalize(): SparkPlan =
     copy(
+      output.map(QueryPlan.normalizeExpressions(_, output)),
       rdd = null,
       tableIdentifier = None)
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index 252ec85440ef1..3ccff6d89babd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -307,6 +307,7 @@ case class DataSourceStrategy(conf: SQLConf) extends Strategy with Logging with
       case l @ LogicalRelation(baseRelation: TableScan, _, _, _) =>
         RowDataSourceScanExec(
           l.output,
+          l.output.toStructType,
           Set.empty,
           Set.empty,
           toCatalystRDD(l, baseRelation.buildScan()),
@@ -379,6 +380,7 @@ case class DataSourceStrategy(conf: SQLConf) extends Strategy with Logging with
 
       val scan = RowDataSourceScanExec(
         requestedColumns,
+        requestedColumns.toStructType,
         pushedFilters.toSet,
         handledFilters,
         scanBuilder(requestedColumns, candidatePredicates, pushedFilters),
@@ -400,6 +402,7 @@ case class DataSourceStrategy(conf: SQLConf) extends Strategy with Logging with
 
       val scan = RowDataSourceScanExec(
         requestedColumns,
+        requestedColumns.toStructType,
         pushedFilters.toSet,
         handledFilters,
         scanBuilder(requestedColumns, candidatePredicates, pushedFilters),
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
index 8269048463860..fe4f8bc83fcff 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
@@ -66,6 +66,7 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
       val unsafeRowRDD = DataSourceStrategy.toCatalystRDD(v1Relation, output, rdd)
       val dsScan = RowDataSourceScanExec(
         output,
+        output.toStructType,
         translated.toSet,
         pushed.toSet,
         unsafeRowRDD,
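As a point of reference for the API change, here is a minimal, self-contained sketch of the old and new call-site shapes. It is illustrative only, not code from the patch: the column names ("id", "name", "age") are invented, and the schema is built with plain StructField instead of the .toStructType helper the patch uses; only spark-catalyst types appear.

import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

// Full relation output, as a LogicalRelation would expose it.
val fullOutput = Seq(
  AttributeReference("id", IntegerType)(),
  AttributeReference("name", StringType)(),
  AttributeReference("age", IntegerType)())

// Old shape: the scan node stored fullOutput and requiredColumnsIndex and
// derived its output as requiredColumnsIndex.map(fullOutput).
val requiredColumnsIndex = Seq(0, 2)
val oldOutput = requiredColumnsIndex.map(fullOutput)

// New shape: callers pass the pruned attributes directly, plus the schema
// that backs the ReadSchema entry shown in explain output.
val output = Seq(fullOutput(0), fullOutput(2))
val requiredSchema = StructType(output.map(a => StructField(a.name, a.dataType, a.nullable)))

println(oldOutput == output)           // true: the same pruned attributes either way
println(requiredSchema.catalogString)  // struct<id:int,age:int>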