From 40869a4d394af27b0a99c4061398d86e8005adec Mon Sep 17 00:00:00 2001 From: Mustafa Akur Date: Tue, 26 Mar 2024 11:22:07 +0300 Subject: [PATCH 01/12] Replace with cached exprs --- .../src/physical_optimizer/enforce_sorting.rs | 77 ++++++ datafusion/core/src/physical_planner.rs | 5 + datafusion/core/tests/data/demo.csv | 2 + .../optimizer/src/common_subexpr_eliminate.rs | 232 ++++++++++++------ .../optimizer/src/optimize_projections.rs | 23 +- .../test_files/project_complex_sub_query.slt | 17 +- 6 files changed, 253 insertions(+), 103 deletions(-) create mode 100644 datafusion/core/tests/data/demo.csv diff --git a/datafusion/core/src/physical_optimizer/enforce_sorting.rs b/datafusion/core/src/physical_optimizer/enforce_sorting.rs index 5bf21c3dfab5..95e4a06b516b 100644 --- a/datafusion/core/src/physical_optimizer/enforce_sorting.rs +++ b/datafusion/core/src/physical_optimizer/enforce_sorting.rs @@ -2347,3 +2347,80 @@ mod tests { Ok(()) } } + + +#[cfg(test)] +mod tmp_tests { + use crate::assert_batches_eq; + use crate::physical_plan::{collect, displayable, ExecutionPlan}; + use crate::prelude::SessionContext; + use arrow::util::pretty::print_batches; + use datafusion_common::Result; + use datafusion_execution::config::SessionConfig; + use datafusion_physical_plan::get_plan_string; + use std::sync::Arc; + + fn print_plan(plan: &Arc) -> Result<()> { + let formatted = displayable(plan.as_ref()).indent(true).to_string(); + let actual: Vec<&str> = formatted.trim().lines().collect(); + println!("{:#?}", actual); + Ok(()) + } + + const MULTIPLE_ORDERED_TABLE: &str = "CREATE EXTERNAL TABLE multiple_ordered_table ( + a0 INTEGER, + a INTEGER, + b INTEGER, + c INTEGER, + d INTEGER + ) + STORED AS CSV + WITH HEADER ROW + WITH ORDER (a ASC, b ASC) + WITH ORDER (c ASC) + LOCATION '../core/tests/data/window_2.csv'"; + + #[tokio::test] + async fn test_query() -> Result<()> { + let config = SessionConfig::new().with_target_partitions(1); + let ctx = SessionContext::new_with_config(config); + + ctx.sql(MULTIPLE_ORDERED_TABLE).await?; + + let sql = "SELECT a+b, SUM(a+b) OVER() FROM multiple_ordered_table"; + + let msg = format!("Creating logical plan for '{sql}'"); + let dataframe = ctx.sql(sql).await.expect(&msg); + let physical_plan = dataframe.create_physical_plan().await?; + print_plan(&physical_plan)?; + let batches = collect(physical_plan.clone(), ctx.task_ctx()).await?; + print_batches(&batches)?; + + let expected = vec![ + "ProjectionExec: expr=[NTH_VALUE(annotated_data_finite2.d,Int64(2)) ORDER BY [annotated_data_finite2.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@2 as nth_value1]", + " GlobalLimitExec: skip=0, fetch=5", + " BoundedWindowAggExec: wdw=[NTH_VALUE(annotated_data_finite2.d,Int64(2)) ORDER BY [annotated_data_finite2.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Ok(Field { name: \"NTH_VALUE(annotated_data_finite2.d,Int64(2)) ORDER BY [annotated_data_finite2.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW\", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int32(NULL)), end_bound: CurrentRow }], mode=[Sorted]", + " CsvExec: file_groups={1 group: [[Users/akurmustafa/projects/synnada/arrow-datafusion-synnada/datafusion/core/tests/data/window_2.csv]]}, projection=[a, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true", + ]; + // Get string representation of the plan + let actual = get_plan_string(&physical_plan); + assert_eq!( + expected, actual, + "\n**Optimized Plan Mismatch\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n" + ); + + let expected = [ + "+------------+", + "| nth_value1 |", + "+------------+", + "| 2 |", + "| 2 |", + "| 2 |", + "| 2 |", + "| 2 |", + "+------------+", + ]; + assert_batches_eq!(expected, &batches); + Ok(()) + } +} diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs index ca708b05823e..4334bb617f78 100644 --- a/datafusion/core/src/physical_planner.rs +++ b/datafusion/core/src/physical_planner.rs @@ -855,6 +855,11 @@ impl DefaultPhysicalPlanner { )?)) } LogicalPlan::Projection(Projection { input, expr, .. }) => { + println!("---------------"); + for expr in expr{ + println!("expr:{:?}", expr); + } + println!("---------------"); let input_exec = self.create_initial_plan(input, session_state).await?; let input_schema = input.as_ref().schema(); diff --git a/datafusion/core/tests/data/demo.csv b/datafusion/core/tests/data/demo.csv new file mode 100644 index 000000000000..d1492228246f --- /dev/null +++ b/datafusion/core/tests/data/demo.csv @@ -0,0 +1,2 @@ +c1,c2,c3,c4,c5 +1,2,3,4,5 diff --git a/datafusion/optimizer/src/common_subexpr_eliminate.rs b/datafusion/optimizer/src/common_subexpr_eliminate.rs index 57a3519c6b55..ef6b15fca313 100644 --- a/datafusion/optimizer/src/common_subexpr_eliminate.rs +++ b/datafusion/optimizer/src/common_subexpr_eliminate.rs @@ -123,15 +123,14 @@ impl CommonSubexprEliminate { .iter() .zip(arrays_list.iter()) .map(|(exprs, arrays)| { - let res = exprs + exprs .iter() .cloned() .zip(arrays.iter()) .map(|(expr, id_array)| { replace_common_expr(expr, id_array, expr_set, affected_id) }) - .collect::>>(); - res + .collect::>>() }) .collect::>>() } @@ -424,8 +423,10 @@ impl CommonSubexprEliminate { plan.clone() .rewrite(&mut ProjectionAdder { data_type_map: HashMap::new(), - depth_map: HashMap::new(), + cumulative_expr_map: HashMap::new(), + cumulative_map: HashSet::new(), depth: 0, + should_update_parents: false, }) .map(|transformed| Some(transformed.data)) } @@ -438,7 +439,6 @@ impl OptimizerRule for CommonSubexprEliminate { ) -> Result> { let optimized_plan_option = self.common_optimize(plan, config)?; - // println!("optimized plan option is {:?}", optimized_plan_option); let plan = match optimized_plan_option { Some(plan) => plan, _ => plan.clone(), @@ -476,8 +476,7 @@ fn to_arrays( expr_set: &mut ExprSet, expr_mask: ExprMask, ) -> Result>> { - let res = expr - .iter() + expr.iter() .map(|e| { let mut id_array = vec![]; expr_to_identifier( @@ -490,8 +489,7 @@ fn to_arrays( Ok(id_array) }) - .collect::>>(); - res + .collect::>>() } /// Build the "intermediate" projection plan that evaluates the extracted common expressions. @@ -852,9 +850,11 @@ fn replace_common_expr( } struct ProjectionAdder { - depth_map: HashMap>, + cumulative_expr_map: HashMap>, + cumulative_map: HashSet, depth: u128, data_type_map: HashMap, + should_update_parents: bool, } pub fn is_not_complex(op: &Operator) -> bool { matches!( @@ -882,11 +882,6 @@ impl ProjectionAdder { .field_from_column(l) .expect("Field not found for left column"); - // res.insert(DFField::new_unqualified( - // &expr.to_string(), - // l_field.data_type().clone(), - // true, - // )); expr_data_type .entry(expr.clone()) .or_insert(l_field.data_type().clone()); @@ -917,81 +912,166 @@ impl TreeNodeRewriter for ProjectionAdder { /// currently we just collect the complex bianryOP fn f_down(&mut self, node: Self::Node) -> Result> { - // use depth to trace where we are in the LogicalPlan tree - self.depth += 1; - // extract all expressions + check whether it contains in depth_sets - let exprs = node.expressions(); - let depth_set = self.depth_map.entry(self.depth).or_default(); - let mut schema = node.schema().deref().clone(); - for ip in node.inputs() { - schema.merge(ip.schema()); + let depth_set = &mut self.cumulative_map; + // Insert for other end points + if let LogicalPlan::TableScan(_) = node{ + self.cumulative_expr_map.insert(self.depth, depth_set.clone()); + self.depth += 1; + return Ok(Transformed::no(node)); + } else { + // TODO: Assumes operators doesn't modify name of the fields. + // use depth to trace where we are in the LogicalPlan tree + self.depth += 1; + // extract all expressions + check whether it contains in depth_sets + let exprs = node.expressions(); + // let depth_set = self.cumulative_expr_map.entry(self.depth).or_default(); + let mut schema = node.schema().deref().clone(); + for ip in node.inputs() { + schema.merge(ip.schema()); + } + let (extended_set, data_map) = + Self::get_complex_expressions(exprs, Arc::new(schema)); + depth_set.extend(extended_set); + self.data_type_map.extend(data_map); + Ok(Transformed::no(node)) } - let (extended_set, data_map) = - Self::get_complex_expressions(exprs, Arc::new(schema)); - depth_set.extend(extended_set); - self.data_type_map.extend(data_map); - Ok(Transformed::no(node)) } - fn f_up(&mut self, node: Self::Node) -> Result> { - let current_depth_schema = - self.depth_map.get(&self.depth).cloned().unwrap_or_default(); - // get the intersected part - let added_expr = self - .depth_map - .iter() - .filter(|(&depth, _)| depth < self.depth) - .fold(current_depth_schema, |acc, (_, expr)| { - acc.intersection(expr).cloned().collect() - }); + fn f_up(&mut self, node: Self::Node) -> Result> { + let added_expr = + self.cumulative_expr_map.get(&self.depth).cloned().unwrap_or_default(); + println!("self.cumulative_expr_map: {:?}", self.cumulative_expr_map); + println!("self.cumulative_map: {:?}", self.cumulative_map); + println!("node:{:?}", node); self.depth -= 1; // do not do extra things - if added_expr.is_empty() { + let should_add_projection = !added_expr.is_empty(); + if !should_add_projection && !self.should_update_parents { + println!("not updating node"); return Ok(Transformed::no(node)); } - match node { - // do not add for Projections - LogicalPlan::Projection(_) - | LogicalPlan::TableScan(_) - | LogicalPlan::Join(_) => Ok(Transformed::no(node)), - _ => { - // avoid recursive add projections - if added_expr.iter().any(|expr| { - node.inputs()[0] - .schema() - .has_column_with_unqualified_name(&expr.to_string()) - }) { - return Ok(Transformed::no(node)); + // if added_expr.is_empty() { + // println!("not updating node"); + // return Ok(Transformed::no(node)); + // } + + let child = node.inputs()[0]; + let child = if should_add_projection { + self.should_update_parents = true; + let mut field_set = HashSet::new(); + let mut project_exprs = vec![]; + for expr in &added_expr { + let f = DFField::new_unqualified( + &expr.to_string(), + self.data_type_map[expr].clone(), + true, + ); + field_set.insert(f.name().to_owned()); + project_exprs.push(expr.clone().alias(expr.to_string())); + } + // Do not lose fields in the child. + for field in child.schema().fields() { + if field_set.insert(field.qualified_name()) { + project_exprs.push(Expr::Column(field.qualified_column())); } + } - let mut field_set = HashSet::new(); - let mut project_exprs = vec![]; - for expr in added_expr { - let f = DFField::new_unqualified( - &expr.to_string(), - self.data_type_map[&expr].clone(), - true, - ); - field_set.insert(f.name().to_owned()); - project_exprs.push(expr.clone().alias(expr.to_string())); - } - for field in node.inputs()[0].schema().fields() { - if field_set.insert(field.qualified_name()) { - project_exprs.push(Expr::Column(field.qualified_column())); - } + // adding new plan here + let new_child = LogicalPlan::Projection(Projection::try_new( + project_exprs, + Arc::new(node.inputs()[0].clone()), + )?); + new_child + } else { + child.clone() + }; + let mut expressions = node.expressions(); + // for expr in &added_expr{ + // println!("expr.display_name() {:?}", expr.display_name()?); + // } + println!(" original expressions: {:?}", expressions); + let available_columns = child.schema().fields().iter().map(|field| field.qualified_column()).collect::>(); + // Replace expressions with its pre-computed variant if available. + expressions.iter_mut().try_for_each(|expr|{ + update_expr_with_available_columns(expr, &available_columns) + })?; + println!("after update expressions: {:?}", expressions); + + let new_node = node.with_new_exprs(expressions, [child].to_vec())?; + println!("new node: {:?}", new_node); + Ok(Transformed::yes( + new_node, + )) + + // match node { + // // do not add for Projections + // LogicalPlan::Projection(_) + // | LogicalPlan::TableScan(_) + // | LogicalPlan::Join(_) => Ok(Transformed::no(node)), + // _ => { + // // avoid recursive add projections + // if added_expr.iter().any(|expr| { + // node.inputs()[0] + // .schema() + // .has_column_with_unqualified_name(&expr.to_string()) + // }) { + // return Ok(Transformed::no(node)); + // } + // + // let mut field_set = HashSet::new(); + // let mut project_exprs = vec![]; + // for expr in added_expr { + // let f = DFField::new_unqualified( + // &expr.to_string(), + // self.data_type_map[&expr].clone(), + // true, + // ); + // field_set.insert(f.name().to_owned()); + // project_exprs.push(expr.clone().alias(expr.to_string())); + // } + // for field in node.inputs()[0].schema().fields() { + // if field_set.insert(field.qualified_name()) { + // project_exprs.push(Expr::Column(field.qualified_column())); + // } + // } + // // adding new plan here + // let new_plan = LogicalPlan::Projection(Projection::try_new( + // project_exprs, + // Arc::new(node.inputs()[0].clone()), + // )?); + // Ok(Transformed::yes( + // node.with_new_exprs(node.expressions(), [new_plan].to_vec())?, + // )) + // } + // } + } +} + +fn update_expr_with_available_columns(expr: &mut Expr, available_columns: &[Column]) -> Result<()> { + match expr { + Expr::BinaryExpr(_) => { + for available_col in available_columns{ + // println!("cached_expr.display_name(), expr.display_name() {:?}, {:?}", cached_expr.display_name()?, expr.display_name()?); + if available_col.flat_name() == expr.display_name()?{ + println!("replacing expr: {:?} with available_col: {:?}", expr, available_col); + *expr = Expr::Column(available_col.clone()); } - // adding new plan here - let new_plan = LogicalPlan::Projection(Projection::try_new( - project_exprs, - Arc::new(node.inputs()[0].clone()), - )?); - Ok(Transformed::yes( - node.with_new_exprs(node.expressions(), [new_plan].to_vec())?, - )) } + }, + Expr::WindowFunction(WindowFunction { fun: _, args, .. }) => { + args.iter_mut().try_for_each(|arg| update_expr_with_available_columns(arg, available_columns))? + }, + Expr::Cast(Cast{ expr, .. }) => { + update_expr_with_available_columns(expr, available_columns)? + } + _ => { + println!("unsupported expression : {:?}", expr); + // cannot rewrite } } + Ok(()) } + #[cfg(test)] mod test { use std::iter; diff --git a/datafusion/optimizer/src/optimize_projections.rs b/datafusion/optimizer/src/optimize_projections.rs index 8ad773d89e13..284ea6673f8b 100644 --- a/datafusion/optimizer/src/optimize_projections.rs +++ b/datafusion/optimizer/src/optimize_projections.rs @@ -26,7 +26,6 @@ use std::collections::HashSet; use std::sync::Arc; -use crate::common_subexpr_eliminate::is_not_complex; use crate::optimizer::ApplyOrder; use crate::{OptimizerConfig, OptimizerRule}; @@ -70,6 +69,8 @@ impl OptimizerRule for OptimizeProjections { plan: &LogicalPlan, config: &dyn OptimizerConfig, ) -> Result> { + println!("plan at the start:"); + println!("{:?}", plan); // All output fields are necessary: let indices = (0..plan.schema().fields().len()).collect::>(); optimize_projections(plan, config, &indices) @@ -694,26 +695,10 @@ fn indices_referred_by_expr( // TODO: Support more Expressions let mut cols = expr.to_columns()?; outer_columns(expr, &mut cols); - let mut res_vec: Vec = cols + Ok(cols .iter() .flat_map(|col| input_schema.index_of_column(col)) - .collect(); - match expr { - Expr::BinaryExpr(BinaryExpr { op, .. }) if !is_not_complex(op) => { - if let Some(index) = - input_schema.index_of_column_by_name(None, &expr.to_string())? - { - match res_vec.binary_search(&index) { - Ok(_) => {} - Err(pos) => { - res_vec.insert(pos, index); - } - } - } - } - _ => {} - } - Ok(res_vec) + .collect()) } /// Gets all required indices for the input; i.e. those required by the parent diff --git a/datafusion/sqllogictest/test_files/project_complex_sub_query.slt b/datafusion/sqllogictest/test_files/project_complex_sub_query.slt index ffcb9cc3d70c..30a84f1fc397 100644 --- a/datafusion/sqllogictest/test_files/project_complex_sub_query.slt +++ b/datafusion/sqllogictest/test_files/project_complex_sub_query.slt @@ -31,15 +31,17 @@ STORED AS CSV WITH HEADER ROW LOCATION '../core/tests/data/demo.csv'; +statement ok +set datafusion.explain.logical_plan_only = true; + query TT explain SELECT c3+c4, SUM(c3+c4) OVER() FROM t; ---- logical_plan -Projection: t.c3 + t.c4, SUM(t.c3 + t.c4) ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING ---WindowAggr: windowExpr=[[SUM(CAST(t.c3 + t.c4 AS Int64)) ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING]] -----Projection: t.c3 + t.c4 AS t.c3 + t.c4, t.c3, t.c4 -------TableScan: t projection=[c3, c4] +WindowAggr: windowExpr=[[SUM(CAST(t.c3 + t.c4 AS Int64)) ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING]] +--Projection: t.c3 + t.c4 AS t.c3 + t.c4 +----TableScan: t projection=[c3, c4] query TT explain SELECT c3+c4, SUM(c3+c4) OVER(order by c3+c4) @@ -66,7 +68,6 @@ explain SELECT c3-c4, SUM(c3-c4) OVER() FROM t; ---- logical_plan -Projection: t.c3 - t.c4, SUM(t.c3 - t.c4) ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING ---WindowAggr: windowExpr=[[SUM(CAST(t.c3 - t.c4 AS Int64)) ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING]] -----Projection: t.c3 - t.c4 AS t.c3 - t.c4, t.c3, t.c4 -------TableScan: t projection=[c3, c4] +WindowAggr: windowExpr=[[SUM(CAST(t.c3 - t.c4 AS Int64)) ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING]] +--Projection: t.c3 - t.c4 AS t.c3 - t.c4 +----TableScan: t projection=[c3, c4] From 4396682fa15ca4ef111f1d300562af9504d5cdba Mon Sep 17 00:00:00 2001 From: Mustafa Akur Date: Wed, 27 Mar 2024 15:21:41 +0300 Subject: [PATCH 02/12] Minor changes --- datafusion/core/src/physical_planner.rs | 5 ----- .../optimizer/src/common_subexpr_eliminate.rs | 18 +++++++++--------- .../optimizer/src/optimize_projections.rs | 2 -- 3 files changed, 9 insertions(+), 16 deletions(-) diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs index 4334bb617f78..ca708b05823e 100644 --- a/datafusion/core/src/physical_planner.rs +++ b/datafusion/core/src/physical_planner.rs @@ -855,11 +855,6 @@ impl DefaultPhysicalPlanner { )?)) } LogicalPlan::Projection(Projection { input, expr, .. }) => { - println!("---------------"); - for expr in expr{ - println!("expr:{:?}", expr); - } - println!("---------------"); let input_exec = self.create_initial_plan(input, session_state).await?; let input_schema = input.as_ref().schema(); diff --git a/datafusion/optimizer/src/common_subexpr_eliminate.rs b/datafusion/optimizer/src/common_subexpr_eliminate.rs index ef6b15fca313..3aef3cf703c1 100644 --- a/datafusion/optimizer/src/common_subexpr_eliminate.rs +++ b/datafusion/optimizer/src/common_subexpr_eliminate.rs @@ -940,14 +940,14 @@ impl TreeNodeRewriter for ProjectionAdder { fn f_up(&mut self, node: Self::Node) -> Result> { let added_expr = self.cumulative_expr_map.get(&self.depth).cloned().unwrap_or_default(); - println!("self.cumulative_expr_map: {:?}", self.cumulative_expr_map); - println!("self.cumulative_map: {:?}", self.cumulative_map); - println!("node:{:?}", node); + // println!("self.cumulative_expr_map: {:?}", self.cumulative_expr_map); + // println!("self.cumulative_map: {:?}", self.cumulative_map); + // println!("node:{:?}", node); self.depth -= 1; // do not do extra things let should_add_projection = !added_expr.is_empty(); if !should_add_projection && !self.should_update_parents { - println!("not updating node"); + // println!("not updating node"); return Ok(Transformed::no(node)); } // if added_expr.is_empty() { @@ -989,16 +989,16 @@ impl TreeNodeRewriter for ProjectionAdder { // for expr in &added_expr{ // println!("expr.display_name() {:?}", expr.display_name()?); // } - println!(" original expressions: {:?}", expressions); + // println!(" original expressions: {:?}", expressions); let available_columns = child.schema().fields().iter().map(|field| field.qualified_column()).collect::>(); // Replace expressions with its pre-computed variant if available. expressions.iter_mut().try_for_each(|expr|{ update_expr_with_available_columns(expr, &available_columns) })?; - println!("after update expressions: {:?}", expressions); + // println!("after update expressions: {:?}", expressions); let new_node = node.with_new_exprs(expressions, [child].to_vec())?; - println!("new node: {:?}", new_node); + // println!("new node: {:?}", new_node); Ok(Transformed::yes( new_node, )) @@ -1053,7 +1053,7 @@ fn update_expr_with_available_columns(expr: &mut Expr, available_columns: &[Colu for available_col in available_columns{ // println!("cached_expr.display_name(), expr.display_name() {:?}, {:?}", cached_expr.display_name()?, expr.display_name()?); if available_col.flat_name() == expr.display_name()?{ - println!("replacing expr: {:?} with available_col: {:?}", expr, available_col); + // println!("replacing expr: {:?} with available_col: {:?}", expr, available_col); *expr = Expr::Column(available_col.clone()); } } @@ -1065,7 +1065,7 @@ fn update_expr_with_available_columns(expr: &mut Expr, available_columns: &[Colu update_expr_with_available_columns(expr, available_columns)? } _ => { - println!("unsupported expression : {:?}", expr); + // println!("unsupported expression : {:?}", expr); // cannot rewrite } } diff --git a/datafusion/optimizer/src/optimize_projections.rs b/datafusion/optimizer/src/optimize_projections.rs index 284ea6673f8b..39dc56d2af93 100644 --- a/datafusion/optimizer/src/optimize_projections.rs +++ b/datafusion/optimizer/src/optimize_projections.rs @@ -69,8 +69,6 @@ impl OptimizerRule for OptimizeProjections { plan: &LogicalPlan, config: &dyn OptimizerConfig, ) -> Result> { - println!("plan at the start:"); - println!("{:?}", plan); // All output fields are necessary: let indices = (0..plan.schema().fields().len()).collect::>(); optimize_projections(plan, config, &indices) From 83002cefa8f8f19407dcf49f236e74b58d2eb010 Mon Sep 17 00:00:00 2001 From: Mustafa Akur Date: Wed, 27 Mar 2024 16:24:32 +0300 Subject: [PATCH 03/12] Minor changes --- .../optimizer/src/common_subexpr_eliminate.rs | 165 +++++++----------- 1 file changed, 64 insertions(+), 101 deletions(-) diff --git a/datafusion/optimizer/src/common_subexpr_eliminate.rs b/datafusion/optimizer/src/common_subexpr_eliminate.rs index 3aef3cf703c1..bab58b5880e7 100644 --- a/datafusion/optimizer/src/common_subexpr_eliminate.rs +++ b/datafusion/optimizer/src/common_subexpr_eliminate.rs @@ -906,6 +906,47 @@ impl ProjectionAdder { } (res, expr_data_type) } + + fn update_expr_with_available_columns(&self, expr: &mut Expr, available_columns: &[Column]) -> Result<()> { + match expr { + Expr::BinaryExpr(_) => { + for available_col in available_columns{ + // println!("cached_expr.display_name(), expr.display_name() {:?}, {:?}", cached_expr.display_name()?, expr.display_name()?); + if available_col.flat_name() == expr.display_name()?{ + // println!("replacing expr: {:?} with available_col: {:?}", expr, available_col); + *expr = Expr::Column(available_col.clone()); + } + } + }, + Expr::WindowFunction(WindowFunction { fun: _, args, .. }) => { + args.iter_mut().try_for_each(|arg| self.update_expr_with_available_columns(arg, available_columns))? + }, + Expr::Cast(Cast{ expr, .. }) => { + self.update_expr_with_available_columns(expr, available_columns)? + } + _ => { + // cannot rewrite + } + } + Ok(()) + } + + + // Assumes operators doesn't modify name of the fields. + // Otherwise this operation is not safe. + fn extend_with_exprs(&mut self, node: &LogicalPlan) { + // use depth to trace where we are in the LogicalPlan tree + // extract all expressions + check whether it contains in depth_sets + let exprs = node.expressions(); + let mut schema = node.schema().deref().clone(); + for ip in node.inputs() { + schema.merge(ip.schema()); + } + let (extended_set, data_map) = + Self::get_complex_expressions(exprs, Arc::new(schema)); + self.cumulative_map.extend(extended_set); + self.data_type_map.extend(data_map); + } } impl TreeNodeRewriter for ProjectionAdder { type Node = LogicalPlan; @@ -914,35 +955,35 @@ impl TreeNodeRewriter for ProjectionAdder { fn f_down(&mut self, node: Self::Node) -> Result> { let depth_set = &mut self.cumulative_map; // Insert for other end points - if let LogicalPlan::TableScan(_) = node{ - self.cumulative_expr_map.insert(self.depth, depth_set.clone()); - self.depth += 1; - return Ok(Transformed::no(node)); - } else { - // TODO: Assumes operators doesn't modify name of the fields. - // use depth to trace where we are in the LogicalPlan tree - self.depth += 1; - // extract all expressions + check whether it contains in depth_sets - let exprs = node.expressions(); - // let depth_set = self.cumulative_expr_map.entry(self.depth).or_default(); - let mut schema = node.schema().deref().clone(); - for ip in node.inputs() { - schema.merge(ip.schema()); + self.depth += 1; + match node { + LogicalPlan::TableScan(_) => { + self.cumulative_expr_map.insert(self.depth-1, depth_set.clone()); + return Ok(Transformed::no(node)); + }, + LogicalPlan::Sort(_) | LogicalPlan::Filter(_) | LogicalPlan::Window(_) => { + self.extend_with_exprs(&node); + Ok(Transformed::no(node)) + } + LogicalPlan::Projection(_) => { + if depth_set.is_empty(){ + self.extend_with_exprs(&node); + } else { + self.cumulative_expr_map.insert(self.depth-1, depth_set.clone()); + } + Ok(Transformed::no(node)) + }, + _ => { + // Unsupported operators + self.cumulative_map.clear(); + Ok(Transformed::no(node)) } - let (extended_set, data_map) = - Self::get_complex_expressions(exprs, Arc::new(schema)); - depth_set.extend(extended_set); - self.data_type_map.extend(data_map); - Ok(Transformed::no(node)) } } fn f_up(&mut self, node: Self::Node) -> Result> { let added_expr = self.cumulative_expr_map.get(&self.depth).cloned().unwrap_or_default(); - // println!("self.cumulative_expr_map: {:?}", self.cumulative_expr_map); - // println!("self.cumulative_map: {:?}", self.cumulative_map); - // println!("node:{:?}", node); self.depth -= 1; // do not do extra things let should_add_projection = !added_expr.is_empty(); @@ -950,10 +991,6 @@ impl TreeNodeRewriter for ProjectionAdder { // println!("not updating node"); return Ok(Transformed::no(node)); } - // if added_expr.is_empty() { - // println!("not updating node"); - // return Ok(Transformed::no(node)); - // } let child = node.inputs()[0]; let child = if should_add_projection { @@ -986,92 +1023,18 @@ impl TreeNodeRewriter for ProjectionAdder { child.clone() }; let mut expressions = node.expressions(); - // for expr in &added_expr{ - // println!("expr.display_name() {:?}", expr.display_name()?); - // } - // println!(" original expressions: {:?}", expressions); let available_columns = child.schema().fields().iter().map(|field| field.qualified_column()).collect::>(); // Replace expressions with its pre-computed variant if available. expressions.iter_mut().try_for_each(|expr|{ - update_expr_with_available_columns(expr, &available_columns) + self.update_expr_with_available_columns(expr, &available_columns) })?; - // println!("after update expressions: {:?}", expressions); let new_node = node.with_new_exprs(expressions, [child].to_vec())?; - // println!("new node: {:?}", new_node); Ok(Transformed::yes( new_node, )) - - // match node { - // // do not add for Projections - // LogicalPlan::Projection(_) - // | LogicalPlan::TableScan(_) - // | LogicalPlan::Join(_) => Ok(Transformed::no(node)), - // _ => { - // // avoid recursive add projections - // if added_expr.iter().any(|expr| { - // node.inputs()[0] - // .schema() - // .has_column_with_unqualified_name(&expr.to_string()) - // }) { - // return Ok(Transformed::no(node)); - // } - // - // let mut field_set = HashSet::new(); - // let mut project_exprs = vec![]; - // for expr in added_expr { - // let f = DFField::new_unqualified( - // &expr.to_string(), - // self.data_type_map[&expr].clone(), - // true, - // ); - // field_set.insert(f.name().to_owned()); - // project_exprs.push(expr.clone().alias(expr.to_string())); - // } - // for field in node.inputs()[0].schema().fields() { - // if field_set.insert(field.qualified_name()) { - // project_exprs.push(Expr::Column(field.qualified_column())); - // } - // } - // // adding new plan here - // let new_plan = LogicalPlan::Projection(Projection::try_new( - // project_exprs, - // Arc::new(node.inputs()[0].clone()), - // )?); - // Ok(Transformed::yes( - // node.with_new_exprs(node.expressions(), [new_plan].to_vec())?, - // )) - // } - // } } } - -fn update_expr_with_available_columns(expr: &mut Expr, available_columns: &[Column]) -> Result<()> { - match expr { - Expr::BinaryExpr(_) => { - for available_col in available_columns{ - // println!("cached_expr.display_name(), expr.display_name() {:?}, {:?}", cached_expr.display_name()?, expr.display_name()?); - if available_col.flat_name() == expr.display_name()?{ - // println!("replacing expr: {:?} with available_col: {:?}", expr, available_col); - *expr = Expr::Column(available_col.clone()); - } - } - }, - Expr::WindowFunction(WindowFunction { fun: _, args, .. }) => { - args.iter_mut().try_for_each(|arg| update_expr_with_available_columns(arg, available_columns))? - }, - Expr::Cast(Cast{ expr, .. }) => { - update_expr_with_available_columns(expr, available_columns)? - } - _ => { - // println!("unsupported expression : {:?}", expr); - // cannot rewrite - } - } - Ok(()) -} - #[cfg(test)] mod test { use std::iter; From b77efabd7ddba2540e2b22e6549686175c933a73 Mon Sep 17 00:00:00 2001 From: Mustafa Akur Date: Thu, 28 Mar 2024 11:24:07 +0300 Subject: [PATCH 04/12] Tmp --- .../src/physical_optimizer/enforce_sorting.rs | 25 ++++---- .../tests/data/project_complex_expression.csv | 6 ++ .../optimizer/src/common_subexpr_eliminate.rs | 27 +++++++- .../optimizer/src/optimize_projections.rs | 1 + .../test_files/project_complex_sub_query.slt | 63 ++++++++++++++++--- 5 files changed, 100 insertions(+), 22 deletions(-) create mode 100644 datafusion/core/tests/data/project_complex_expression.csv diff --git a/datafusion/core/src/physical_optimizer/enforce_sorting.rs b/datafusion/core/src/physical_optimizer/enforce_sorting.rs index 95e4a06b516b..1c4a453d2c25 100644 --- a/datafusion/core/src/physical_optimizer/enforce_sorting.rs +++ b/datafusion/core/src/physical_optimizer/enforce_sorting.rs @@ -2367,18 +2367,16 @@ mod tmp_tests { Ok(()) } - const MULTIPLE_ORDERED_TABLE: &str = "CREATE EXTERNAL TABLE multiple_ordered_table ( - a0 INTEGER, - a INTEGER, - b INTEGER, - c INTEGER, - d INTEGER - ) - STORED AS CSV - WITH HEADER ROW - WITH ORDER (a ASC, b ASC) - WITH ORDER (c ASC) - LOCATION '../core/tests/data/window_2.csv'"; + const MULTIPLE_ORDERED_TABLE: &str = "CREATE EXTERNAL TABLE t ( +c1 INT NOT NULL, +c2 INT NOT NULL, +c3 INT NOT NULL, +c4 INT NOT NULL, +c5 INT NOT NULL +) +STORED AS CSV +WITH HEADER ROW +LOCATION '../core/tests/data/project_complex_expression.csv'"; #[tokio::test] async fn test_query() -> Result<()> { @@ -2387,7 +2385,8 @@ mod tmp_tests { ctx.sql(MULTIPLE_ORDERED_TABLE).await?; - let sql = "SELECT a+b, SUM(a+b) OVER() FROM multiple_ordered_table"; + let sql = "SELECT c3+c4, SUM(c3+c4) OVER(order by c3+c4) +FROM t"; let msg = format!("Creating logical plan for '{sql}'"); let dataframe = ctx.sql(sql).await.expect(&msg); diff --git a/datafusion/core/tests/data/project_complex_expression.csv b/datafusion/core/tests/data/project_complex_expression.csv new file mode 100644 index 000000000000..f37f11cc7f1c --- /dev/null +++ b/datafusion/core/tests/data/project_complex_expression.csv @@ -0,0 +1,6 @@ +c1,c2,c3,c4,c5 +1,2,3,4,5 +6,7,8,9,10 +11,12,13,14,15 +16,17,18,19,20 +21,22,23,24,25 diff --git a/datafusion/optimizer/src/common_subexpr_eliminate.rs b/datafusion/optimizer/src/common_subexpr_eliminate.rs index bab58b5880e7..987618513642 100644 --- a/datafusion/optimizer/src/common_subexpr_eliminate.rs +++ b/datafusion/optimizer/src/common_subexpr_eliminate.rs @@ -893,8 +893,13 @@ impl ProjectionAdder { Self::get_complex_expressions(vec![*expr], schema.clone()); res.extend(expr_set); expr_data_type.extend(type_data_map); + }, + Expr::Alias(Alias{expr, relation: _, name: _}) => { + let (expr_set, type_data_map) = + Self::get_complex_expressions(vec![*expr], schema.clone()); + res.extend(expr_set); + expr_data_type.extend(type_data_map); } - Expr::WindowFunction(WindowFunction { fun: _, args, .. }) => { let (expr_set, type_map) = Self::get_complex_expressions(args, schema.clone()); @@ -923,14 +928,31 @@ impl ProjectionAdder { }, Expr::Cast(Cast{ expr, .. }) => { self.update_expr_with_available_columns(expr, available_columns)? + }, + Expr::Alias(alias) => { + self.update_expr_with_available_columns(&mut alias.expr, available_columns)?; } _ => { // cannot rewrite } } + Self::trim_expr(expr)?; Ok(()) } + fn trim_expr(expr: &mut Expr) -> Result<()> { + let orig_name = expr.display_name()?; + match expr { + Expr::Alias(alias) => { + Self::trim_expr(&mut alias.expr)?; + if orig_name == alias.expr.display_name()?{ + *expr = *alias.expr.clone(); + } + Ok(()) + } + _ => Ok(()), + } + } // Assumes operators doesn't modify name of the fields. // Otherwise this operation is not safe. @@ -966,6 +988,7 @@ impl TreeNodeRewriter for ProjectionAdder { Ok(Transformed::no(node)) } LogicalPlan::Projection(_) => { + // println!("proj node exprs: {:?}", node.expressions()); if depth_set.is_empty(){ self.extend_with_exprs(&node); } else { @@ -988,7 +1011,7 @@ impl TreeNodeRewriter for ProjectionAdder { // do not do extra things let should_add_projection = !added_expr.is_empty(); if !should_add_projection && !self.should_update_parents { - // println!("not updating node"); + // clearln!("not updating node"); return Ok(Transformed::no(node)); } diff --git a/datafusion/optimizer/src/optimize_projections.rs b/datafusion/optimizer/src/optimize_projections.rs index 39dc56d2af93..9201d0a5e9b3 100644 --- a/datafusion/optimizer/src/optimize_projections.rs +++ b/datafusion/optimizer/src/optimize_projections.rs @@ -174,6 +174,7 @@ fn optimize_projections( return Ok(None); } LogicalPlan::Projection(proj) => { + // println!("expr:{:?}", proj.expr); return if let Some(proj) = merge_consecutive_projections(proj)? { Ok(Some( rewrite_projection_given_requirements(&proj, config, indices)? diff --git a/datafusion/sqllogictest/test_files/project_complex_sub_query.slt b/datafusion/sqllogictest/test_files/project_complex_sub_query.slt index 30a84f1fc397..06bfaf528921 100644 --- a/datafusion/sqllogictest/test_files/project_complex_sub_query.slt +++ b/datafusion/sqllogictest/test_files/project_complex_sub_query.slt @@ -29,10 +29,7 @@ c5 INT NOT NULL ) STORED AS CSV WITH HEADER ROW -LOCATION '../core/tests/data/demo.csv'; - -statement ok -set datafusion.explain.logical_plan_only = true; +LOCATION '../core/tests/data/project_complex_expression.csv'; query TT explain SELECT c3+c4, SUM(c3+c4) OVER() @@ -42,6 +39,12 @@ logical_plan WindowAggr: windowExpr=[[SUM(CAST(t.c3 + t.c4 AS Int64)) ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING]] --Projection: t.c3 + t.c4 AS t.c3 + t.c4 ----TableScan: t projection=[c3, c4] +physical_plan +WindowAggExec: wdw=[SUM(t.c3 + t.c4) ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING: Ok(Field { name: "SUM(t.c3 + t.c4) ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(NULL)), end_bound: Following(UInt64(NULL)), is_causal: false }] +--CoalescePartitionsExec +----ProjectionExec: expr=[c3@0 + c4@1 as t.c3 + t.c4] +------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 +--------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/project_complex_expression.csv]]}, projection=[c3, c4], has_header=true query TT explain SELECT c3+c4, SUM(c3+c4) OVER(order by c3+c4) @@ -52,6 +55,15 @@ Projection: t.c3 + t.c4, SUM(t.c3 + t.c4) ORDER BY [t.c3 + t.c4 ASC NULLS LAST] --WindowAggr: windowExpr=[[SUM(CAST(t.c3 + t.c4t.c4t.c3 AS t.c3 + t.c4 AS Int64)) ORDER BY [t.c3 + t.c4t.c4t.c3 AS t.c3 + t.c4 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS SUM(t.c3 + t.c4) ORDER BY [t.c3 + t.c4 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]] ----Projection: t.c3 + t.c4 AS t.c3 + t.c4t.c4t.c3, t.c3, t.c4 ------TableScan: t projection=[c3, c4] +physical_plan +ProjectionExec: expr=[c3@1 + c4@2 as t.c3 + t.c4, SUM(t.c3 + t.c4) ORDER BY [t.c3 + t.c4 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@3 as SUM(t.c3 + t.c4) ORDER BY [t.c3 + t.c4 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW] +--RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 +----BoundedWindowAggExec: wdw=[SUM(t.c3 + t.c4) ORDER BY [t.c3 + t.c4 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Ok(Field { name: "SUM(t.c3 + t.c4) ORDER BY [t.c3 + t.c4 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int32(NULL)), end_bound: CurrentRow, is_causal: false }], mode=[Sorted] +------SortPreservingMergeExec: [t.c3 + t.c4t.c4t.c3@0 ASC NULLS LAST] +--------SortExec: expr=[t.c3 + t.c4t.c4t.c3@0 ASC NULLS LAST] +----------ProjectionExec: expr=[c3@0 + c4@1 as t.c3 + t.c4t.c4t.c3, c3@0 as c3, c4@1 as c4] +------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 +--------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/project_complex_expression.csv]]}, projection=[c3, c4], has_header=true query TT explain SELECT c3-c4, SUM(c3-c4) OVER(order by c3-c4) @@ -62,12 +74,49 @@ Projection: t.c3 - t.c4, SUM(t.c3 - t.c4) ORDER BY [t.c3 - t.c4 ASC NULLS LAST] --WindowAggr: windowExpr=[[SUM(CAST(t.c3 - t.c4t.c4t.c3 AS t.c3 - t.c4 AS Int64)) ORDER BY [t.c3 - t.c4t.c4t.c3 AS t.c3 - t.c4 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS SUM(t.c3 - t.c4) ORDER BY [t.c3 - t.c4 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]] ----Projection: t.c3 - t.c4 AS t.c3 - t.c4t.c4t.c3, t.c3, t.c4 ------TableScan: t projection=[c3, c4] +physical_plan +ProjectionExec: expr=[c3@1 - c4@2 as t.c3 - t.c4, SUM(t.c3 - t.c4) ORDER BY [t.c3 - t.c4 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@3 as SUM(t.c3 - t.c4) ORDER BY [t.c3 - t.c4 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW] +--RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 +----BoundedWindowAggExec: wdw=[SUM(t.c3 - t.c4) ORDER BY [t.c3 - t.c4 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Ok(Field { name: "SUM(t.c3 - t.c4) ORDER BY [t.c3 - t.c4 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int32(NULL)), end_bound: CurrentRow, is_causal: false }], mode=[Sorted] +------SortPreservingMergeExec: [t.c3 - t.c4t.c4t.c3@0 ASC NULLS LAST] +--------SortExec: expr=[t.c3 - t.c4t.c4t.c3@0 ASC NULLS LAST] +----------ProjectionExec: expr=[c3@0 - c4@1 as t.c3 - t.c4t.c4t.c3, c3@0 as c3, c4@1 as c4] +------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 +--------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/project_complex_expression.csv]]}, projection=[c3, c4], has_header=true query TT explain SELECT c3-c4, SUM(c3-c4) OVER() FROM t; ---- logical_plan -WindowAggr: windowExpr=[[SUM(CAST(t.c3 - t.c4 AS Int64)) ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING]] ---Projection: t.c3 - t.c4 AS t.c3 - t.c4 -----TableScan: t projection=[c3, c4] +Projection: t.c3 - t.c4, SUM(t.c3 - t.c4) ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING +--WindowAggr: windowExpr=[[SUM(CAST(t.c3 - t.c4 AS Int64)) ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING]] +----Projection: t.c3 - t.c4 AS t.c3 - t.c4, t.c3, t.c4 +------TableScan: t projection=[c3, c4] +physical_plan +ProjectionExec: expr=[t.c3 - t.c4@0 as t.c3 - t.c4, SUM(t.c3 - t.c4) ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING@3 as SUM(t.c3 - t.c4) ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING] +--WindowAggExec: wdw=[SUM(t.c3 - t.c4) ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING: Ok(Field { name: "SUM(t.c3 - t.c4) ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(NULL)), end_bound: Following(UInt64(NULL)), is_causal: false }] +----CoalescePartitionsExec +------ProjectionExec: expr=[c3@0 - c4@1 as t.c3 - t.c4, c3@0 as c3, c4@1 as c4] +--------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 +----------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/project_complex_expression.csv]]}, projection=[c3, c4], has_header=true + +query II +SELECT c3+c4, SUM(c3+c4) OVER() +FROM t; +---- +7 135 +17 135 +27 135 +37 135 +47 135 + +query II +SELECT c3+c4, SUM(c3+c4) OVER(order by c3+c4) +FROM t +---- +7 7 +17 24 +27 51 +37 88 +47 135 From 59e0385dc0f8ac47d6a2a0e6d9205eb85e5b75ca Mon Sep 17 00:00:00 2001 From: Mustafa Akur Date: Thu, 28 Mar 2024 13:25:53 +0300 Subject: [PATCH 05/12] slt tests pass --- .../src/physical_optimizer/enforce_sorting.rs | 3 +- datafusion/expr/src/expr.rs | 15 +- .../optimizer/src/common_subexpr_eliminate.rs | 186 +++++++++++------- datafusion/sqllogictest/test_files/insert.slt | 2 +- .../test_files/project_complex_sub_query.slt | 20 +- .../sqllogictest/test_files/subquery.slt | 2 +- 6 files changed, 137 insertions(+), 91 deletions(-) diff --git a/datafusion/core/src/physical_optimizer/enforce_sorting.rs b/datafusion/core/src/physical_optimizer/enforce_sorting.rs index 1c4a453d2c25..63f35b70b4bf 100644 --- a/datafusion/core/src/physical_optimizer/enforce_sorting.rs +++ b/datafusion/core/src/physical_optimizer/enforce_sorting.rs @@ -2348,7 +2348,6 @@ mod tests { } } - #[cfg(test)] mod tmp_tests { use crate::assert_batches_eq; @@ -2385,7 +2384,7 @@ LOCATION '../core/tests/data/project_complex_expression.csv'"; ctx.sql(MULTIPLE_ORDERED_TABLE).await?; - let sql = "SELECT c3+c4, SUM(c3+c4) OVER(order by c3+c4) + let sql = "SELECT c3+c4, SUM(c3+c4) OVER() FROM t"; let msg = format!("Creating logical plan for '{sql}'"); diff --git a/datafusion/expr/src/expr.rs b/datafusion/expr/src/expr.rs index 7ede4cd8ffc9..2e4930a1a0cb 100644 --- a/datafusion/expr/src/expr.rs +++ b/datafusion/expr/src/expr.rs @@ -1866,8 +1866,19 @@ fn create_name(e: &Expr) -> Result { Ok(format!("{expr} BETWEEN {low} AND {high}")) } } - Expr::Sort { .. } => { - internal_err!("Create name does not support sort expression") + Expr::Sort(Sort { + expr, + asc, + nulls_first, + }) => { + let dir = if *asc { "ASC" } else { "DESC" }; + let nulls = if *nulls_first { + "NULLS_FIRST" + } else { + "NULLS_LAST" + }; + Ok(format!("{expr} {dir} {nulls}")) + // internal_err!("Create name does not support sort expression") } Expr::Wildcard { qualifier } => match qualifier { Some(qualifier) => internal_err!( diff --git a/datafusion/optimizer/src/common_subexpr_eliminate.rs b/datafusion/optimizer/src/common_subexpr_eliminate.rs index 987618513642..51e704d30ecc 100644 --- a/datafusion/optimizer/src/common_subexpr_eliminate.rs +++ b/datafusion/optimizer/src/common_subexpr_eliminate.rs @@ -420,15 +420,18 @@ impl CommonSubexprEliminate { /// currently the implemention is not optimal, Basically I just do a top-down iteration over all the /// fn add_extra_projection(&self, plan: &LogicalPlan) -> Result> { - plan.clone() + // println!("plan at the start: {:?}", plan); + let res = plan + .clone() .rewrite(&mut ProjectionAdder { - data_type_map: HashMap::new(), - cumulative_expr_map: HashMap::new(), - cumulative_map: HashSet::new(), + insertion_point_map: HashMap::new(), depth: 0, - should_update_parents: false, - }) - .map(|transformed| Some(transformed.data)) + complex_exprs: HashMap::new(), + })? + .data; + // .map(|transformed| Some(transformed.data)) + // println!("plan at the end: {:?}", res); + Ok(Some(res)) } } impl OptimizerRule for CommonSubexprEliminate { @@ -850,11 +853,10 @@ fn replace_common_expr( } struct ProjectionAdder { - cumulative_expr_map: HashMap>, - cumulative_map: HashSet, - depth: u128, - data_type_map: HashMap, - should_update_parents: bool, + insertion_point_map: HashMap>, + depth: usize, + complex_exprs: HashMap, + // should_update_parents: bool, } pub fn is_not_complex(op: &Operator) -> bool { matches!( @@ -862,14 +864,14 @@ pub fn is_not_complex(op: &Operator) -> bool { &Operator::Eq | &Operator::NotEq | &Operator::Lt | &Operator::Gt | &Operator::And ) } + impl ProjectionAdder { // TODO: adding more expressions for sub query, currently only support for Simple Binary Expressions fn get_complex_expressions( exprs: Vec, schema: DFSchemaRef, - ) -> (HashSet, HashMap) { + ) -> HashSet<(Expr, DataType)> { let mut res = HashSet::new(); - let mut expr_data_type: HashMap = HashMap::new(); for expr in exprs { match expr { Expr::BinaryExpr(BinaryExpr { @@ -881,56 +883,62 @@ impl ProjectionAdder { let l_field = schema .field_from_column(l) .expect("Field not found for left column"); - - expr_data_type - .entry(expr.clone()) - .or_insert(l_field.data_type().clone()); - res.insert(expr.clone()); + res.insert((expr.clone(), l_field.data_type().clone())); } } Expr::Cast(Cast { expr, data_type: _ }) => { - let (expr_set, type_data_map) = + let exprs_with_type = Self::get_complex_expressions(vec![*expr], schema.clone()); - res.extend(expr_set); - expr_data_type.extend(type_data_map); - }, - Expr::Alias(Alias{expr, relation: _, name: _}) => { - let (expr_set, type_data_map) = + res.extend(exprs_with_type); + } + Expr::Alias(Alias { + expr, + relation: _, + name: _, + }) => { + let exprs_with_type = Self::get_complex_expressions(vec![*expr], schema.clone()); - res.extend(expr_set); - expr_data_type.extend(type_data_map); + res.extend(exprs_with_type); } Expr::WindowFunction(WindowFunction { fun: _, args, .. }) => { - let (expr_set, type_map) = + let exprs_with_type = Self::get_complex_expressions(args, schema.clone()); - res.extend(expr_set); - expr_data_type.extend(type_map); + res.extend(exprs_with_type); } _ => {} } } - (res, expr_data_type) + res } - fn update_expr_with_available_columns(&self, expr: &mut Expr, available_columns: &[Column]) -> Result<()> { + fn update_expr_with_available_columns( + &self, + expr: &mut Expr, + available_columns: &[Column], + ) -> Result<()> { match expr { Expr::BinaryExpr(_) => { - for available_col in available_columns{ + for available_col in available_columns { // println!("cached_expr.display_name(), expr.display_name() {:?}, {:?}", cached_expr.display_name()?, expr.display_name()?); - if available_col.flat_name() == expr.display_name()?{ + if available_col.flat_name() == expr.display_name()? { // println!("replacing expr: {:?} with available_col: {:?}", expr, available_col); *expr = Expr::Column(available_col.clone()); } } - }, + } Expr::WindowFunction(WindowFunction { fun: _, args, .. }) => { - args.iter_mut().try_for_each(|arg| self.update_expr_with_available_columns(arg, available_columns))? - }, - Expr::Cast(Cast{ expr, .. }) => { + args.iter_mut().try_for_each(|arg| { + self.update_expr_with_available_columns(arg, available_columns) + })? + } + Expr::Cast(Cast { expr, .. }) => { self.update_expr_with_available_columns(expr, available_columns)? - }, + } Expr::Alias(alias) => { - self.update_expr_with_available_columns(&mut alias.expr, available_columns)?; + self.update_expr_with_available_columns( + &mut alias.expr, + available_columns, + )?; } _ => { // cannot rewrite @@ -945,7 +953,7 @@ impl ProjectionAdder { match expr { Expr::Alias(alias) => { Self::trim_expr(&mut alias.expr)?; - if orig_name == alias.expr.display_name()?{ + if orig_name == alias.expr.display_name()? { *expr = *alias.expr.clone(); } Ok(()) @@ -960,14 +968,18 @@ impl ProjectionAdder { // use depth to trace where we are in the LogicalPlan tree // extract all expressions + check whether it contains in depth_sets let exprs = node.expressions(); + // println!("extended exprs: {:?}", exprs); let mut schema = node.schema().deref().clone(); for ip in node.inputs() { schema.merge(ip.schema()); } - let (extended_set, data_map) = - Self::get_complex_expressions(exprs, Arc::new(schema)); - self.cumulative_map.extend(extended_set); - self.data_type_map.extend(data_map); + let expr_with_type = Self::get_complex_expressions(exprs, Arc::new(schema)); + for (expr, dtype) in expr_with_type { + let (_, count) = self.complex_exprs.entry(expr).or_insert_with(|| (dtype, 0)); + *count += 1; + } + // self.cumulative_map.extend(extended_set); + // self.data_type_map.extend(data_map); } } impl TreeNodeRewriter for ProjectionAdder { @@ -975,59 +987,72 @@ impl TreeNodeRewriter for ProjectionAdder { /// currently we just collect the complex bianryOP fn f_down(&mut self, node: Self::Node) -> Result> { - let depth_set = &mut self.cumulative_map; // Insert for other end points self.depth += 1; match node { LogicalPlan::TableScan(_) => { - self.cumulative_expr_map.insert(self.depth-1, depth_set.clone()); + self.insertion_point_map + .insert(self.depth - 1, self.complex_exprs.clone()); return Ok(Transformed::no(node)); - }, + } LogicalPlan::Sort(_) | LogicalPlan::Filter(_) | LogicalPlan::Window(_) => { self.extend_with_exprs(&node); Ok(Transformed::no(node)) } LogicalPlan::Projection(_) => { // println!("proj node exprs: {:?}", node.expressions()); - if depth_set.is_empty(){ + if self.complex_exprs.is_empty() { self.extend_with_exprs(&node); } else { - self.cumulative_expr_map.insert(self.depth-1, depth_set.clone()); + self.insertion_point_map + .insert(self.depth - 1, self.complex_exprs.clone()); } Ok(Transformed::no(node)) - }, + } _ => { // Unsupported operators - self.cumulative_map.clear(); + self.complex_exprs.clear(); Ok(Transformed::no(node)) } } } fn f_up(&mut self, node: Self::Node) -> Result> { - let added_expr = - self.cumulative_expr_map.get(&self.depth).cloned().unwrap_or_default(); + let cached_exprs = self + .insertion_point_map + .get(&self.depth) + .cloned() + .unwrap_or_default(); self.depth -= 1; // do not do extra things - let should_add_projection = !added_expr.is_empty(); - if !should_add_projection && !self.should_update_parents { - // clearln!("not updating node"); + // println!("---------------------"); + // for (depth, complex_exprs) in &self.insertion_point_map{ + // println!("depth: {:?}, complex_exprs: {:?}", depth, complex_exprs); + // } + // println!("---------------------"); + // let should_add_projection = !cached_exprs.is_empty(); + let should_add_projection = + cached_exprs.iter().any(|(_expr, (_, count))| *count > 1); + // if let LogicalPlan::Projection(projection) = node { + // + // } + + let children = node.inputs(); + if children.len() != 1 { + // Only can rewrite node with single child return Ok(Transformed::no(node)); } - - let child = node.inputs()[0]; + let child = children[0].clone(); let child = if should_add_projection { - self.should_update_parents = true; let mut field_set = HashSet::new(); let mut project_exprs = vec![]; - for expr in &added_expr { - let f = DFField::new_unqualified( - &expr.to_string(), - self.data_type_map[expr].clone(), - true, - ); - field_set.insert(f.name().to_owned()); - project_exprs.push(expr.clone().alias(expr.to_string())); + for (expr, (dtype, count)) in &cached_exprs { + if *count > 1 { + let f = + DFField::new_unqualified(&expr.to_string(), dtype.clone(), true); + field_set.insert(f.name().to_owned()); + project_exprs.push(expr.clone().alias(expr.to_string())); + } } // Do not lose fields in the child. for field in child.schema().fields() { @@ -1043,19 +1068,32 @@ impl TreeNodeRewriter for ProjectionAdder { )?); new_child } else { - child.clone() + child }; let mut expressions = node.expressions(); - let available_columns = child.schema().fields().iter().map(|field| field.qualified_column()).collect::>(); + let available_columns = child + .schema() + .fields() + .iter() + .map(|field| field.qualified_column()) + .collect::>(); // Replace expressions with its pre-computed variant if available. - expressions.iter_mut().try_for_each(|expr|{ + // println!("-----------------------"); + // for expr in &expressions{ + // println!("old expr: {:?}", expr); + // } + expressions.iter_mut().try_for_each(|expr| { self.update_expr_with_available_columns(expr, &available_columns) })?; + // for expr in &expressions{ + // println!("new expr: {:?}", expr); + // } + // println!("-----------------------"); + // println!("old node:{:?}", node); let new_node = node.with_new_exprs(expressions, [child].to_vec())?; - Ok(Transformed::yes( - new_node, - )) + // println!("new node:{:?}", new_node); + Ok(Transformed::yes(new_node)) } } #[cfg(test)] diff --git a/datafusion/sqllogictest/test_files/insert.slt b/datafusion/sqllogictest/test_files/insert.slt index fc272326973b..113e544ad063 100644 --- a/datafusion/sqllogictest/test_files/insert.slt +++ b/datafusion/sqllogictest/test_files/insert.slt @@ -169,7 +169,7 @@ ORDER BY c1 ---- logical_plan Dml: op=[Insert Into] table=[table_without_values] ---Projection: a1 AS a1, a2 AS a2 +--Projection: a1, a2 ----Sort: aggregate_test_100.c1 ASC NULLS LAST ------Projection: SUM(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING AS a1, COUNT(*) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING AS a2, aggregate_test_100.c1 --------WindowAggr: windowExpr=[[SUM(CAST(aggregate_test_100.c4 AS Int64)) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING, COUNT(UInt8(1)) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING AS COUNT(*) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING]] diff --git a/datafusion/sqllogictest/test_files/project_complex_sub_query.slt b/datafusion/sqllogictest/test_files/project_complex_sub_query.slt index 06bfaf528921..9f88de66ea3b 100644 --- a/datafusion/sqllogictest/test_files/project_complex_sub_query.slt +++ b/datafusion/sqllogictest/test_files/project_complex_sub_query.slt @@ -37,7 +37,7 @@ FROM t; ---- logical_plan WindowAggr: windowExpr=[[SUM(CAST(t.c3 + t.c4 AS Int64)) ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING]] ---Projection: t.c3 + t.c4 AS t.c3 + t.c4 +--Projection: t.c3 + t.c4 ----TableScan: t projection=[c3, c4] physical_plan WindowAggExec: wdw=[SUM(t.c3 + t.c4) ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING: Ok(Field { name: "SUM(t.c3 + t.c4) ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(NULL)), end_bound: Following(UInt64(NULL)), is_causal: false }] @@ -89,17 +89,15 @@ explain SELECT c3-c4, SUM(c3-c4) OVER() FROM t; ---- logical_plan -Projection: t.c3 - t.c4, SUM(t.c3 - t.c4) ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING ---WindowAggr: windowExpr=[[SUM(CAST(t.c3 - t.c4 AS Int64)) ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING]] -----Projection: t.c3 - t.c4 AS t.c3 - t.c4, t.c3, t.c4 -------TableScan: t projection=[c3, c4] +WindowAggr: windowExpr=[[SUM(CAST(t.c3 - t.c4 AS Int64)) ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING]] +--Projection: t.c3 - t.c4 +----TableScan: t projection=[c3, c4] physical_plan -ProjectionExec: expr=[t.c3 - t.c4@0 as t.c3 - t.c4, SUM(t.c3 - t.c4) ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING@3 as SUM(t.c3 - t.c4) ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING] ---WindowAggExec: wdw=[SUM(t.c3 - t.c4) ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING: Ok(Field { name: "SUM(t.c3 - t.c4) ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(NULL)), end_bound: Following(UInt64(NULL)), is_causal: false }] -----CoalescePartitionsExec -------ProjectionExec: expr=[c3@0 - c4@1 as t.c3 - t.c4, c3@0 as c3, c4@1 as c4] ---------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 -----------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/project_complex_expression.csv]]}, projection=[c3, c4], has_header=true +WindowAggExec: wdw=[SUM(t.c3 - t.c4) ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING: Ok(Field { name: "SUM(t.c3 - t.c4) ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(NULL)), end_bound: Following(UInt64(NULL)), is_causal: false }] +--CoalescePartitionsExec +----ProjectionExec: expr=[c3@0 - c4@1 as t.c3 - t.c4] +------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 +--------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/project_complex_expression.csv]]}, projection=[c3, c4], has_header=true query II SELECT c3+c4, SUM(c3+c4) OVER() diff --git a/datafusion/sqllogictest/test_files/subquery.slt b/datafusion/sqllogictest/test_files/subquery.slt index 4fb94cfab523..aac64b55b535 100644 --- a/datafusion/sqllogictest/test_files/subquery.slt +++ b/datafusion/sqllogictest/test_files/subquery.slt @@ -449,7 +449,7 @@ query TT explain SELECT t1_id, (SELECT t2_int FROM t2 WHERE t2.t2_int = t1.t1_int limit 1) as t2_int from t1 ---- logical_plan -Projection: t1.t1_id, () AS t2_int +Projection: t1.t1_id, () --Subquery: ----Limit: skip=0, fetch=1 ------Projection: t2.t2_int From c1a16f9018af639be57db04e05f766718c5ce134 Mon Sep 17 00:00:00 2001 From: Mustafa Akur Date: Thu, 28 Mar 2024 13:29:15 +0300 Subject: [PATCH 06/12] Remove unnecessary test --- .../src/physical_optimizer/enforce_sorting.rs | 75 ------------------- 1 file changed, 75 deletions(-) diff --git a/datafusion/core/src/physical_optimizer/enforce_sorting.rs b/datafusion/core/src/physical_optimizer/enforce_sorting.rs index 63f35b70b4bf..5bf21c3dfab5 100644 --- a/datafusion/core/src/physical_optimizer/enforce_sorting.rs +++ b/datafusion/core/src/physical_optimizer/enforce_sorting.rs @@ -2347,78 +2347,3 @@ mod tests { Ok(()) } } - -#[cfg(test)] -mod tmp_tests { - use crate::assert_batches_eq; - use crate::physical_plan::{collect, displayable, ExecutionPlan}; - use crate::prelude::SessionContext; - use arrow::util::pretty::print_batches; - use datafusion_common::Result; - use datafusion_execution::config::SessionConfig; - use datafusion_physical_plan::get_plan_string; - use std::sync::Arc; - - fn print_plan(plan: &Arc) -> Result<()> { - let formatted = displayable(plan.as_ref()).indent(true).to_string(); - let actual: Vec<&str> = formatted.trim().lines().collect(); - println!("{:#?}", actual); - Ok(()) - } - - const MULTIPLE_ORDERED_TABLE: &str = "CREATE EXTERNAL TABLE t ( -c1 INT NOT NULL, -c2 INT NOT NULL, -c3 INT NOT NULL, -c4 INT NOT NULL, -c5 INT NOT NULL -) -STORED AS CSV -WITH HEADER ROW -LOCATION '../core/tests/data/project_complex_expression.csv'"; - - #[tokio::test] - async fn test_query() -> Result<()> { - let config = SessionConfig::new().with_target_partitions(1); - let ctx = SessionContext::new_with_config(config); - - ctx.sql(MULTIPLE_ORDERED_TABLE).await?; - - let sql = "SELECT c3+c4, SUM(c3+c4) OVER() -FROM t"; - - let msg = format!("Creating logical plan for '{sql}'"); - let dataframe = ctx.sql(sql).await.expect(&msg); - let physical_plan = dataframe.create_physical_plan().await?; - print_plan(&physical_plan)?; - let batches = collect(physical_plan.clone(), ctx.task_ctx()).await?; - print_batches(&batches)?; - - let expected = vec![ - "ProjectionExec: expr=[NTH_VALUE(annotated_data_finite2.d,Int64(2)) ORDER BY [annotated_data_finite2.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@2 as nth_value1]", - " GlobalLimitExec: skip=0, fetch=5", - " BoundedWindowAggExec: wdw=[NTH_VALUE(annotated_data_finite2.d,Int64(2)) ORDER BY [annotated_data_finite2.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Ok(Field { name: \"NTH_VALUE(annotated_data_finite2.d,Int64(2)) ORDER BY [annotated_data_finite2.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW\", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int32(NULL)), end_bound: CurrentRow }], mode=[Sorted]", - " CsvExec: file_groups={1 group: [[Users/akurmustafa/projects/synnada/arrow-datafusion-synnada/datafusion/core/tests/data/window_2.csv]]}, projection=[a, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true", - ]; - // Get string representation of the plan - let actual = get_plan_string(&physical_plan); - assert_eq!( - expected, actual, - "\n**Optimized Plan Mismatch\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n" - ); - - let expected = [ - "+------------+", - "| nth_value1 |", - "+------------+", - "| 2 |", - "| 2 |", - "| 2 |", - "| 2 |", - "| 2 |", - "+------------+", - ]; - assert_batches_eq!(expected, &batches); - Ok(()) - } -} From fb9e4d68e0167f0a7845fc3ceb423f83e2c960ec Mon Sep 17 00:00:00 2001 From: Mustafa Akur Date: Thu, 28 Mar 2024 13:58:20 +0300 Subject: [PATCH 07/12] Remove unnecessary changes --- datafusion/optimizer/src/common_subexpr_eliminate.rs | 2 +- datafusion/sqllogictest/test_files/insert.slt | 2 +- .../sqllogictest/test_files/project_complex_sub_query.slt | 4 ++-- datafusion/sqllogictest/test_files/subquery.slt | 2 +- 4 files changed, 5 insertions(+), 5 deletions(-) diff --git a/datafusion/optimizer/src/common_subexpr_eliminate.rs b/datafusion/optimizer/src/common_subexpr_eliminate.rs index 51e704d30ecc..dcb6f23e237f 100644 --- a/datafusion/optimizer/src/common_subexpr_eliminate.rs +++ b/datafusion/optimizer/src/common_subexpr_eliminate.rs @@ -944,7 +944,7 @@ impl ProjectionAdder { // cannot rewrite } } - Self::trim_expr(expr)?; + // Self::trim_expr(expr)?; Ok(()) } diff --git a/datafusion/sqllogictest/test_files/insert.slt b/datafusion/sqllogictest/test_files/insert.slt index 113e544ad063..fc272326973b 100644 --- a/datafusion/sqllogictest/test_files/insert.slt +++ b/datafusion/sqllogictest/test_files/insert.slt @@ -169,7 +169,7 @@ ORDER BY c1 ---- logical_plan Dml: op=[Insert Into] table=[table_without_values] ---Projection: a1, a2 +--Projection: a1 AS a1, a2 AS a2 ----Sort: aggregate_test_100.c1 ASC NULLS LAST ------Projection: SUM(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING AS a1, COUNT(*) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING AS a2, aggregate_test_100.c1 --------WindowAggr: windowExpr=[[SUM(CAST(aggregate_test_100.c4 AS Int64)) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING, COUNT(UInt8(1)) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING AS COUNT(*) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING]] diff --git a/datafusion/sqllogictest/test_files/project_complex_sub_query.slt b/datafusion/sqllogictest/test_files/project_complex_sub_query.slt index 9f88de66ea3b..56e728325b79 100644 --- a/datafusion/sqllogictest/test_files/project_complex_sub_query.slt +++ b/datafusion/sqllogictest/test_files/project_complex_sub_query.slt @@ -37,7 +37,7 @@ FROM t; ---- logical_plan WindowAggr: windowExpr=[[SUM(CAST(t.c3 + t.c4 AS Int64)) ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING]] ---Projection: t.c3 + t.c4 +--Projection: t.c3 + t.c4 AS t.c3 + t.c4 ----TableScan: t projection=[c3, c4] physical_plan WindowAggExec: wdw=[SUM(t.c3 + t.c4) ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING: Ok(Field { name: "SUM(t.c3 + t.c4) ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(NULL)), end_bound: Following(UInt64(NULL)), is_causal: false }] @@ -90,7 +90,7 @@ FROM t; ---- logical_plan WindowAggr: windowExpr=[[SUM(CAST(t.c3 - t.c4 AS Int64)) ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING]] ---Projection: t.c3 - t.c4 +--Projection: t.c3 - t.c4 AS t.c3 - t.c4 ----TableScan: t projection=[c3, c4] physical_plan WindowAggExec: wdw=[SUM(t.c3 - t.c4) ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING: Ok(Field { name: "SUM(t.c3 - t.c4) ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(NULL)), end_bound: Following(UInt64(NULL)), is_causal: false }] diff --git a/datafusion/sqllogictest/test_files/subquery.slt b/datafusion/sqllogictest/test_files/subquery.slt index aac64b55b535..4fb94cfab523 100644 --- a/datafusion/sqllogictest/test_files/subquery.slt +++ b/datafusion/sqllogictest/test_files/subquery.slt @@ -449,7 +449,7 @@ query TT explain SELECT t1_id, (SELECT t2_int FROM t2 WHERE t2.t2_int = t1.t1_int limit 1) as t2_int from t1 ---- logical_plan -Projection: t1.t1_id, () +Projection: t1.t1_id, () AS t2_int --Subquery: ----Limit: skip=0, fetch=1 ------Projection: t2.t2_int From bdf626ad2f11973b7ae1a3a06ed121ad630c39a1 Mon Sep 17 00:00:00 2001 From: Mustafa Akur Date: Thu, 28 Mar 2024 14:03:46 +0300 Subject: [PATCH 08/12] Resolve linter errors --- .../optimizer/src/common_subexpr_eliminate.rs | 64 +++---------------- 1 file changed, 9 insertions(+), 55 deletions(-) diff --git a/datafusion/optimizer/src/common_subexpr_eliminate.rs b/datafusion/optimizer/src/common_subexpr_eliminate.rs index dcb6f23e237f..b6c51b8f8a4c 100644 --- a/datafusion/optimizer/src/common_subexpr_eliminate.rs +++ b/datafusion/optimizer/src/common_subexpr_eliminate.rs @@ -420,8 +420,7 @@ impl CommonSubexprEliminate { /// currently the implemention is not optimal, Basically I just do a top-down iteration over all the /// fn add_extra_projection(&self, plan: &LogicalPlan) -> Result> { - // println!("plan at the start: {:?}", plan); - let res = plan + let result = plan .clone() .rewrite(&mut ProjectionAdder { insertion_point_map: HashMap::new(), @@ -429,9 +428,7 @@ impl CommonSubexprEliminate { complex_exprs: HashMap::new(), })? .data; - // .map(|transformed| Some(transformed.data)) - // println!("plan at the end: {:?}", res); - Ok(Some(res)) + Ok(Some(result)) } } impl OptimizerRule for CommonSubexprEliminate { @@ -912,30 +909,27 @@ impl ProjectionAdder { } fn update_expr_with_available_columns( - &self, expr: &mut Expr, available_columns: &[Column], ) -> Result<()> { match expr { Expr::BinaryExpr(_) => { for available_col in available_columns { - // println!("cached_expr.display_name(), expr.display_name() {:?}, {:?}", cached_expr.display_name()?, expr.display_name()?); if available_col.flat_name() == expr.display_name()? { - // println!("replacing expr: {:?} with available_col: {:?}", expr, available_col); *expr = Expr::Column(available_col.clone()); } } } Expr::WindowFunction(WindowFunction { fun: _, args, .. }) => { args.iter_mut().try_for_each(|arg| { - self.update_expr_with_available_columns(arg, available_columns) + Self::update_expr_with_available_columns(arg, available_columns) })? } Expr::Cast(Cast { expr, .. }) => { - self.update_expr_with_available_columns(expr, available_columns)? + Self::update_expr_with_available_columns(expr, available_columns)? } Expr::Alias(alias) => { - self.update_expr_with_available_columns( + Self::update_expr_with_available_columns( &mut alias.expr, available_columns, )?; @@ -944,31 +938,15 @@ impl ProjectionAdder { // cannot rewrite } } - // Self::trim_expr(expr)?; Ok(()) } - fn trim_expr(expr: &mut Expr) -> Result<()> { - let orig_name = expr.display_name()?; - match expr { - Expr::Alias(alias) => { - Self::trim_expr(&mut alias.expr)?; - if orig_name == alias.expr.display_name()? { - *expr = *alias.expr.clone(); - } - Ok(()) - } - _ => Ok(()), - } - } - // Assumes operators doesn't modify name of the fields. // Otherwise this operation is not safe. fn extend_with_exprs(&mut self, node: &LogicalPlan) { // use depth to trace where we are in the LogicalPlan tree // extract all expressions + check whether it contains in depth_sets let exprs = node.expressions(); - // println!("extended exprs: {:?}", exprs); let mut schema = node.schema().deref().clone(); for ip in node.inputs() { schema.merge(ip.schema()); @@ -978,8 +956,6 @@ impl ProjectionAdder { let (_, count) = self.complex_exprs.entry(expr).or_insert_with(|| (dtype, 0)); *count += 1; } - // self.cumulative_map.extend(extended_set); - // self.data_type_map.extend(data_map); } } impl TreeNodeRewriter for ProjectionAdder { @@ -993,14 +969,13 @@ impl TreeNodeRewriter for ProjectionAdder { LogicalPlan::TableScan(_) => { self.insertion_point_map .insert(self.depth - 1, self.complex_exprs.clone()); - return Ok(Transformed::no(node)); + Ok(Transformed::no(node)) } LogicalPlan::Sort(_) | LogicalPlan::Filter(_) | LogicalPlan::Window(_) => { self.extend_with_exprs(&node); Ok(Transformed::no(node)) } LogicalPlan::Projection(_) => { - // println!("proj node exprs: {:?}", node.expressions()); if self.complex_exprs.is_empty() { self.extend_with_exprs(&node); } else { @@ -1025,17 +1000,8 @@ impl TreeNodeRewriter for ProjectionAdder { .unwrap_or_default(); self.depth -= 1; // do not do extra things - // println!("---------------------"); - // for (depth, complex_exprs) in &self.insertion_point_map{ - // println!("depth: {:?}, complex_exprs: {:?}", depth, complex_exprs); - // } - // println!("---------------------"); - // let should_add_projection = !cached_exprs.is_empty(); let should_add_projection = cached_exprs.iter().any(|(_expr, (_, count))| *count > 1); - // if let LogicalPlan::Projection(projection) = node { - // - // } let children = node.inputs(); if children.len() != 1 { @@ -1062,11 +1028,10 @@ impl TreeNodeRewriter for ProjectionAdder { } // adding new plan here - let new_child = LogicalPlan::Projection(Projection::try_new( + LogicalPlan::Projection(Projection::try_new( project_exprs, Arc::new(node.inputs()[0].clone()), - )?); - new_child + )?) } else { child }; @@ -1078,21 +1043,10 @@ impl TreeNodeRewriter for ProjectionAdder { .map(|field| field.qualified_column()) .collect::>(); // Replace expressions with its pre-computed variant if available. - // println!("-----------------------"); - // for expr in &expressions{ - // println!("old expr: {:?}", expr); - // } expressions.iter_mut().try_for_each(|expr| { - self.update_expr_with_available_columns(expr, &available_columns) + Self::update_expr_with_available_columns(expr, &available_columns) })?; - // for expr in &expressions{ - // println!("new expr: {:?}", expr); - // } - // println!("-----------------------"); - - // println!("old node:{:?}", node); let new_node = node.with_new_exprs(expressions, [child].to_vec())?; - // println!("new node:{:?}", new_node); Ok(Transformed::yes(new_node)) } } From 31cc2d7d0052418b06389d2cf6f105c284267766 Mon Sep 17 00:00:00 2001 From: Mustafa Akur Date: Thu, 28 Mar 2024 14:20:37 +0300 Subject: [PATCH 09/12] Remove left over file --- datafusion/core/tests/data/demo.csv | 2 -- 1 file changed, 2 deletions(-) delete mode 100644 datafusion/core/tests/data/demo.csv diff --git a/datafusion/core/tests/data/demo.csv b/datafusion/core/tests/data/demo.csv deleted file mode 100644 index d1492228246f..000000000000 --- a/datafusion/core/tests/data/demo.csv +++ /dev/null @@ -1,2 +0,0 @@ -c1,c2,c3,c4,c5 -1,2,3,4,5 From e7f4b49694803bb41be2500c8003cc65a2b91363 Mon Sep 17 00:00:00 2001 From: Mustafa Akur Date: Thu, 28 Mar 2024 14:23:17 +0300 Subject: [PATCH 10/12] Minor changes --- datafusion/core/tests/data/project_complex_expression.csv | 2 +- datafusion/optimizer/src/optimize_projections.rs | 1 - 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/datafusion/core/tests/data/project_complex_expression.csv b/datafusion/core/tests/data/project_complex_expression.csv index f37f11cc7f1c..c78f35a485cd 100644 --- a/datafusion/core/tests/data/project_complex_expression.csv +++ b/datafusion/core/tests/data/project_complex_expression.csv @@ -3,4 +3,4 @@ c1,c2,c3,c4,c5 6,7,8,9,10 11,12,13,14,15 16,17,18,19,20 -21,22,23,24,25 +21,22,23,24,25 \ No newline at end of file diff --git a/datafusion/optimizer/src/optimize_projections.rs b/datafusion/optimizer/src/optimize_projections.rs index 9201d0a5e9b3..39dc56d2af93 100644 --- a/datafusion/optimizer/src/optimize_projections.rs +++ b/datafusion/optimizer/src/optimize_projections.rs @@ -174,7 +174,6 @@ fn optimize_projections( return Ok(None); } LogicalPlan::Projection(proj) => { - // println!("expr:{:?}", proj.expr); return if let Some(proj) = merge_consecutive_projections(proj)? { Ok(Some( rewrite_projection_given_requirements(&proj, config, indices)? From 313cccd78cc782217daed56f6a27baf12bb101ed Mon Sep 17 00:00:00 2001 From: Mustafa Akur Date: Thu, 28 Mar 2024 14:27:05 +0300 Subject: [PATCH 11/12] Remove unnecessary changes --- datafusion/expr/src/expr.rs | 15 ++------------- 1 file changed, 2 insertions(+), 13 deletions(-) diff --git a/datafusion/expr/src/expr.rs b/datafusion/expr/src/expr.rs index 2e4930a1a0cb..7ede4cd8ffc9 100644 --- a/datafusion/expr/src/expr.rs +++ b/datafusion/expr/src/expr.rs @@ -1866,19 +1866,8 @@ fn create_name(e: &Expr) -> Result { Ok(format!("{expr} BETWEEN {low} AND {high}")) } } - Expr::Sort(Sort { - expr, - asc, - nulls_first, - }) => { - let dir = if *asc { "ASC" } else { "DESC" }; - let nulls = if *nulls_first { - "NULLS_FIRST" - } else { - "NULLS_LAST" - }; - Ok(format!("{expr} {dir} {nulls}")) - // internal_err!("Create name does not support sort expression") + Expr::Sort { .. } => { + internal_err!("Create name does not support sort expression") } Expr::Wildcard { qualifier } => match qualifier { Some(qualifier) => internal_err!( From 759226be6c631134da3bd207400323b3f9f47321 Mon Sep 17 00:00:00 2001 From: Mustafa Akur Date: Thu, 28 Mar 2024 14:37:38 +0300 Subject: [PATCH 12/12] Update comment --- .../optimizer/src/common_subexpr_eliminate.rs | 22 ++++++++++++------- 1 file changed, 14 insertions(+), 8 deletions(-) diff --git a/datafusion/optimizer/src/common_subexpr_eliminate.rs b/datafusion/optimizer/src/common_subexpr_eliminate.rs index b6c51b8f8a4c..e0638c2b6c01 100644 --- a/datafusion/optimizer/src/common_subexpr_eliminate.rs +++ b/datafusion/optimizer/src/common_subexpr_eliminate.rs @@ -850,10 +850,13 @@ fn replace_common_expr( } struct ProjectionAdder { + // Keeps track of cumulative usage of common expressions with its corresponding data type. + // accross plan where key is unsafe nodes that cumulative tracking is invalidated. insertion_point_map: HashMap>, depth: usize, + // Keeps track of cumulative usage of the common expressions with its corresponding data type. + // between safe nodes. complex_exprs: HashMap, - // should_update_parents: bool, } pub fn is_not_complex(op: &Operator) -> bool { matches!( @@ -967,21 +970,24 @@ impl TreeNodeRewriter for ProjectionAdder { self.depth += 1; match node { LogicalPlan::TableScan(_) => { + // Stop tracking cumulative usage at the source. + let complex_exprs = std::mem::take(&mut self.complex_exprs); self.insertion_point_map - .insert(self.depth - 1, self.complex_exprs.clone()); + .insert(self.depth - 1, complex_exprs); Ok(Transformed::no(node)) } LogicalPlan::Sort(_) | LogicalPlan::Filter(_) | LogicalPlan::Window(_) => { + // These are safe operators where, expression identity is preserved during operation. self.extend_with_exprs(&node); Ok(Transformed::no(node)) } LogicalPlan::Projection(_) => { - if self.complex_exprs.is_empty() { - self.extend_with_exprs(&node); - } else { - self.insertion_point_map - .insert(self.depth - 1, self.complex_exprs.clone()); - } + // Stop tracking cumulative usage at the projection since it may invalidate expression identity. + let complex_exprs = std::mem::take(&mut self.complex_exprs); + self.insertion_point_map + .insert(self.depth - 1, complex_exprs); + // Start tracking common expressions from now on including projection. + self.extend_with_exprs(&node); Ok(Transformed::no(node)) } _ => {