[SPARK-26865][SQL] DataSourceV2Strategy should push normalized filters
dongjoon-hyun committed Feb 13, 2019
1 parent 8126d09 commit 2b5a086
Showing 2 changed files with 17 additions and 2 deletions.
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.execution.datasources.v2
 import scala.collection.mutable
 
 import org.apache.spark.sql.{sources, AnalysisException, SaveMode, Strategy}
-import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, AttributeSet, Expression}
+import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, AttributeSet, Expression, SubqueryExpression}
 import org.apache.spark.sql.catalyst.planning.PhysicalOperation
 import org.apache.spark.sql.catalyst.plans.logical.{AppendData, LogicalPlan, Repartition}
 import org.apache.spark.sql.execution.{FilterExec, ProjectExec, SparkPlan}
@@ -104,10 +104,20 @@ object DataSourceV2Strategy extends Strategy {
   override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
     case PhysicalOperation(project, filters, relation: DataSourceV2Relation) =>
       val scanBuilder = relation.newScanBuilder()
+      // Under case-insensitive analysis, an attribute name in a predicate may differ in
+      // case from the corresponding schema field. Rename such attributes to match the
+      // schema so the data sources never have to worry about case sensitivity.
+      val normalizedFilters = filters.filterNot(SubqueryExpression.hasSubquery).map { e =>
+        e transform {
+          case a: AttributeReference =>
+            a.withName(relation.output.find(_.semanticEquals(a)).get.name)
+        }
+      }
+
       // `pushedFilters` will be pushed down and evaluated in the underlying data sources.
       // `postScanFilters` need to be evaluated after the scan.
       // `postScanFilters` and `pushedFilters` can overlap, e.g. the parquet row group filter.
-      val (pushedFilters, postScanFilters) = pushFilters(scanBuilder, filters)
+      val (pushedFilters, postScanFilters) = pushFilters(scanBuilder, normalizedFilters)
       val (scan, output) = pruneColumns(scanBuilder, relation, project ++ postScanFilters)
       logInfo(
         s"""
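
For readers who want to see the renaming in isolation, here is a minimal, hypothetical sketch (not part of the commit; it assumes Spark's Catalyst classes on the classpath, and the attribute names are chosen for illustration). semanticEquals matches attributes by exprId, ignoring differences in the name's case, so withName can rewrite the filter's attribute to the schema's casing:

import org.apache.spark.sql.catalyst.expressions.{AttributeReference, GreaterThan, Literal}
import org.apache.spark.sql.types.LongType

// The relation's output attribute, named as declared in the user-supplied schema.
val schemaAttr = AttributeReference("ID", LongType)()
// The same attribute as it appears in the analyzed predicate: same exprId, but
// carrying the lowercase spelling the user typed.
val filterAttr = schemaAttr.withName("id")
val filter = GreaterThan(filterAttr, Literal(5L))

// The patch's normalization step: rename each attribute to the matching output
// attribute's name; semanticEquals compares by exprId, so the case difference
// does not prevent the match.
val output = Seq(schemaAttr)
val normalized = filter transform {
  case a: AttributeReference =>
    a.withName(output.find(_.semanticEquals(a)).get.name)
}
// `normalized` now reads `ID > 5`, so the name handed to the data source matches the schema.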
sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -2967,6 +2967,11 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
     }
   }
 
+  test("SPARK-26865 ORC filter pushdown should be case insensitive by default") {
+    spark.range(10).write.mode("overwrite").orc("/tmp/o1")
+    spark.read.schema("ID long").orc("/tmp/o1").filter("id > 5").show
+  }
+
   test("SPARK-26709: OptimizeMetadataOnlyQuery does not handle empty records correctly") {
     Seq(true, false).foreach { enableOptimizeMetadataOnlyQuery =>
       withSQLConf(SQLConf.OPTIMIZER_METADATA_ONLY.key -> enableOptimizeMetadataOnlyQuery.toString) {
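
To see the end-to-end effect the new test exercises, a hypothetical spark-shell session (not from the commit; the /tmp path and column names mirror the test above, and the exact plan output depends on the build) might look like this:

// The physical ORC files store a column named "id"; the user-supplied read
// schema declares it as "ID"; the predicate spells it "id" again.
spark.range(10).write.mode("overwrite").orc("/tmp/o1")
val df = spark.read.schema("ID long").orc("/tmp/o1").filter("id > 5")
df.explain()  // with this patch, the pushed filter references the schema-cased "ID"
df.show()     // returns the four rows 6..9 under the default case-insensitive analysis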
