From 101735872e7d181fb3477234278cf602b2d72e33 Mon Sep 17 00:00:00 2001 From: Dongjoon Hyun Date: Wed, 13 Feb 2019 16:04:27 -0800 Subject: [PATCH] [SPARK-26865][SQL] DataSourceV2Strategy should push normalized filters ## What changes were proposed in this pull request? This PR aims to make `DataSourceV2Strategy` normalize filters like [FileSourceStrategy](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala#L150-L158) when it pushes them into `SupportsPushDownFilters.pushFilters`. ## How was this patch tested? Pass the Jenkins with the newly added test case. Closes #23770 from dongjoon-hyun/SPARK-26865. Authored-by: Dongjoon Hyun Signed-off-by: Dongjoon Hyun --- .../datasources/DataSourceStrategy.scala | 16 ++++++++++++++++ .../datasources/FileSourceStrategy.scala | 10 +--------- .../datasources/v2/DataSourceV2Strategy.scala | 7 +++++-- .../datasources/DataSourceStrategySuite.scala | 7 +++++++ 4 files changed, 29 insertions(+), 11 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala index b5cf8c9515bfb..273cc3b19302d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala @@ -426,6 +426,22 @@ case class DataSourceStrategy(conf: SQLConf) extends Strategy with Logging with } object DataSourceStrategy { + /** + * The attribute name of predicate could be different than the one in schema in case of + * case insensitive, we should change them to match the one in schema, so we do not need to + * worry about case sensitivity anymore. + */ + protected[sql] def normalizeFilters( + filters: Seq[Expression], + attributes: Seq[AttributeReference]): Seq[Expression] = { + filters.filterNot(SubqueryExpression.hasSubquery).map { e => + e transform { + case a: AttributeReference => + a.withName(attributes.find(_.semanticEquals(a)).get.name) + } + } + } + /** * Tries to translate a Catalyst [[Expression]] into data source [[Filter]]. * diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala index 62ab5c80d47cf..970cbda6355e9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala @@ -147,15 +147,7 @@ object FileSourceStrategy extends Strategy with Logging { // - filters that need to be evaluated again after the scan val filterSet = ExpressionSet(filters) - // The attribute name of predicate could be different than the one in schema in case of - // case insensitive, we should change them to match the one in schema, so we do not need to - // worry about case sensitivity anymore. - val normalizedFilters = filters.filterNot(SubqueryExpression.hasSubquery).map { e => - e transform { - case a: AttributeReference => - a.withName(l.output.find(_.semanticEquals(a)).get.name) - } - } + val normalizedFilters = DataSourceStrategy.normalizeFilters(filters, l.output) val partitionColumns = l.resolve( diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala index 40ac5cf402987..d6d17d6df7b1b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala @@ -20,7 +20,7 @@ package org.apache.spark.sql.execution.datasources.v2 import scala.collection.mutable import org.apache.spark.sql.{sources, AnalysisException, SaveMode, Strategy} -import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, AttributeSet, Expression} +import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, AttributeSet, Expression, SubqueryExpression} import org.apache.spark.sql.catalyst.planning.PhysicalOperation import org.apache.spark.sql.catalyst.plans.logical.{AppendData, LogicalPlan, Repartition} import org.apache.spark.sql.execution.{FilterExec, ProjectExec, SparkPlan} @@ -104,10 +104,13 @@ object DataSourceV2Strategy extends Strategy { override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { case PhysicalOperation(project, filters, relation: DataSourceV2Relation) => val scanBuilder = relation.newScanBuilder() + + val normalizedFilters = DataSourceStrategy.normalizeFilters(filters, relation.output) + // `pushedFilters` will be pushed down and evaluated in the underlying data sources. // `postScanFilters` need to be evaluated after the scan. // `postScanFilters` and `pushedFilters` can overlap, e.g. the parquet row group filter. - val (pushedFilters, postScanFilters) = pushFilters(scanBuilder, filters) + val (pushedFilters, postScanFilters) = pushFilters(scanBuilder, normalizedFilters) val (scan, output) = pruneColumns(scanBuilder, relation, project ++ postScanFilters) logInfo( s""" diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategySuite.scala index f20aded169e44..2f5d5551c5df0 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategySuite.scala @@ -219,6 +219,13 @@ class DataSourceStrategySuite extends PlanTest with SharedSQLContext { IsNotNull(attrInt))), None) } + test("SPARK-26865 DataSourceV2Strategy should push normalized filters") { + val attrInt = 'cint.int + assertResult(Seq(IsNotNull(attrInt))) { + DataSourceStrategy.normalizeFilters(Seq(IsNotNull(attrInt.withName("CiNt"))), Seq(attrInt)) + } + } + /** * Translate the given Catalyst [[Expression]] into data source [[sources.Filter]] * then verify against the given [[sources.Filter]].