[SPARK-13473][SQL] Simplifies PushPredicateThroughProject
## What changes were proposed in this pull request?

This is a follow-up of PR apache#11348.

After PR apache#11348, a predicate is never pushed through a project if the project contains any non-deterministic fields. The candidate filter condition therefore can never reference a non-deterministic projected field, and the logic that handled that case can be safely removed.

To be more specific, the following optimization is allowed:

```scala
// From:
df.select('a, 'b).filter('c > rand(42))
// To:
df.filter('c > rand(42)).select('a, 'b)
```

while this isn't:

```scala
// From:
df.select('a, rand('b) as 'rb, 'c).filter('c > 'rb)
// To:
df.filter('c > rand('b)).select('a, rand('b) as 'rb, 'c)
```
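The reasoning above can be checked on a toy model. The sketch below is not Catalyst code: `Expr`, `Attr`, `Rand`, `Gt`, `And`, `Alias`, and `PushdownSketch` are hypothetical stand-ins written for illustration only. It re-implements the partition step that this patch removes and shows that, whenever every projected field is deterministic, no conjunct ever lands in the "nondeterministic" half, so the removed branches are unreachable.

```scala
// Toy expression tree; these are NOT Spark's Catalyst classes (hypothetical names).
sealed trait Expr { def deterministic: Boolean }
case class Attr(name: String) extends Expr { def deterministic: Boolean = true }
case class Rand(seed: Long) extends Expr { def deterministic: Boolean = false }
case class Gt(left: Expr, right: Expr) extends Expr {
  def deterministic: Boolean = left.deterministic && right.deterministic
}
case class And(left: Expr, right: Expr) extends Expr {
  def deterministic: Boolean = left.deterministic && right.deterministic
}
case class Alias(child: Expr, name: String) extends Expr {
  def deterministic: Boolean = child.deterministic
}

object PushdownSketch {
  // Split a condition into its top-level conjuncts.
  def splitConjuncts(e: Expr): Seq[Expr] = e match {
    case And(l, r) => splitConjuncts(l) ++ splitConjuncts(r)
    case other     => Seq(other)
  }

  // Collect every attribute referenced by an expression.
  def references(e: Expr): Seq[Attr] = e match {
    case a: Attr     => Seq(a)
    case Rand(_)     => Nil
    case Gt(l, r)    => references(l) ++ references(r)
    case And(l, r)   => references(l) ++ references(r)
    case Alias(c, _) => references(c)
  }

  // Map each aliased output name to the expression it stands for.
  def aliasMap(fields: Seq[Expr]): Map[String, Expr] =
    fields.collect { case Alias(child, name) => name -> child }.toMap

  // Analogue of the removed partition: a conjunct counts as "deterministic"
  // when every aliased expression it refers to is deterministic.
  def partitionByAliases(condition: Expr, fields: Seq[Expr]): (Seq[Expr], Seq[Expr]) = {
    val aliases = aliasMap(fields)
    splitConjuncts(condition).partition { conjunct =>
      references(conjunct).flatMap(a => aliases.get(a.name)).forall(_.deterministic)
    }
  }

  def main(args: Array[String]): Unit = {
    // Guard holds: all projected fields are deterministic.
    val fields = Seq(Attr("a"), Alias(Attr("b"), "x"))
    val condition = And(Gt(Attr("x"), Attr("a")), Gt(Attr("a"), Rand(42)))
    val (det, nondet) = partitionByAliases(condition, fields)
    println(s"guard holds: ${fields.forall(_.deterministic)}")
    println(s"conjuncts in deterministic half: ${det.size}")
    println(s"conjuncts in nondeterministic half: ${nondet.size}")
  }
}
```

Note that the partition only inspects the expressions behind projected aliases, so a conjunct that calls `Rand` directly still lands in the deterministic half, which matches the first example above. A project such as `Seq(Attr("a"), Alias(Rand(7), "rb"), Attr("c"))` fails the guard outright, so the rule never reaches the partition for it.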

## How was this patch tested?

Existing test cases should cover this change.

Author: Cheng Lian

Closes apache#11864 from liancheng/spark-13473-cleanup.
liancheng authored and roygao94 committed Mar 22, 2016
1 parent 6e04d41 commit 0fbdb2e
Showing 1 changed file with 1 addition and 23 deletions.
@@ -891,29 +891,7 @@ object PushPredicateThroughProject extends Rule[LogicalPlan] with PredicateHelpe
         case a: Alias => (a.toAttribute, a.child)
       })

-      // Split the condition into small conditions by `And`, so that we can push down part of this
-      // condition without nondeterministic expressions.
-      val andConditions = splitConjunctivePredicates(condition)
-
-      val (deterministic, nondeterministic) = andConditions.partition(_.collect {
-        case a: Attribute if aliasMap.contains(a) => aliasMap(a)
-      }.forall(_.deterministic))
-
-      // If there is no nondeterministic conditions, push down the whole condition.
-      if (nondeterministic.isEmpty) {
-        project.copy(child = Filter(replaceAlias(condition, aliasMap), grandChild))
-      } else {
-        // If they are all nondeterministic conditions, leave it un-changed.
-        if (deterministic.isEmpty) {
-          filter
-        } else {
-          // Push down the small conditions without nondeterministic expressions.
-          val pushedCondition =
-            deterministic.map(replaceAlias(_, aliasMap)).reduce(And)
-          Filter(nondeterministic.reduce(And),
-            project.copy(child = Filter(pushedCondition, grandChild)))
-        }
-      }
+      project.copy(child = Filter(replaceAlias(condition, aliasMap), grandChild))
   }

 }