From 3169baccc1bf40c64f1650c446614a671d6aec72 Mon Sep 17 00:00:00 2001
From: Dongjoon Hyun
Date: Tue, 12 Feb 2019 22:40:30 -0800
Subject: [PATCH] refactor

---
 .../datasources/DataSourceStrategy.scala      | 16 ++++++++++++++++
 .../datasources/FileSourceStrategy.scala      | 10 +---------
 .../datasources/v2/DataSourceV2Strategy.scala | 11 ++---------
 .../org/apache/spark/sql/SQLQuerySuite.scala  |  5 -----
 .../datasources/DataSourceStrategySuite.scala |  7 +++++++
 5 files changed, 26 insertions(+), 23 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index b5cf8c9515bfb..273cc3b19302d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -426,6 +426,22 @@ case class DataSourceStrategy(conf: SQLConf) extends Strategy with Logging with
 }
 
 object DataSourceStrategy {
+  /**
+   * The attribute names in a predicate may differ from the names in the schema when the
+   * analysis is case-insensitive. Normalize them to match the schema, so that downstream
+   * planning no longer needs to worry about case sensitivity.
+   */
+  protected[sql] def normalizeFilters(
+      filters: Seq[Expression],
+      attributes: Seq[AttributeReference]): Seq[Expression] = {
+    filters.filterNot(SubqueryExpression.hasSubquery).map { e =>
+      e transform {
+        case a: AttributeReference =>
+          a.withName(attributes.find(_.semanticEquals(a)).get.name)
+      }
+    }
+  }
+
   /**
    * Tries to translate a Catalyst [[Expression]] into data source [[Filter]].
    *
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
index 62ab5c80d47cf..970cbda6355e9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
@@ -147,15 +147,7 @@ object FileSourceStrategy extends Strategy with Logging {
     //  - filters that need to be evaluated again after the scan
     val filterSet = ExpressionSet(filters)
 
-    // The attribute name of predicate could be different than the one in schema in case of
-    // case insensitive, we should change them to match the one in schema, so we do not need to
-    // worry about case sensitivity anymore.
-    val normalizedFilters = filters.filterNot(SubqueryExpression.hasSubquery).map { e =>
-      e transform {
-        case a: AttributeReference =>
-          a.withName(l.output.find(_.semanticEquals(a)).get.name)
-      }
-    }
+    val normalizedFilters = DataSourceStrategy.normalizeFilters(filters, l.output)
 
     val partitionColumns =
       l.resolve(
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
index cd27123fe5ec4..d6d17d6df7b1b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
@@ -104,15 +104,8 @@ object DataSourceV2Strategy extends Strategy {
   override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
     case PhysicalOperation(project, filters, relation: DataSourceV2Relation) =>
       val scanBuilder = relation.newScanBuilder()
-      // The attribute name of predicate could be different than the one in schema in case of
-      // case insensitive, we should change them to match the one in schema, so we do not need to
-      // worry about case sensitivity anymore.
-      val normalizedFilters = filters.filterNot(SubqueryExpression.hasSubquery).map { e =>
-        e transform {
-          case a: AttributeReference =>
-            a.withName(relation.output.find(_.semanticEquals(a)).get.name)
-        }
-      }
+
+      val normalizedFilters = DataSourceStrategy.normalizeFilters(filters, relation.output)
 
       // `pushedFilters` will be pushed down and evaluated in the underlying data sources.
       // `postScanFilters` need to be evaluated after the scan.
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index ed35c20aa9395..b8c4d73f1b2b4 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -2967,11 +2967,6 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
     }
   }
 
-  test("SPARK-26865 ORC filter pushdown should be case insensitive by default") {
-    spark.range(10).write.mode("overwrite").orc("/tmp/o1")
-    spark.read.schema("ID long").orc("/tmp/o1").filter("id > 5").show
-  }
-
   test("SPARK-26709: OptimizeMetadataOnlyQuery does not handle empty records correctly") {
     Seq(true, false).foreach { enableOptimizeMetadataOnlyQuery =>
       withSQLConf(SQLConf.OPTIMIZER_METADATA_ONLY.key -> enableOptimizeMetadataOnlyQuery.toString) {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategySuite.scala
index f20aded169e44..bc4abf5f357c3 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategySuite.scala
@@ -219,6 +219,13 @@ class DataSourceStrategySuite extends PlanTest with SharedSQLContext {
       IsNotNull(attrInt))), None)
   }
 
+  test("SPARK-26865 DataSourceV2Strategy should push normalized filters") {
+    val attrInt = 'cint.int
+    assertResult(Seq(IsNotNull(attrInt))) {
+      DataSourceStrategy.normalizeFilters(Seq(IsNotNull(attrInt.withName("CiNt"))), Seq(attrInt))
+    }
+  }
+
   /**
    * Translate the given Catalyst [[Expression]] into data source [[sources.Filter]]
    * then verify against the given [[sources.Filter]].
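
A minimal usage sketch of the extracted helper, for reviewers. This is illustrative only and not part of the patch; it assumes the Catalyst expression DSL already used in DataSourceStrategySuite, and it would have to live inside a suite under the org.apache.spark.sql package since normalizeFilters is protected[sql]:

    import org.apache.spark.sql.catalyst.dsl.expressions._
    import org.apache.spark.sql.catalyst.expressions.IsNotNull
    import org.apache.spark.sql.execution.datasources.DataSourceStrategy

    // The schema attribute is named "cint"; the predicate spells it "CiNt".
    val cint = 'cint.int
    val filter = IsNotNull(cint.withName("CiNt"))

    // normalizeFilters matches attributes by exprId (via semanticEquals) and
    // rewrites their names to the exact casing of the relation output, so the
    // filter handed to the data source refers to "cint", not "CiNt".
    val Seq(normalized) = DataSourceStrategy.normalizeFilters(Seq(filter), Seq(cint))
    assert(normalized == IsNotNull(cint))

Note also that the helper drops filters containing subqueries (via filterNot(SubqueryExpression.hasSubquery)) rather than normalizing them, so such predicates are never pushed down.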