-
Notifications
You must be signed in to change notification settings - Fork 1.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add ResolveGroupingAnalytics analyzer rule #5749
Closed
Closed
Changes from all commits
Commits
Show all changes
10 commits
Select commit
Hold shift + click to select a range
8e87787
add analyzer rule replace_grouping_func
mingmwang 1acc425
merge with latest
mingmwang 6a9fbf3
add VirtualColumn
mingmwang 1cee4f4
fix common_subexpr_eliminate
mingmwang 3c4d8b0
merge with upstream
mingmwang b3c41a1
Add ResolveGroupingAnalytics analyzer rule
mingmwang 2b183f4
merge with upstream
mingmwang 4e065c9
fix failed UT, make the sort result stable
mingmwang 7f68b32
merge with upstream, resolve conflicts
mingmwang f6059e8
minor change
mingmwang File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -86,7 +86,11 @@ pub enum AggregateMode { | |
#[derive(Clone, Debug, Default)] | ||
pub struct PhysicalGroupBy { | ||
/// Distinct (Physical Expr, Alias) in the grouping set | ||
expr: Vec<(Arc<dyn PhysicalExpr>, String)>, | ||
grouping_set_expr: Vec<(Arc<dyn PhysicalExpr>, String)>, | ||
/// Hidden grouping set expr in the grouping set | ||
hidden_grouping_set_expr: Vec<(Arc<dyn PhysicalExpr>, String)>, | ||
/// Distinct result expr for the grouping set, used to generate output schema | ||
result_expr: Vec<(Arc<dyn PhysicalExpr>, String)>, | ||
/// Corresponding NULL expressions for expr | ||
null_expr: Vec<(Arc<dyn PhysicalExpr>, String)>, | ||
/// Null mask for each group in this grouping set. Each group is | ||
|
@@ -99,12 +103,16 @@ pub struct PhysicalGroupBy { | |
impl PhysicalGroupBy { | ||
/// Create a new `PhysicalGroupBy` | ||
pub fn new( | ||
expr: Vec<(Arc<dyn PhysicalExpr>, String)>, | ||
grouping_set_expr: Vec<(Arc<dyn PhysicalExpr>, String)>, | ||
hidden_grouping_set_expr: Vec<(Arc<dyn PhysicalExpr>, String)>, | ||
result_expr: Vec<(Arc<dyn PhysicalExpr>, String)>, | ||
null_expr: Vec<(Arc<dyn PhysicalExpr>, String)>, | ||
groups: Vec<Vec<bool>>, | ||
) -> Self { | ||
Self { | ||
expr, | ||
grouping_set_expr, | ||
hidden_grouping_set_expr, | ||
result_expr, | ||
null_expr, | ||
groups, | ||
} | ||
|
@@ -115,7 +123,9 @@ impl PhysicalGroupBy { | |
pub fn new_single(expr: Vec<(Arc<dyn PhysicalExpr>, String)>) -> Self { | ||
let num_exprs = expr.len(); | ||
Self { | ||
expr, | ||
grouping_set_expr: expr.clone(), | ||
hidden_grouping_set_expr: vec![], | ||
result_expr: expr, | ||
null_expr: vec![], | ||
groups: vec![vec![false; num_exprs]], | ||
} | ||
|
@@ -128,22 +138,32 @@ impl PhysicalGroupBy { | |
|
||
/// Returns the group expressions | ||
pub fn expr(&self) -> &[(Arc<dyn PhysicalExpr>, String)] { | ||
&self.expr | ||
&self.grouping_set_expr | ||
} | ||
|
||
/// Returns the group result expressions | ||
pub fn result_expr(&self) -> &[(Arc<dyn PhysicalExpr>, String)] { | ||
&self.result_expr | ||
} | ||
|
||
/// Returns the null expressions | ||
pub fn null_expr(&self) -> &[(Arc<dyn PhysicalExpr>, String)] { | ||
&self.null_expr | ||
} | ||
|
||
/// Returns the hidden grouping set expressions | ||
pub fn hidden_grouping_set_expr(&self) -> &[(Arc<dyn PhysicalExpr>, String)] { | ||
&self.hidden_grouping_set_expr | ||
} | ||
|
||
/// Returns the group null masks | ||
pub fn groups(&self) -> &[Vec<bool>] { | ||
&self.groups | ||
} | ||
|
||
/// Returns true if this `PhysicalGroupBy` has no group expressions | ||
pub fn is_empty(&self) -> bool { | ||
self.expr.is_empty() | ||
self.grouping_set_expr.is_empty() | ||
} | ||
} | ||
|
||
|
@@ -196,7 +216,7 @@ impl AggregateExec { | |
) -> Result<Self> { | ||
let schema = create_schema( | ||
&input.schema(), | ||
&group_by.expr, | ||
group_by.result_expr(), | ||
&aggr_expr, | ||
group_by.contains_null(), | ||
mode, | ||
|
@@ -205,7 +225,7 @@ impl AggregateExec { | |
let schema = Arc::new(schema); | ||
|
||
let mut alias_map: HashMap<Column, Vec<Column>> = HashMap::new(); | ||
for (expression, name) in group_by.expr.iter() { | ||
for (expression, name) in group_by.result_expr().iter() { | ||
if let Some(column) = expression.as_any().downcast_ref::<Column>() { | ||
let new_col_idx = schema.index_of(name)?; | ||
// When the column name is the same, but index does not equal, treat it as Alias | ||
|
@@ -243,7 +263,7 @@ impl AggregateExec { | |
// Update column indices. Since the group by columns come first in the output schema, their | ||
// indices are simply 0..self.group_expr(len). | ||
self.group_by | ||
.expr() | ||
.result_expr() | ||
.iter() | ||
.enumerate() | ||
.map(|(index, (_col, name))| { | ||
|
@@ -275,7 +295,7 @@ impl AggregateExec { | |
let batch_size = context.session_config().batch_size(); | ||
let input = self.input.execute(partition, Arc::clone(&context))?; | ||
let baseline_metrics = BaselineMetrics::new(&self.metrics, partition); | ||
if self.group_by.expr.is_empty() { | ||
if self.group_by.result_expr().is_empty() { | ||
Ok(StreamType::AggregateStream(AggregateStream::new( | ||
self.mode, | ||
self.schema.clone(), | ||
|
@@ -418,7 +438,7 @@ impl ExecutionPlan for AggregateExec { | |
write!(f, "AggregateExec: mode={:?}", self.mode)?; | ||
let g: Vec<String> = if self.group_by.groups.len() == 1 { | ||
self.group_by | ||
.expr | ||
.grouping_set_expr | ||
.iter() | ||
.map(|(e, alias)| { | ||
let e = e.to_string(); | ||
|
@@ -447,7 +467,8 @@ impl ExecutionPlan for AggregateExec { | |
e | ||
} | ||
} else { | ||
let (e, alias) = &self.group_by.expr[idx]; | ||
let (e, alias) = | ||
&self.group_by.grouping_set_expr[idx]; | ||
let e = e.to_string(); | ||
if &e != alias { | ||
format!("{e} as {alias}") | ||
|
@@ -484,7 +505,7 @@ impl ExecutionPlan for AggregateExec { | |
// - aggregations somtimes also preserve invariants such as min, max... | ||
match self.mode { | ||
AggregateMode::Final | AggregateMode::FinalPartitioned | ||
if self.group_by.expr.is_empty() => | ||
if self.group_by.result_expr().is_empty() => | ||
{ | ||
Statistics { | ||
num_rows: Some(1), | ||
|
@@ -671,16 +692,16 @@ fn evaluate_group_by( | |
group_by: &PhysicalGroupBy, | ||
batch: &RecordBatch, | ||
) -> Result<Vec<Vec<ArrayRef>>> { | ||
let exprs: Vec<ArrayRef> = group_by | ||
.expr | ||
let exprs_value: Vec<ArrayRef> = group_by | ||
.grouping_set_expr | ||
.iter() | ||
.map(|(expr, _)| { | ||
let value = expr.evaluate(batch)?; | ||
Ok(value.into_array(batch.num_rows())) | ||
}) | ||
.collect::<Result<Vec<_>>>()?; | ||
|
||
let null_exprs: Vec<ArrayRef> = group_by | ||
let null_exprs_value: Vec<ArrayRef> = group_by | ||
.null_expr | ||
.iter() | ||
.map(|(expr, _)| { | ||
|
@@ -689,23 +710,61 @@ fn evaluate_group_by( | |
}) | ||
.collect::<Result<Vec<_>>>()?; | ||
|
||
Ok(group_by | ||
.groups | ||
.iter() | ||
.map(|group| { | ||
group | ||
.iter() | ||
.enumerate() | ||
.map(|(idx, is_null)| { | ||
if *is_null { | ||
null_exprs[idx].clone() | ||
} else { | ||
exprs[idx].clone() | ||
} | ||
}) | ||
.collect() | ||
}) | ||
.collect()) | ||
if !group_by.hidden_grouping_set_expr().is_empty() { | ||
let hidden_exprs_value: Vec<ArrayRef> = group_by | ||
.hidden_grouping_set_expr | ||
.iter() | ||
.map(|(expr, _)| { | ||
let value = expr.evaluate(batch)?; | ||
Ok(value.into_array(batch.num_rows())) | ||
}) | ||
.collect::<Result<Vec<_>>>()?; | ||
|
||
let chunk_size = hidden_exprs_value.len() / group_by.groups.len(); | ||
let hidden_expr_value_chunks = | ||
hidden_exprs_value.chunks(chunk_size).collect::<Vec<_>>(); | ||
|
||
Ok(group_by | ||
.groups | ||
.iter() | ||
.enumerate() | ||
.map(|(groud_id, group)| { | ||
let mut group_data = group | ||
.iter() | ||
.enumerate() | ||
.map(|(idx, is_null)| { | ||
if *is_null { | ||
null_exprs_value[idx].clone() | ||
} else { | ||
exprs_value[idx].clone() | ||
} | ||
}) | ||
.collect::<Vec<_>>(); | ||
Comment on lines
+732
to
+742
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. we can extract a function named |
||
for data in hidden_expr_value_chunks[groud_id] { | ||
group_data.push(data.clone()); | ||
} | ||
group_data | ||
}) | ||
.collect()) | ||
} else { | ||
Ok(group_by | ||
.groups | ||
.iter() | ||
.map(|group| { | ||
group | ||
.iter() | ||
.enumerate() | ||
.map(|(idx, is_null)| { | ||
if *is_null { | ||
null_exprs_value[idx].clone() | ||
} else { | ||
exprs_value[idx].clone() | ||
} | ||
}) | ||
.collect::<Vec<_>>() | ||
Comment on lines
+754
to
+764
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. use function |
||
}) | ||
.collect()) | ||
} | ||
} | ||
|
||
#[cfg(test)] | ||
|
@@ -775,7 +834,12 @@ mod tests { | |
let input_schema = input.schema(); | ||
|
||
let grouping_set = PhysicalGroupBy { | ||
expr: vec![ | ||
grouping_set_expr: vec![ | ||
(col("a", &input_schema)?, "a".to_string()), | ||
(col("b", &input_schema)?, "b".to_string()), | ||
], | ||
hidden_grouping_set_expr: vec![], | ||
result_expr: vec![ | ||
(col("a", &input_schema)?, "a".to_string()), | ||
(col("b", &input_schema)?, "b".to_string()), | ||
], | ||
|
@@ -890,9 +954,11 @@ mod tests { | |
let input_schema = input.schema(); | ||
|
||
let grouping_set = PhysicalGroupBy { | ||
expr: vec![(col("a", &input_schema)?, "a".to_string())], | ||
grouping_set_expr: vec![(col("a", &input_schema)?, "a".to_string())], | ||
hidden_grouping_set_expr: vec![], | ||
null_expr: vec![], | ||
groups: vec![vec![false]], | ||
result_expr: vec![(col("a", &input_schema)?, "a".to_string())], | ||
}; | ||
|
||
let aggregates: Vec<Arc<dyn AggregateExpr>> = vec![Arc::new(Avg::new( | ||
|
@@ -929,7 +995,7 @@ mod tests { | |
let merge = Arc::new(CoalescePartitionsExec::new(partial_aggregate)); | ||
|
||
let final_group: Vec<(Arc<dyn PhysicalExpr>, String)> = grouping_set | ||
.expr | ||
.result_expr() | ||
.iter() | ||
.map(|(_expr, name)| Ok((col(name, &input_schema)?, name.clone()))) | ||
.collect::<Result<_>>()?; | ||
|
@@ -1119,9 +1185,11 @@ mod tests { | |
|
||
let groups_none = PhysicalGroupBy::default(); | ||
let groups_some = PhysicalGroupBy { | ||
expr: vec![(col("a", &input_schema)?, "a".to_string())], | ||
grouping_set_expr: vec![(col("a", &input_schema)?, "a".to_string())], | ||
hidden_grouping_set_expr: vec![], | ||
null_expr: vec![], | ||
groups: vec![vec![false]], | ||
result_expr: vec![(col("a", &input_schema)?, "a".to_string())], | ||
}; | ||
|
||
// something that allocates within the aggregator | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I have a question.
Is
result_expr
a part ofgrouping_set_expr
?If yes, maybe we can use a index vec to point
grouping_set_expr