From ee72cc49c3b1dac8e3881657b859702643bb9ebb Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Wed, 1 Nov 2023 18:02:29 -0400 Subject: [PATCH 1/4] Minor: Improve documentation for Fulter Pushdown --- datafusion/optimizer/src/push_down_filter.rs | 101 +++++++++++++++---- 1 file changed, 82 insertions(+), 19 deletions(-) diff --git a/datafusion/optimizer/src/push_down_filter.rs b/datafusion/optimizer/src/push_down_filter.rs index 8c2eb96a48d8..114333e50372 100644 --- a/datafusion/optimizer/src/push_down_filter.rs +++ b/datafusion/optimizer/src/push_down_filter.rs @@ -12,7 +12,8 @@ // specific language governing permissions and limitations // under the License. -//! Push Down Filter optimizer rule ensures that filters are applied as early as possible in the plan +//! [`PushDownFilter`] Moves filters so they are applied as early as possible in +//! the plan. use crate::optimizer::ApplyOrder; use crate::utils::{conjunction, split_conjunction, split_conjunction_owned}; @@ -33,31 +34,93 @@ use itertools::Itertools; use std::collections::{HashMap, HashSet}; use std::sync::Arc; -/// Push Down Filter optimizer rule pushes filter clauses down the plan +/// Optimizer rule for pushing (moving) filter expressions down in a plan so +/// they are applied as early as possible. +/// /// # Introduction -/// A filter-commutative operation is an operation whose result of filter(op(data)) = op(filter(data)). -/// An example of a filter-commutative operation is a projection; a counter-example is `limit`. /// -/// The filter-commutative property is column-specific. An aggregate grouped by A on SUM(B) -/// can commute with a filter that depends on A only, but does not commute with a filter that depends -/// on SUM(B). +/// The goal of this rule is to improve query performance by eliminating +/// redundant work. +/// +/// For example, given a plan that sorts all values where `a > 10`: +/// +/// ```text +/// Filter (a > 10) +/// Sort (a, b) +/// ``` +/// +/// A better plan is to filter the data *before* the Sort, which sorts fewer +/// rows and therefore does less work overall: +/// +/// ```text +/// Sort (a, b) +/// Filter (a > 10) <-- filter is moved before the sort +/// ``` +/// +/// However it is not always possible to push filters down. For example, given a +/// plan that finds the top 3 values and then keeps only those that are greater +/// than 10, if the filter is pushed below the limit it would produce a +/// different result. +/// +/// ```text +/// Filter (a > 10) <-- can not move this Filter before the limit +/// Limit (fetch=3) +/// Sort (a, b) +/// ``` +/// +/// +/// More formally, a filter-commutative operation is an operation `op` that +/// satisfies `filter(op(data)) = op(filter(data))`. +/// +/// The filter-commutative property is plan and column-specific. A filter on `a` +/// can be pushed through a `Aggregate(group_by = [a], agg=[SUM(b))`. However, a +/// a filter on `SUM(b)` can not be pushed through the same aggregate. +/// +/// # Handling Conjuctions +/// +/// It is possible to only push down **part** of a filter expression if is +/// connected with `AND`s (more formally if it is a "conjuction"). +/// +/// For example, given the following plan: +/// +/// ```text +/// Filter(a > 10 AND SUM(b) < 5) +/// Aggregate(group_by = [a], agg = [SUM(b)) +/// ``` +/// +/// The `a > 10` is commutative with the `Aggregate` but `SUM(b) < 5` is not. +/// Therefoe it is possible to only push part of the expression, resulting in: +/// +/// ```text +/// Filter(SUM(b) < 5) +/// Aggregate(group_by = [a], agg = [SUM(b)) +/// Filter(a > 10) +/// ``` +/// +/// # Handling Column Aliases /// -/// This optimizer commutes filters with filter-commutative operations to push the filters -/// the closest possible to the scans, re-writing the filter expressions by every -/// projection that changes the filter's expression. +/// This optimizer must sometimes handle re-writing filter expressions when they +/// pushed, for example if there is a projection that aliases `a+1` to `"b"`: /// -/// Filter: b Gt Int64(10) -/// Projection: a AS b +/// ```text +/// Filter (b > 10) +/// Projection: [a+1 AS "b"] <-- changes the name of `a+1` to `b` +/// ``` /// -/// is optimized to +/// To apply the filter prior to the `Projection`, all references to `b` must be +/// rewritten to `a+1`: /// -/// Projection: a AS b -/// Filter: a Gt Int64(10) <--- changed from b to a +/// ```text +/// Projection: a AS "b" +/// Filter: (a + 1 > 10) <--- changed from b to a + 1 +/// ``` +/// # Implementation Notes /// -/// This performs a single pass through the plan. When it passes through a filter, it stores that filter, -/// and when it reaches a node that does not commute with it, it adds the filter to that place. -/// When it passes through a projection, it re-writes the filter's expression taking into account that projection. -/// When multiple filters would have been written, it `AND` their expressions into a single expression. +/// This implementation performs a single pass through the plan, "pushing" down +/// filters. When it passes through a filter, it stores that filter, and when it +/// reaches a plan node that does not commute with that filter, it adds the +/// filter to that place. When it passes through a projection, it re-writes the +/// filter's expression taking into account that projection. #[derive(Default)] pub struct PushDownFilter {} From 3d3b83c2531e60bb96ee337c248fa2b9669b06c7 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Thu, 2 Nov 2023 06:11:51 -0400 Subject: [PATCH 2/4] Update datafusion/optimizer/src/push_down_filter.rs Co-authored-by: jakevin --- datafusion/optimizer/src/push_down_filter.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/optimizer/src/push_down_filter.rs b/datafusion/optimizer/src/push_down_filter.rs index 114333e50372..63a276d9bf9f 100644 --- a/datafusion/optimizer/src/push_down_filter.rs +++ b/datafusion/optimizer/src/push_down_filter.rs @@ -89,7 +89,7 @@ use std::sync::Arc; /// ``` /// /// The `a > 10` is commutative with the `Aggregate` but `SUM(b) < 5` is not. -/// Therefoe it is possible to only push part of the expression, resulting in: +/// Therefore it is possible to only push part of the expression, resulting in: /// /// ```text /// Filter(SUM(b) < 5) From d88eaaa7592054c0ba75a64f90501912fe25054a Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Thu, 2 Nov 2023 06:13:13 -0400 Subject: [PATCH 3/4] Apply suggestions from code review --- datafusion/optimizer/src/push_down_filter.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/datafusion/optimizer/src/push_down_filter.rs b/datafusion/optimizer/src/push_down_filter.rs index 63a276d9bf9f..801fff89d80d 100644 --- a/datafusion/optimizer/src/push_down_filter.rs +++ b/datafusion/optimizer/src/push_down_filter.rs @@ -54,7 +54,7 @@ use std::sync::Arc; /// /// ```text /// Sort (a, b) -/// Filter (a > 10) <-- filter is moved before the sort +/// Filter (a > 10) <-- Filter is moved before the sort /// ``` /// /// However it is not always possible to push filters down. For example, given a @@ -74,7 +74,7 @@ use std::sync::Arc; /// /// The filter-commutative property is plan and column-specific. A filter on `a` /// can be pushed through a `Aggregate(group_by = [a], agg=[SUM(b))`. However, a -/// a filter on `SUM(b)` can not be pushed through the same aggregate. +/// filter on `SUM(b)` can not be pushed through the same aggregate. /// /// # Handling Conjuctions /// From 6188bdbf9cc4e08c92ef3f8185f5e6171c99c2b3 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Thu, 2 Nov 2023 07:59:28 -0400 Subject: [PATCH 4/4] Update datafusion/optimizer/src/push_down_filter.rs Co-authored-by: Alex Huang --- datafusion/optimizer/src/push_down_filter.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/datafusion/optimizer/src/push_down_filter.rs b/datafusion/optimizer/src/push_down_filter.rs index 801fff89d80d..ae986b3c84dd 100644 --- a/datafusion/optimizer/src/push_down_filter.rs +++ b/datafusion/optimizer/src/push_down_filter.rs @@ -76,10 +76,10 @@ use std::sync::Arc; /// can be pushed through a `Aggregate(group_by = [a], agg=[SUM(b))`. However, a /// filter on `SUM(b)` can not be pushed through the same aggregate. /// -/// # Handling Conjuctions +/// # Handling Conjunctions /// /// It is possible to only push down **part** of a filter expression if is -/// connected with `AND`s (more formally if it is a "conjuction"). +/// connected with `AND`s (more formally if it is a "conjunction"). /// /// For example, given the following plan: ///