Skip to content

Commit

Permalink
[SPARK-26865][SQL] DataSourceV2Strategy should push normalized filters
Browse files Browse the repository at this point in the history
## What changes were proposed in this pull request?

This PR aims to make `DataSourceV2Strategy` normalize filters like [FileSourceStrategy](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala#L150-L158) when it pushes them into `SupportsPushDownFilters.pushFilters`.

## How was this patch tested?

Pass the Jenkins with the newly added test case.

Closes #23770 from dongjoon-hyun/SPARK-26865.

Authored-by: Dongjoon Hyun <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
  • Loading branch information
dongjoon-hyun committed Feb 14, 2019
1 parent a829234 commit 7a8ff15
Show file tree
Hide file tree
Showing 4 changed files with 29 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -426,6 +426,22 @@ case class DataSourceStrategy(conf: SQLConf) extends Strategy with Logging with
}

object DataSourceStrategy {
/**
* The attribute name of predicate could be different than the one in schema in case of
* case insensitive, we should change them to match the one in schema, so we do not need to
* worry about case sensitivity anymore.
*/
protected[sql] def normalizeFilters(
filters: Seq[Expression],
attributes: Seq[AttributeReference]): Seq[Expression] = {
filters.filterNot(SubqueryExpression.hasSubquery).map { e =>
e transform {
case a: AttributeReference =>
a.withName(attributes.find(_.semanticEquals(a)).get.name)
}
}
}

/**
* Tries to translate a Catalyst [[Expression]] into data source [[Filter]].
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,15 +147,7 @@ object FileSourceStrategy extends Strategy with Logging {
// - filters that need to be evaluated again after the scan
val filterSet = ExpressionSet(filters)

// The attribute name of predicate could be different than the one in schema in case of
// case insensitive, we should change them to match the one in schema, so we do not need to
// worry about case sensitivity anymore.
val normalizedFilters = filters.filterNot(SubqueryExpression.hasSubquery).map { e =>
e transform {
case a: AttributeReference =>
a.withName(l.output.find(_.semanticEquals(a)).get.name)
}
}
val normalizedFilters = DataSourceStrategy.normalizeFilters(filters, l.output)

val partitionColumns =
l.resolve(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ package org.apache.spark.sql.execution.datasources.v2
import scala.collection.mutable

import org.apache.spark.sql.{sources, AnalysisException, SaveMode, Strategy}
import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, AttributeSet, Expression}
import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, AttributeSet, Expression, SubqueryExpression}
import org.apache.spark.sql.catalyst.planning.PhysicalOperation
import org.apache.spark.sql.catalyst.plans.logical.{AppendData, LogicalPlan, Repartition}
import org.apache.spark.sql.execution.{FilterExec, ProjectExec, SparkPlan}
Expand Down Expand Up @@ -104,10 +104,13 @@ object DataSourceV2Strategy extends Strategy {
override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
case PhysicalOperation(project, filters, relation: DataSourceV2Relation) =>
val scanBuilder = relation.newScanBuilder()

val normalizedFilters = DataSourceStrategy.normalizeFilters(filters, relation.output)

// `pushedFilters` will be pushed down and evaluated in the underlying data sources.
// `postScanFilters` need to be evaluated after the scan.
// `postScanFilters` and `pushedFilters` can overlap, e.g. the parquet row group filter.
val (pushedFilters, postScanFilters) = pushFilters(scanBuilder, filters)
val (pushedFilters, postScanFilters) = pushFilters(scanBuilder, normalizedFilters)
val (scan, output) = pruneColumns(scanBuilder, relation, project ++ postScanFilters)
logInfo(
s"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,13 @@ class DataSourceStrategySuite extends PlanTest with SharedSQLContext {
IsNotNull(attrInt))), None)
}

test("SPARK-26865 DataSourceV2Strategy should push normalized filters") {
val attrInt = 'cint.int
assertResult(Seq(IsNotNull(attrInt))) {
DataSourceStrategy.normalizeFilters(Seq(IsNotNull(attrInt.withName("CiNt"))), Seq(attrInt))
}
}

/**
* Translate the given Catalyst [[Expression]] into data source [[sources.Filter]]
* then verify against the given [[sources.Filter]].
Expand Down

0 comments on commit 7a8ff15

Please sign in to comment.