
Commit

refactor
dongjoon-hyun committed Feb 13, 2019
1 parent 2b5a086 commit 3169bac
Showing 5 changed files with 24 additions and 23 deletions.
@@ -426,6 +426,22 @@ case class DataSourceStrategy(conf: SQLConf) extends Strategy with Logging with
 }
 
 object DataSourceStrategy {
+  /**
+   * The attribute names in a predicate may differ from the ones in the schema when the analysis
+   * is case insensitive. Rewrite them to match the names in the schema, so that later stages no
+   * longer need to worry about case sensitivity.
+   */
+  protected[sql] def normalizeFilters(
+      filters: Seq[Expression],
+      attributes: Seq[AttributeReference]): Seq[Expression] = {
+    filters.filterNot(SubqueryExpression.hasSubquery).map { e =>
+      e transform {
+        case a: AttributeReference =>
+          a.withName(attributes.find(_.semanticEquals(a)).get.name)
+      }
+    }
+  }
+
   /**
    * Tries to translate a Catalyst [[Expression]] into data source [[Filter]].
    *
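
A minimal sketch of how the extracted helper is used. It assumes a call site under the
org.apache.spark.sql package (normalizeFilters is protected[sql]); the attribute name and type
are made up for the example:

  import org.apache.spark.sql.catalyst.expressions.{AttributeReference, IsNotNull}
  import org.apache.spark.sql.execution.datasources.DataSourceStrategy
  import org.apache.spark.sql.types.IntegerType

  // The schema attribute is lower-case "cint"; the predicate spells it "CiNt".
  val cint = AttributeReference("cint", IntegerType)()
  val predicate = IsNotNull(cint.withName("CiNt"))

  // Subquery-free filters are rewritten so the attribute carries the schema's name again.
  val normalized = DataSourceStrategy.normalizeFilters(Seq(predicate), Seq(cint))
  assert(normalized == Seq(IsNotNull(cint)))
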
@@ -147,15 +147,7 @@ object FileSourceStrategy extends Strategy with Logging {
       //  - filters that need to be evaluated again after the scan
       val filterSet = ExpressionSet(filters)
 
-      // The attribute name of predicate could be different than the one in schema in case of
-      // case insensitive, we should change them to match the one in schema, so we do not need to
-      // worry about case sensitivity anymore.
-      val normalizedFilters = filters.filterNot(SubqueryExpression.hasSubquery).map { e =>
-        e transform {
-          case a: AttributeReference =>
-            a.withName(l.output.find(_.semanticEquals(a)).get.name)
-        }
-      }
+      val normalizedFilters = DataSourceStrategy.normalizeFilters(filters, l.output)
 
       val partitionColumns =
         l.resolve(
@@ -104,15 +104,8 @@ object DataSourceV2Strategy extends Strategy {
   override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
     case PhysicalOperation(project, filters, relation: DataSourceV2Relation) =>
       val scanBuilder = relation.newScanBuilder()
-      // The attribute name of predicate could be different than the one in schema in case of
-      // case insensitive, we should change them to match the one in schema, so we do not need to
-      // worry about case sensitivity anymore.
-      val normalizedFilters = filters.filterNot(SubqueryExpression.hasSubquery).map { e =>
-        e transform {
-          case a: AttributeReference =>
-            a.withName(relation.output.find(_.semanticEquals(a)).get.name)
-        }
-      }
+
+      val normalizedFilters = DataSourceStrategy.normalizeFilters(filters, relation.output)
 
       // `pushedFilters` will be pushed down and evaluated in the underlying data sources.
       // `postScanFilters` need to be evaluated after the scan.
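
The comment above describes the split that follows: predicates that can be turned into data
source Filters are offered to the source for pushdown, while anything that cannot be translated
stays behind as a post-scan filter. A simplified sketch of that idea, using the translateFilter
helper documented in DataSourceStrategy (the actual pushFilters logic additionally asks the data
source which pushed filters still need re-evaluation after the scan):

  import org.apache.spark.sql.catalyst.expressions.Expression
  import org.apache.spark.sql.execution.datasources.DataSourceStrategy
  import org.apache.spark.sql.sources.Filter

  // Whatever translates into a data source Filter can be offered for pushdown;
  // everything else must be evaluated by Spark after the scan.
  def splitForPushdown(filters: Seq[Expression]): (Seq[Filter], Seq[Expression]) = {
    val translated = filters.map(f => f -> DataSourceStrategy.translateFilter(f))
    (translated.collect { case (_, Some(pushed)) => pushed },
      translated.collect { case (original, None) => original })
  }
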
@@ -2967,11 +2967,6 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
     }
   }
 
-  test("SPARK-26865 ORC filter pushdown should be case insensitive by default") {
-    spark.range(10).write.mode("overwrite").orc("/tmp/o1")
-    spark.read.schema("ID long").orc("/tmp/o1").filter("id > 5").show
-  }
-
   test("SPARK-26709: OptimizeMetadataOnlyQuery does not handle empty records correctly") {
     Seq(true, false).foreach { enableOptimizeMetadataOnlyQuery =>
       withSQLConf(SQLConf.OPTIMIZER_METADATA_ONLY.key -> enableOptimizeMetadataOnlyQuery.toString) {
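
For reference, the scenario in the removed test (an ORC read whose user-specified schema differs
from the written column name only by case, filtered through the lower-case name) can be exercised
without a hard-coded /tmp path. A sketch assuming the suite's usual helpers (withTempPath,
checkAnswer); the expected rows are inferred from range(10), not taken from the commit:

  test("ORC filter pushdown is case insensitive by default (illustrative)") {
    withTempPath { dir =>
      val path = dir.getCanonicalPath
      spark.range(10).write.mode("overwrite").orc(path)
      // The data has column "id"; the read schema declares "ID". With the default
      // case-insensitive analysis, the filter on "id" still resolves and can be pushed down.
      checkAnswer(
        spark.read.schema("ID long").orc(path).filter("id > 5"),
        (6 until 10).map(i => Row(i.toLong)))
    }
  }
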
@@ -219,6 +219,11 @@ class DataSourceStrategySuite extends PlanTest with SharedSQLContext {
       IsNotNull(attrInt))), None)
   }
 
+  test("SPARK-26865 DataSourceV2Strategy should push normalized filters") {
+    val attrInt = 'cint.int
+    DataSourceStrategy.normalizeFilters(Seq(IsNotNull(attrInt.withName("CiNt"))), Seq(attrInt))
+  }
+
   /**
    * Translate the given Catalyst [[Expression]] into data source [[sources.Filter]]
    * then verify against the given [[sources.Filter]].
