-
Notifications
You must be signed in to change notification settings - Fork 1.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add new physical rule CombinePartialFinalAggregate #5837
Changes from 5 commits
c0e7de2
be353c1
9b48539
8db03af
f32fbfc
ce3a125
4b8cd30
25957b1
c17e23f
0ba0cb6
4948319
9a505da
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,120 @@ | ||
// Licensed to the Apache Software Foundation (ASF) under one | ||
// or more contributor license agreements. See the NOTICE file | ||
// distributed with this work for additional information | ||
// regarding copyright ownership. The ASF licenses this file | ||
// to you under the Apache License, Version 2.0 (the | ||
// "License"); you may not use this file except in compliance | ||
// with the License. You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, | ||
// software distributed under the License is distributed on an | ||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
// KIND, either express or implied. See the License for the | ||
// specific language governing permissions and limitations | ||
// under the License. | ||
|
||
//! The CombinePartialFinalAggregate optimizer rule checks for adjacent Partial and Final AggregateExecs | ||
//! and tries to combine them when possible | ||
use crate::error::Result; | ||
use crate::physical_optimizer::PhysicalOptimizerRule; | ||
use crate::physical_plan::aggregates::{AggregateExec, AggregateMode}; | ||
use crate::physical_plan::ExecutionPlan; | ||
use datafusion_common::config::ConfigOptions; | ||
use std::sync::Arc; | ||
|
||
use datafusion_common::tree_node::{Transformed, TreeNode}; | ||
|
||
/// Physical optimizer rule that combines an adjacent Partial and Final
/// `AggregateExec` pair into one Single `AggregateExec` when their grouping
/// expressions and aggregate expressions are equal.
///
/// This rule should be applied after the `EnforceDistribution` and
/// `EnforceSorting` rules.
#[derive(Default, Debug)]
pub struct CombinePartialFinalAggregate {}

