diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
index bef9f4b46c628..56125ec071121 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
@@ -99,8 +99,8 @@ trait DataSourceScanExec extends LeafExecNode {
 
 /** Physical plan node for scanning data from a relation. */
 case class RowDataSourceScanExec(
-    fullOutput: Seq[Attribute],
-    requiredColumnsIndex: Seq[Int],
+    output: Seq[Attribute],
+    requiredSchema: StructType,
     filters: Set[Filter],
     handledFilters: Set[Filter],
     rdd: RDD[InternalRow],
@@ -108,8 +108,6 @@ case class RowDataSourceScanExec(
     tableIdentifier: Option[TableIdentifier])
   extends DataSourceScanExec with InputRDDCodegen {
 
-  def output: Seq[Attribute] = requiredColumnsIndex.map(fullOutput)
-
   override lazy val metrics =
     Map("numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"))
 
@@ -136,14 +134,14 @@ case class RowDataSourceScanExec(
       if (handledFilters.contains(filter)) s"*$filter" else s"$filter"
     }
     Map(
-      "ReadSchema" -> output.toStructType.catalogString,
+      "ReadSchema" -> requiredSchema.catalogString,
       "PushedFilters" -> markedFilters.mkString("[", ", ", "]"))
   }
 
   // Don't care about `rdd` and `tableIdentifier` when canonicalizing.
   override def doCanonicalize(): SparkPlan =
     copy(
-      fullOutput.map(QueryPlan.normalizeExpressions(_, fullOutput)),
+      output.map(QueryPlan.normalizeExpressions(_, output)),
       rdd = null,
       tableIdentifier = None)
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index ada04c2382b72..3ccff6d89babd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -307,7 +307,7 @@ case class DataSourceStrategy(conf: SQLConf) extends Strategy with Logging with
     case l @ LogicalRelation(baseRelation: TableScan, _, _, _) =>
       RowDataSourceScanExec(
         l.output,
-        l.output.indices,
+        l.output.toStructType,
         Set.empty,
         Set.empty,
         toCatalystRDD(l, baseRelation.buildScan()),
@@ -379,8 +379,8 @@ case class DataSourceStrategy(conf: SQLConf) extends Strategy with Logging with
         .map(relation.attributeMap)
 
       val scan = RowDataSourceScanExec(
-        relation.output,
-        requestedColumns.map(relation.output.indexOf),
+        requestedColumns,
+        requestedColumns.toStructType,
         pushedFilters.toSet,
         handledFilters,
         scanBuilder(requestedColumns, candidatePredicates, pushedFilters),
@@ -401,8 +401,8 @@ case class DataSourceStrategy(conf: SQLConf) extends Strategy with Logging with
         (projectSet ++ filterSet -- handledSet).map(relation.attributeMap).toSeq
 
       val scan = RowDataSourceScanExec(
-        relation.output,
-        requestedColumns.map(relation.output.indexOf),
+        requestedColumns,
+        requestedColumns.toStructType,
         pushedFilters.toSet,
         handledFilters,
         scanBuilder(requestedColumns, candidatePredicates, pushedFilters),
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
index f289a867e5ec0..fe4f8bc83fcff 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
@@ -64,11 +64,9 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
       }
       val rdd = v1Relation.buildScan()
       val unsafeRowRDD = DataSourceStrategy.toCatalystRDD(v1Relation, output, rdd)
-      val originalOutputNames = relation.table.schema().map(_.name)
-      val requiredColumnsIndex = output.map(_.name).map(originalOutputNames.indexOf)
       val dsScan = RowDataSourceScanExec(
         output,
-        requiredColumnsIndex,
+        output.toStructType,
         translated.toSet,
         pushed.toSet,
         unsafeRowRDD,
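
Note on the shape of this change: `RowDataSourceScanExec` previously stored the relation's full output plus the index positions of the requested columns and re-derived its pruned `output` on access; after the patch, every call site passes the pruned attributes directly, together with a `StructType` (`requiredSchema`) that backs the `ReadSchema` metadata entry, and `doCanonicalize` normalizes against `output` itself. Below is a minimal, self-contained Scala sketch of the before/after call-site shapes; `Attribute`, `ScanBefore`, `ScanAfter`, and `RefactorSketch` are hypothetical stand-ins for illustration, not Spark's real classes.

    final case class Attribute(name: String, dataType: String)

    // Before: the node kept the full relation output plus index positions and
    // recomputed the pruned output on every access.
    final case class ScanBefore(fullOutput: Seq[Attribute], requiredColumnsIndex: Seq[Int]) {
      def output: Seq[Attribute] = requiredColumnsIndex.map(fullOutput)
    }

    // After: callers hand over the pruned output and its schema directly.
    final case class ScanAfter(output: Seq[Attribute], requiredSchema: Seq[Attribute])

    object RefactorSketch extends App {
      val full = Seq(Attribute("id", "bigint"), Attribute("name", "string"), Attribute("age", "int"))
      val requested = Seq(full(0), full(2)) // column pruning kept id and age

      val before = ScanBefore(full, requested.map(full.indexOf)) // old call-site shape
      val after  = ScanAfter(requested, requested)               // new call-site shape

      assert(before.output == after.output) // same pruned output either way
    }

Passing the pruned output and schema explicitly also drops the name-based index lookup the V2 path used to perform (`output.map(_.name).map(originalOutputNames.indexOf)`), so the scan node no longer needs to know about the relation's full schema at all.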