
[SPARK-26865][SQL] DataSourceV2Strategy should push normalized filters #23770

Closed · wants to merge 3 commits
DataSourceStrategy.scala
@@ -426,6 +426,22 @@ case class DataSourceStrategy(conf: SQLConf) extends Strategy with Logging with
}

object DataSourceStrategy {
/**
* The attribute names in a predicate may differ from the names in the schema when the
* analysis is case insensitive. Rename them to match the schema, so we no longer need to
* worry about case sensitivity.
*/
protected[sql] def normalizeFilters(
filters: Seq[Expression],
attributes: Seq[AttributeReference]): Seq[Expression] = {
filters.filterNot(SubqueryExpression.hasSubquery).map { e =>
e transform {
case a: AttributeReference =>
a.withName(attributes.find(_.semanticEquals(a)).get.name)
}
}
}

/**
* Tries to translate a Catalyst [[Expression]] into data source [[Filter]].
*
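For illustration only (not part of this PR's diff), here is a minimal sketch of what `normalizeFilters` does. It assumes the code runs inside the `org.apache.spark.sql` package, since the method is `protected[sql]`; the column name `cint` and the `CiNt` casing are made-up example values:

```scala
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, IsNotNull}
import org.apache.spark.sql.execution.datasources.DataSourceStrategy
import org.apache.spark.sql.types.IntegerType

// The attribute as it appears in the table schema: a column named "cint".
val cint = AttributeReference("cint", IntegerType)()

// A predicate written with different casing (e.g. under spark.sql.caseSensitive=false).
// withName keeps the same ExprId, so semanticEquals still matches the schema attribute.
val predicate = IsNotNull(cint.withName("CiNt"))

// normalizeFilters rewrites the attribute name "CiNt" back to the schema's "cint".
val normalized = DataSourceStrategy.normalizeFilters(Seq(predicate), Seq(cint))
assert(normalized == Seq(IsNotNull(cint)))
```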
FileSourceStrategy.scala
@@ -147,15 +147,7 @@ object FileSourceStrategy extends Strategy with Logging {
// - filters that need to be evaluated again after the scan
val filterSet = ExpressionSet(filters)

// The attribute name of predicate could be different than the one in schema in case of
// case insensitive, we should change them to match the one in schema, so we do not need to
// worry about case sensitivity anymore.
val normalizedFilters = filters.filterNot(SubqueryExpression.hasSubquery).map { e =>
e transform {
case a: AttributeReference =>
a.withName(l.output.find(_.semanticEquals(a)).get.name)
}
}
val normalizedFilters = DataSourceStrategy.normalizeFilters(filters, l.output)

val partitionColumns =
l.resolve(
DataSourceV2Strategy.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.execution.datasources.v2
import scala.collection.mutable

import org.apache.spark.sql.{sources, AnalysisException, SaveMode, Strategy}
import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, AttributeSet, Expression}
import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, AttributeSet, Expression, SubqueryExpression}
import org.apache.spark.sql.catalyst.planning.PhysicalOperation
import org.apache.spark.sql.catalyst.plans.logical.{AppendData, LogicalPlan, Repartition}
import org.apache.spark.sql.execution.{FilterExec, ProjectExec, SparkPlan}
@@ -104,10 +104,13 @@ object DataSourceV2Strategy extends Strategy {
override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
case PhysicalOperation(project, filters, relation: DataSourceV2Relation) =>
val scanBuilder = relation.newScanBuilder()

val normalizedFilters = DataSourceStrategy.normalizeFilters(filters, relation.output)

// `pushedFilters` will be pushed down and evaluated in the underlying data sources.
// `postScanFilters` need to be evaluated after the scan.
// `postScanFilters` and `pushedFilters` can overlap, e.g. the parquet row group filter.
val (pushedFilters, postScanFilters) = pushFilters(scanBuilder, filters)
val (pushedFilters, postScanFilters) = pushFilters(scanBuilder, normalizedFilters)
val (scan, output) = pruneColumns(scanBuilder, relation, project ++ postScanFilters)
logInfo(
s"""
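To see why normalizing before `pushFilters` matters, here is a hedged sketch (same package-visibility caveat and hypothetical column names as in the earlier sketch) showing that `translateFilter` copies the attribute name into the `sources.Filter` handed to the data source:

```scala
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, IsNotNull}
import org.apache.spark.sql.execution.datasources.DataSourceStrategy
import org.apache.spark.sql.sources
import org.apache.spark.sql.types.IntegerType

val cint = AttributeReference("cint", IntegerType)()
val predicate = IsNotNull(cint.withName("CiNt"))

// Without normalization the translated filter keeps the query's casing, which a
// case-sensitive data source may fail to match against its column "cint".
assert(DataSourceStrategy.translateFilter(predicate) == Some(sources.IsNotNull("CiNt")))

// Normalizing first makes the pushed filter refer to the column name in the schema.
val Seq(normalized) = DataSourceStrategy.normalizeFilters(Seq(predicate), Seq(cint))
assert(DataSourceStrategy.translateFilter(normalized) == Some(sources.IsNotNull("cint")))
```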
DataSourceStrategySuite.scala
@@ -219,6 +219,11 @@ class DataSourceStrategySuite extends PlanTest with SharedSQLContext {
IsNotNull(attrInt))), None)
}

test("SPARK-26865 DataSourceV2Strategy should push normalized filters") {
val attrInt = 'cint.int
DataSourceStrategy.normalizeFilters(Seq(IsNotNull(attrInt.withName("CiNt"))), Seq(attrInt))
Member commented:

Should we check the result of normalizeFilters? It seems that the test case will always pass.

Member commented:

+1

Member (author) commented:

Oops. Right. Thanks!

}
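Following up on the review thread above, one possible way to assert the result of `normalizeFilters` (a sketch only, relying on the suite's existing DSL imports for `'cint.int` and ScalaTest's `assertResult`; not the change committed in this diff):

```scala
test("SPARK-26865 DataSourceV2Strategy should push normalized filters") {
  val attrInt = 'cint.int
  // The differently cased attribute should be renamed back to the schema's "cint".
  assertResult(Seq(IsNotNull(attrInt))) {
    DataSourceStrategy.normalizeFilters(Seq(IsNotNull(attrInt.withName("CiNt"))), Seq(attrInt))
  }
}
```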

/**
* Translate the given Catalyst [[Expression]] into data source [[sources.Filter]]
* then verify against the given [[sources.Filter]].