impl CombinePartialFinalAggregate {
    /// Creates a new `CombinePartialFinalAggregate` rule.
    ///
    /// The rule carries no state, so this is equivalent to `Default::default()`.
    pub fn new() -> Self {
        Self {}
    }
}
|
||
impl PhysicalOptimizerRule for CombinePartialFinalAggregate { | ||
fn optimize( | ||
&self, | ||
plan: Arc<dyn ExecutionPlan>, | ||
_config: &ConfigOptions, | ||
) -> Result<Arc<dyn ExecutionPlan>> { | ||
plan.transform_down(&|plan| { | ||
let transformed = plan.as_any().downcast_ref::<AggregateExec>().and_then( | ||
|AggregateExec { | ||
mode: final_mode, | ||
input: final_input, | ||
group_by: final_group_by, | ||
aggr_expr: final_aggr_expr, | ||
.. | ||
}| { | ||
if matches!( | ||
final_mode, | ||
AggregateMode::Final | AggregateMode::FinalPartitioned | ||
) { | ||
final_input | ||
.as_any() | ||
.downcast_ref::<AggregateExec>() | ||
.and_then( | ||
|AggregateExec { | ||
mode: input_mode, | ||
input: partial_input, | ||
group_by: input_group_by, | ||
aggr_expr: input_aggr_expr, | ||
input_schema, | ||
.. | ||
}| { | ||
if matches!(input_mode, AggregateMode::Partial) | ||
&& final_group_by.eq(input_group_by) | ||
&& final_aggr_expr.len() == input_aggr_expr.len() | ||
&& final_aggr_expr | ||
.iter() | ||
.zip(input_aggr_expr.iter()) | ||
.all(|(final_expr, partial_expr)| { | ||
final_expr.eq(partial_expr) | ||
}) | ||
{ | ||
AggregateExec::try_new( | ||
AggregateMode::Single, | ||
input_group_by.clone(), | ||
input_aggr_expr.to_vec(), | ||
partial_input.clone(), | ||
input_schema.clone(), | ||
) | ||
.ok() | ||
.map(Arc::new) | ||
} else { | ||
None | ||
} | ||
}, | ||
) | ||
} else { | ||
None | ||
} | ||
}, | ||
); | ||
|
||
Ok(if let Some(transformed) = transformed { | ||
Transformed::Yes(transformed) | ||
} else { | ||
Transformed::No(plan) | ||
}) | ||
}) | ||
} | ||
|
||
fn name(&self) -> &str { | ||
"CombinePartialFinalAggregate" | ||
} | ||
|
||
fn schema_check(&self) -> bool { | ||
true | ||
} | ||
} | ||
mingmwang marked this conversation as resolved.
Show resolved
Hide resolved
|
Original file line number | Diff line number | Diff line change | ||||||||
---|---|---|---|---|---|---|---|---|---|---|
|
@@ -65,6 +65,8 @@ pub enum AggregateMode { | |||||||||
/// with Hash repartitioning on the group keys. If a group key is | ||||||||||
/// duplicated, duplicate groups would be produced | ||||||||||
FinalPartitioned, | ||||||||||
/// Single aggregate is a combination of Partial and Final aggregate mode | ||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||||||
Single, | ||||||||||
} | ||||||||||
|
||||||||||
/// Represents `GROUP BY` clause in the plan (including the more general GROUPING SET) | ||||||||||
|
@@ -147,6 +149,24 @@ impl PhysicalGroupBy { | |||||||||
} | ||||||||||
} | ||||||||||
|
||||||||||
/// Hand-written equality for `PhysicalGroupBy`: the `expr` lists and the `null_expr` lists must | ||||||||||
/// each have equal length and match pairwise on both expression and name, and `groups` must be equal. | ||||||||||
impl PartialEq for PhysicalGroupBy { | ||||||||||
fn eq(&self, other: &PhysicalGroupBy) -> bool { | ||||||||||
// Lengths are compared before zipping because `zip` stops at the shorter iterator. | ||||||||||
self.expr.len() == other.expr.len() | ||||||||||
&& self | ||||||||||
.expr | ||||||||||
.iter() | ||||||||||
.zip(other.expr.iter()) | ||||||||||
.all(|((expr1, name1), (expr2, name2))| expr1.eq(expr2) && name1 == name2) | ||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I wondered why this needed to be manually derived, so I tried removing it and got this error:
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It looks like if Struct contains any boxed Trait Object, we can not use the |
||||||||||
// Same length-then-pairwise comparison for the `null_expr` list. | ||||||||||
&& self.null_expr.len() == other.null_expr.len() | ||||||||||
&& self | ||||||||||
.null_expr | ||||||||||
.iter() | ||||||||||
.zip(other.null_expr.iter()) | ||||||||||
.all(|((expr1, name1), (expr2, name2))| expr1.eq(expr2) && name1 == name2) | ||||||||||
&& self.groups == other.groups | ||||||||||
} | ||||||||||
} | ||||||||||
|
||||||||||
enum StreamType { | ||||||||||
AggregateStream(AggregateStream), | ||||||||||
GroupedHashAggregateStream(GroupedHashAggregateStream), | ||||||||||
|
@@ -316,8 +336,8 @@ impl ExecutionPlan for AggregateExec { | |||||||||
/// Get the output partitioning of this plan | ||||||||||
fn output_partitioning(&self) -> Partitioning { | ||||||||||
match &self.mode { | ||||||||||
AggregateMode::Partial => { | ||||||||||
// Partial Aggregation will not change the output partitioning but need to respect the Alias | ||||||||||
AggregateMode::Partial | AggregateMode::Single => { | ||||||||||
// Partial and Single Aggregation will not change the output partitioning but need to respect the Alias | ||||||||||
let input_partition = self.input.output_partitioning(); | ||||||||||
match input_partition { | ||||||||||
Partitioning::Hash(exprs, part) => { | ||||||||||
|
@@ -360,7 +380,9 @@ impl ExecutionPlan for AggregateExec { | |||||||||
|
||||||||||
fn required_input_distribution(&self) -> Vec<Distribution> { | ||||||||||
match &self.mode { | ||||||||||
AggregateMode::Partial => vec![Distribution::UnspecifiedDistribution], | ||||||||||
AggregateMode::Partial | AggregateMode::Single => { | ||||||||||
Dandandan marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||||||
vec![Distribution::UnspecifiedDistribution] | ||||||||||
} | ||||||||||
AggregateMode::FinalPartitioned => { | ||||||||||
vec![Distribution::HashPartitioned(self.output_group_expr())] | ||||||||||
} | ||||||||||
|
@@ -528,7 +550,9 @@ fn create_schema( | |||||||||
fields.extend(expr.state_fields()?.iter().cloned()) | ||||||||||
} | ||||||||||
} | ||||||||||
AggregateMode::Final | AggregateMode::FinalPartitioned => { | ||||||||||
AggregateMode::Final | ||||||||||
| AggregateMode::FinalPartitioned | ||||||||||
| AggregateMode::Single => { | ||||||||||
// in final mode, the field with the final result of the accumulator | ||||||||||
for expr in aggr_expr { | ||||||||||
fields.push(expr.field()?) | ||||||||||
|
@@ -554,7 +578,7 @@ fn aggregate_expressions( | |||||||||
col_idx_base: usize, | ||||||||||
) -> Result<Vec<Vec<Arc<dyn PhysicalExpr>>>> { | ||||||||||
match mode { | ||||||||||
AggregateMode::Partial => { | ||||||||||
AggregateMode::Partial | AggregateMode::Single => { | ||||||||||
Ok(aggr_expr.iter().map(|agg| agg.expressions()).collect()) | ||||||||||
} | ||||||||||
// in this mode, we build the merge expressions of the aggregation | ||||||||||
|
@@ -617,7 +641,7 @@ fn create_row_accumulators( | |||||||||
} | ||||||||||
|
||||||||||
/// returns a vector of ArrayRefs, where each entry corresponds to either the | ||||||||||
/// final value (mode = Final) or states (mode = Partial) | ||||||||||
/// final value (mode = Final, FinalPartitioned and Single) or states (mode = Partial) | ||||||||||
fn finalize_aggregation( | ||||||||||
accumulators: &[AccumulatorItem], | ||||||||||
mode: &AggregateMode, | ||||||||||
|
@@ -636,7 +660,9 @@ fn finalize_aggregation( | |||||||||
.collect::<Result<Vec<_>>>()?; | ||||||||||
Ok(a.iter().flatten().cloned().collect::<Vec<_>>()) | ||||||||||
} | ||||||||||
AggregateMode::Final | AggregateMode::FinalPartitioned => { | ||||||||||
AggregateMode::Final | ||||||||||
| AggregateMode::FinalPartitioned | ||||||||||
| AggregateMode::Single => { | ||||||||||
// merge the state to the final value | ||||||||||
accumulators | ||||||||||
.iter() | ||||||||||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since there's no `RepartitionExec`, the distribution of the `AggregateExec` with final mode and that of the `AggregateExec` with partial mode are the same. Therefore, there's no need to do two-phase aggregation.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @mingmwang for introducing this rule, which will significantly improve query performance for the SQL patterns shown in the UTs.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actually, the performance improvement will not be that significant, because usually the `Final` aggregation step is not that heavy.