From 7ce84822e35e205d5083dc534bbd97848f7b4ca4 Mon Sep 17 00:00:00 2001 From: Mustafa Akur Date: Tue, 24 Oct 2023 11:40:42 +0300 Subject: [PATCH 1/3] Move combine join to equivalences.rs file --- datafusion/physical-expr/src/equivalence.rs | 319 +++++++++++++++++- .../physical-plan/src/joins/cross_join.rs | 4 +- .../physical-plan/src/joins/hash_join.rs | 16 +- .../src/joins/nested_loop_join.rs | 10 +- .../src/joins/sort_merge_join.rs | 8 +- .../src/joins/symmetric_hash_join.rs | 5 +- datafusion/physical-plan/src/joins/utils.rs | 302 ----------------- 7 files changed, 342 insertions(+), 322 deletions(-) diff --git a/datafusion/physical-expr/src/equivalence.rs b/datafusion/physical-expr/src/equivalence.rs index 369c139aa30b..e9cd7ab68dcc 100644 --- a/datafusion/physical-expr/src/equivalence.rs +++ b/datafusion/physical-expr/src/equivalence.rs @@ -26,7 +26,8 @@ use arrow::datatypes::SchemaRef; use arrow_schema::Fields; use datafusion_common::tree_node::{Transformed, TreeNode}; -use datafusion_common::Result; +use datafusion_common::{plan_err, DataFusionError, JoinSide, JoinType, Result}; +use datafusion_expr::UserDefinedLogicalNode; use itertools::izip; use std::collections::{HashMap, HashSet}; use std::hash::Hash; @@ -391,6 +392,10 @@ impl EquivalentClass { std::iter::once(&self.head).chain(self.others.iter()) } + pub fn into_iter(self) -> impl Iterator { + std::iter::once(self.head).chain(self.others.into_iter()) + } + pub fn len(&self) -> usize { self.others.len() + 1 } @@ -885,6 +890,240 @@ fn req_satisfied(given: LexOrderingRef, req: &[PhysicalSortRequirement]) -> bool true } +/// Combine equivalence properties of the given join inputs. +pub fn combine_join_equivalence_properties( + join_type: JoinType, + left_properties: EquivalenceProperties, + right_properties: EquivalenceProperties, + left_columns_len: usize, + on: &[(Column, Column)], + schema: SchemaRef, +) -> EquivalenceProperties { + let mut new_properties = EquivalenceProperties::new(schema); + match join_type { + JoinType::Inner | JoinType::Left | JoinType::Full | JoinType::Right => { + new_properties.extend(left_properties.classes().to_vec()); + let new_right_properties = right_properties + .classes() + .iter() + .map(|prop| { + let new_head = Column::new( + prop.head().name(), + left_columns_len + prop.head().index(), + ); + let new_others = prop + .others() + .iter() + .map(|col| { + Column::new(col.name(), left_columns_len + col.index()) + }) + .collect::>(); + EquivalentClass::new(new_head, new_others) + }) + .collect::>(); + + new_properties.extend(new_right_properties); + } + JoinType::LeftSemi | JoinType::LeftAnti => { + new_properties.extend(left_properties.classes().to_vec()) + } + JoinType::RightSemi | JoinType::RightAnti => { + new_properties.extend(right_properties.classes().to_vec()) + } + } + + if join_type == JoinType::Inner { + on.iter().for_each(|(column1, column2)| { + let new_column2 = + Column::new(column2.name(), left_columns_len + column2.index()); + new_properties.add_equal_conditions((column1, &new_column2)) + }) + } + new_properties +} + +/// Calculate equivalence properties for the given cross join operation. +pub fn cross_join_equivalence_properties( + left_properties: EquivalenceProperties, + right_properties: EquivalenceProperties, + left_columns_len: usize, + schema: SchemaRef, +) -> EquivalenceProperties { + let mut new_properties = EquivalenceProperties::new(schema); + new_properties.extend(left_properties.classes().to_vec()); + let new_right_properties = right_properties + .classes() + .iter() + .map(|prop| { + let new_head = + Column::new(prop.head().name(), left_columns_len + prop.head().index()); + let new_others = prop + .others() + .iter() + .map(|col| Column::new(col.name(), left_columns_len + col.index())) + .collect::>(); + EquivalentClass::new(new_head, new_others) + }) + .collect::>(); + new_properties.extend(new_right_properties); + new_properties +} + +/// Update right table ordering equivalences so that: +/// - They point to valid indices at the output of the join schema, and +/// - They are normalized with respect to equivalence columns. +/// +/// To do so, we increment column indices by the size of the left table when +/// join schema consists of a combination of left and right schema (Inner, +/// Left, Full, Right joins). Then, we normalize the sort expressions of +/// ordering equivalences one by one. We make sure that each expression in the +/// ordering equivalence is either: +/// - The head of the one of the equivalent classes, or +/// - Doesn't have an equivalent column. +/// +/// This way; once we normalize an expression according to equivalence properties, +/// it can thereafter safely be used for ordering equivalence normalization. +fn get_updated_right_ordering_equivalent_class( + join_type: &JoinType, + right_oeq_class: &OrderingEquivalentClass, + left_columns_len: usize, + join_eq_properties: &EquivalenceProperties, +) -> Result { + match join_type { + // In these modes, indices of the right schema should be offset by + // the left table size. + JoinType::Inner | JoinType::Left | JoinType::Full | JoinType::Right => { + let right_oeq_class = right_oeq_class.add_offset(left_columns_len)?; + return Ok( + right_oeq_class.normalize_with_equivalence_properties(join_eq_properties) + ); + } + _ => {} + }; + Ok(right_oeq_class.normalize_with_equivalence_properties(join_eq_properties)) +} + +/// Calculate ordering equivalence properties for the given join operation. +pub fn combine_join_ordering_equivalence_properties( + join_type: &JoinType, + left_oeq_properties: &OrderingEquivalenceProperties, + right_oeq_properties: &OrderingEquivalenceProperties, + schema: SchemaRef, + maintains_input_order: &[bool], + probe_side: Option, + join_eq_properties: EquivalenceProperties, +) -> Result { + let mut new_properties = OrderingEquivalenceProperties::new(schema); + let left_columns_len = left_oeq_properties.schema().fields().len(); + // All joins have 2 children + assert_eq!(maintains_input_order.len(), 2); + let left_maintains = maintains_input_order[0]; + let right_maintains = maintains_input_order[1]; + match (left_maintains, right_maintains) { + (true, true) => return plan_err!("Cannot maintain ordering of both sides"), + (true, false) => { + // In this special case, right side ordering can be prefixed with left side ordering. + if let ( + Some(JoinSide::Left), + JoinType::Inner, + Some(left_oeq_class), + Some(right_oeq_class), + ) = ( + probe_side, + join_type, + left_oeq_properties.oeq_class(), + right_oeq_properties.oeq_class(), + ) { + let updated_right_oeq = get_updated_right_ordering_equivalent_class( + join_type, + right_oeq_class, + left_columns_len, + &join_eq_properties, + )?; + + // Right side ordering equivalence properties should be prepended with + // those of the left side while constructing output ordering equivalence + // properties since stream side is the left side. + // + // If the right table ordering equivalences contain `b ASC`, and the output + // ordering of the left table is `a ASC`, then the ordering equivalence `b ASC` + // for the right table should be converted to `a ASC, b ASC` before it is added + // to the ordering equivalences of the join. + let mut orderings = vec![]; + for left_ordering in left_oeq_class.iter() { + for right_ordering in updated_right_oeq.iter() { + let mut ordering = left_ordering.to_vec(); + ordering.extend(right_ordering.to_vec()); + let ordering_normalized = + join_eq_properties.normalize_sort_exprs(&ordering); + orderings.push(ordering_normalized); + } + } + if !orderings.is_empty() { + let head = orderings.swap_remove(0); + let new_oeq_class = OrderingEquivalentClass::new(head, orderings); + new_properties.extend(Some(new_oeq_class)); + } + } else { + new_properties.extend(left_oeq_properties.oeq_class().cloned()); + } + } + (false, true) => { + let updated_right_oeq = right_oeq_properties + .oeq_class() + .map(|right_oeq_class| { + get_updated_right_ordering_equivalent_class( + join_type, + right_oeq_class, + left_columns_len, + &join_eq_properties, + ) + }) + .transpose()?; + // In this special case, left side ordering can be prefixed with right side ordering. + if let ( + Some(JoinSide::Right), + JoinType::Inner, + Some(left_oeq_class), + Some(right_oeg_class), + ) = ( + probe_side, + join_type, + left_oeq_properties.oeq_class(), + &updated_right_oeq, + ) { + // Left side ordering equivalence properties should be prepended with + // those of the right side while constructing output ordering equivalence + // properties since stream side is the right side. + // + // If the right table ordering equivalences contain `b ASC`, and the output + // ordering of the left table is `a ASC`, then the ordering equivalence `b ASC` + // for the right table should be converted to `a ASC, b ASC` before it is added + // to the ordering equivalences of the join. + let mut orderings = vec![]; + for right_ordering in right_oeg_class.iter() { + for left_ordering in left_oeq_class.iter() { + let mut ordering = right_ordering.to_vec(); + ordering.extend(left_ordering.to_vec()); + let ordering_normalized = + join_eq_properties.normalize_sort_exprs(&ordering); + orderings.push(ordering_normalized); + } + } + if !orderings.is_empty() { + let head = orderings.swap_remove(0); + let new_oeq_class = OrderingEquivalentClass::new(head, orderings); + new_properties.extend(Some(new_oeq_class)); + } + } else { + new_properties.extend(updated_right_oeq); + } + } + (false, false) => {} + } + Ok(new_properties) +} + /// This function searches for the slice `section` inside the slice `given`. /// It returns each range where `section` is compatible with the corresponding /// slice in `given`. @@ -1131,4 +1370,82 @@ mod tests { } Ok(()) } + + #[test] + fn test_get_updated_right_ordering_equivalence_properties() -> Result<()> { + let join_type = JoinType::Inner; + + let options = SortOptions::default(); + let right_oeq_class = OrderingEquivalentClass::new( + vec![ + PhysicalSortExpr { + expr: Arc::new(Column::new("x", 0)), + options, + }, + PhysicalSortExpr { + expr: Arc::new(Column::new("y", 1)), + options, + }, + ], + vec![vec![ + PhysicalSortExpr { + expr: Arc::new(Column::new("z", 2)), + options, + }, + PhysicalSortExpr { + expr: Arc::new(Column::new("w", 3)), + options, + }, + ]], + ); + + let left_columns_len = 4; + + let fields: Fields = ["a", "b", "c", "d", "x", "y", "z", "w"] + .into_iter() + .map(|name| Field::new(name, DataType::Int32, true)) + .collect(); + + let mut join_eq_properties = + EquivalenceProperties::new(Arc::new(Schema::new(fields))); + join_eq_properties + .add_equal_conditions((&Column::new("a", 0), &Column::new("x", 4))); + join_eq_properties + .add_equal_conditions((&Column::new("d", 3), &Column::new("w", 7))); + + let result = get_updated_right_ordering_equivalent_class( + &join_type, + &right_oeq_class, + left_columns_len, + &join_eq_properties, + )?; + + let expected = OrderingEquivalentClass::new( + vec![ + PhysicalSortExpr { + expr: Arc::new(Column::new("a", 0)), + options, + }, + PhysicalSortExpr { + expr: Arc::new(Column::new("y", 5)), + options, + }, + ], + vec![vec![ + PhysicalSortExpr { + expr: Arc::new(Column::new("z", 6)), + options, + }, + PhysicalSortExpr { + expr: Arc::new(Column::new("d", 3)), + options, + }, + ]], + ); + + assert_eq!(result.head(), expected.head()); + assert_eq!(result.others(), expected.others()); + + Ok(()) + } } diff --git a/datafusion/physical-plan/src/joins/cross_join.rs b/datafusion/physical-plan/src/joins/cross_join.rs index 1ffd9ad1c18a..c9c92e8c11ae 100644 --- a/datafusion/physical-plan/src/joins/cross_join.rs +++ b/datafusion/physical-plan/src/joins/cross_join.rs @@ -21,8 +21,7 @@ use std::{any::Any, sync::Arc, task::Poll}; use super::utils::{ - adjust_right_output_partitioning, cross_join_equivalence_properties, - BuildProbeJoinMetrics, OnceAsync, OnceFut, + adjust_right_output_partitioning, BuildProbeJoinMetrics, OnceAsync, OnceFut, }; use crate::metrics::{ExecutionPlanMetricsSet, MetricsSet}; use crate::DisplayAs; @@ -41,6 +40,7 @@ use datafusion_execution::memory_pool::{MemoryConsumer, MemoryReservation}; use datafusion_execution::TaskContext; use async_trait::async_trait; +use datafusion_physical_expr::equivalence::cross_join_equivalence_properties; use futures::{ready, StreamExt}; use futures::{Stream, TryStreamExt}; diff --git a/datafusion/physical-plan/src/joins/hash_join.rs b/datafusion/physical-plan/src/joins/hash_join.rs index 2ffa1f61a23b..36076a69ea60 100644 --- a/datafusion/physical-plan/src/joins/hash_join.rs +++ b/datafusion/physical-plan/src/joins/hash_join.rs @@ -26,8 +26,8 @@ use std::{any::Any, usize, vec}; use crate::joins::utils::{ adjust_indices_by_join_type, apply_join_filter_to_indices, build_batch_from_indices, - calculate_join_output_ordering, combine_join_ordering_equivalence_properties, - get_final_indices_from_bit_map, need_produce_result_in_final, + calculate_join_output_ordering, get_final_indices_from_bit_map, + need_produce_result_in_final, }; use crate::DisplayAs; use crate::{ @@ -39,9 +39,8 @@ use crate::{ joins::hash_join_utils::{JoinHashMap, JoinHashMapType}, joins::utils::{ adjust_right_output_partitioning, build_join_schema, check_join_is_valid, - combine_join_equivalence_properties, estimate_join_statistics, - partitioned_join_output_partitioning, BuildProbeJoinMetrics, ColumnIndex, - JoinFilter, JoinOn, + estimate_join_statistics, partitioned_join_output_partitioning, + BuildProbeJoinMetrics, ColumnIndex, JoinFilter, JoinOn, }, metrics::{ExecutionPlanMetricsSet, MetricsSet}, DisplayFormatType, Distribution, EquivalenceProperties, ExecutionPlan, Partitioning, @@ -72,6 +71,9 @@ use datafusion_physical_expr::OrderingEquivalenceProperties; use ahash::RandomState; use arrow::compute::kernels::cmp::{eq, not_distinct}; +use datafusion_physical_expr::equivalence::{ + combine_join_equivalence_properties, combine_join_ordering_equivalence_properties, +}; use futures::{ready, Stream, StreamExt, TryStreamExt}; type JoinLeftData = (JoinHashMap, RecordBatch, MemoryReservation); @@ -380,8 +382,8 @@ impl ExecutionPlan for HashJoinExec { fn ordering_equivalence_properties(&self) -> OrderingEquivalenceProperties { combine_join_ordering_equivalence_properties( &self.join_type, - &self.left, - &self.right, + &self.left.ordering_equivalence_properties(), + &self.right.ordering_equivalence_properties(), self.schema(), &self.maintains_input_order(), Some(Self::probe_side()), diff --git a/datafusion/physical-plan/src/joins/nested_loop_join.rs b/datafusion/physical-plan/src/joins/nested_loop_join.rs index 25cb374e941f..a113066e39d1 100644 --- a/datafusion/physical-plan/src/joins/nested_loop_join.rs +++ b/datafusion/physical-plan/src/joins/nested_loop_join.rs @@ -27,11 +27,10 @@ use std::task::Poll; use crate::coalesce_batches::concat_batches; use crate::joins::utils::{ append_right_indices, apply_join_filter_to_indices, build_batch_from_indices, - build_join_schema, check_join_is_valid, combine_join_equivalence_properties, - estimate_join_statistics, get_anti_indices, get_anti_u64_indices, - get_final_indices_from_bit_map, get_semi_indices, get_semi_u64_indices, - partitioned_join_output_partitioning, BuildProbeJoinMetrics, ColumnIndex, JoinFilter, - OnceAsync, OnceFut, + build_join_schema, check_join_is_valid, estimate_join_statistics, get_anti_indices, + get_anti_u64_indices, get_final_indices_from_bit_map, get_semi_indices, + get_semi_u64_indices, partitioned_join_output_partitioning, BuildProbeJoinMetrics, + ColumnIndex, JoinFilter, OnceAsync, OnceFut, }; use crate::metrics::{ExecutionPlanMetricsSet, MetricsSet}; use crate::{ @@ -51,6 +50,7 @@ use datafusion_execution::TaskContext; use datafusion_expr::JoinType; use datafusion_physical_expr::{EquivalenceProperties, PhysicalSortExpr}; +use datafusion_physical_expr::equivalence::combine_join_equivalence_properties; use futures::{ready, Stream, StreamExt, TryStreamExt}; /// Data of the inner table side diff --git a/datafusion/physical-plan/src/joins/sort_merge_join.rs b/datafusion/physical-plan/src/joins/sort_merge_join.rs index 98fe751b22ef..1f32401d9044 100644 --- a/datafusion/physical-plan/src/joins/sort_merge_join.rs +++ b/datafusion/physical-plan/src/joins/sort_merge_join.rs @@ -33,7 +33,6 @@ use std::task::{Context, Poll}; use crate::expressions::{Column, PhysicalSortExpr}; use crate::joins::utils::{ build_join_schema, calculate_join_output_ordering, check_join_is_valid, - combine_join_equivalence_properties, combine_join_ordering_equivalence_properties, estimate_join_statistics, partitioned_join_output_partitioning, JoinOn, }; use crate::metrics::{ExecutionPlanMetricsSet, MetricBuilder, MetricsSet}; @@ -55,6 +54,9 @@ use datafusion_execution::memory_pool::{MemoryConsumer, MemoryReservation}; use datafusion_execution::TaskContext; use datafusion_physical_expr::{OrderingEquivalenceProperties, PhysicalSortRequirement}; +use datafusion_physical_expr::equivalence::{ + combine_join_equivalence_properties, combine_join_ordering_equivalence_properties, +}; use futures::{Stream, StreamExt}; /// join execution plan executes partitions in parallel and combines them into a set of @@ -297,8 +299,8 @@ impl ExecutionPlan for SortMergeJoinExec { fn ordering_equivalence_properties(&self) -> OrderingEquivalenceProperties { combine_join_ordering_equivalence_properties( &self.join_type, - &self.left, - &self.right, + &self.left.ordering_equivalence_properties(), + &self.right.ordering_equivalence_properties(), self.schema(), &self.maintains_input_order(), Some(Self::probe_side(&self.join_type)), diff --git a/datafusion/physical-plan/src/joins/symmetric_hash_join.rs b/datafusion/physical-plan/src/joins/symmetric_hash_join.rs index 3450331133bd..00d43aead434 100644 --- a/datafusion/physical-plan/src/joins/symmetric_hash_join.rs +++ b/datafusion/physical-plan/src/joins/symmetric_hash_join.rs @@ -42,8 +42,8 @@ use crate::joins::hash_join_utils::{ }; use crate::joins::utils::{ build_batch_from_indices, build_join_schema, check_join_is_valid, - combine_join_equivalence_properties, partitioned_join_output_partitioning, - prepare_sorted_exprs, ColumnIndex, JoinFilter, JoinOn, + partitioned_join_output_partitioning, prepare_sorted_exprs, ColumnIndex, JoinFilter, + JoinOn, }; use crate::{ expressions::{Column, PhysicalSortExpr}, @@ -66,6 +66,7 @@ use datafusion_execution::TaskContext; use datafusion_physical_expr::intervals::ExprIntervalGraph; use ahash::RandomState; +use datafusion_physical_expr::equivalence::combine_join_equivalence_properties; use futures::stream::{select, BoxStream}; use futures::{Stream, StreamExt}; use hashbrown::HashSet; diff --git a/datafusion/physical-plan/src/joins/utils.rs b/datafusion/physical-plan/src/joins/utils.rs index afde986c0bb6..4d5e87956211 100644 --- a/datafusion/physical-plan/src/joins/utils.rs +++ b/datafusion/physical-plan/src/joins/utils.rs @@ -231,230 +231,6 @@ pub fn calculate_join_output_ordering( Ok((!output_ordering.is_empty()).then_some(output_ordering)) } -/// Combine equivalence properties of the given join inputs. -pub fn combine_join_equivalence_properties( - join_type: JoinType, - left_properties: EquivalenceProperties, - right_properties: EquivalenceProperties, - left_columns_len: usize, - on: &[(Column, Column)], - schema: SchemaRef, -) -> EquivalenceProperties { - let mut new_properties = EquivalenceProperties::new(schema); - match join_type { - JoinType::Inner | JoinType::Left | JoinType::Full | JoinType::Right => { - new_properties.extend(left_properties.classes().to_vec()); - let new_right_properties = right_properties - .classes() - .iter() - .map(|prop| { - let new_head = Column::new( - prop.head().name(), - left_columns_len + prop.head().index(), - ); - let new_others = prop - .others() - .iter() - .map(|col| { - Column::new(col.name(), left_columns_len + col.index()) - }) - .collect::>(); - EquivalentClass::new(new_head, new_others) - }) - .collect::>(); - - new_properties.extend(new_right_properties); - } - JoinType::LeftSemi | JoinType::LeftAnti => { - new_properties.extend(left_properties.classes().to_vec()) - } - JoinType::RightSemi | JoinType::RightAnti => { - new_properties.extend(right_properties.classes().to_vec()) - } - } - - if join_type == JoinType::Inner { - on.iter().for_each(|(column1, column2)| { - let new_column2 = - Column::new(column2.name(), left_columns_len + column2.index()); - new_properties.add_equal_conditions((column1, &new_column2)) - }) - } - new_properties -} - -/// Calculate equivalence properties for the given cross join operation. -pub fn cross_join_equivalence_properties( - left_properties: EquivalenceProperties, - right_properties: EquivalenceProperties, - left_columns_len: usize, - schema: SchemaRef, -) -> EquivalenceProperties { - let mut new_properties = EquivalenceProperties::new(schema); - new_properties.extend(left_properties.classes().to_vec()); - let new_right_properties = right_properties - .classes() - .iter() - .map(|prop| { - let new_head = - Column::new(prop.head().name(), left_columns_len + prop.head().index()); - let new_others = prop - .others() - .iter() - .map(|col| Column::new(col.name(), left_columns_len + col.index())) - .collect::>(); - EquivalentClass::new(new_head, new_others) - }) - .collect::>(); - new_properties.extend(new_right_properties); - new_properties -} - -/// Update right table ordering equivalences so that: -/// - They point to valid indices at the output of the join schema, and -/// - They are normalized with respect to equivalence columns. -/// -/// To do so, we increment column indices by the size of the left table when -/// join schema consists of a combination of left and right schema (Inner, -/// Left, Full, Right joins). Then, we normalize the sort expressions of -/// ordering equivalences one by one. We make sure that each expression in the -/// ordering equivalence is either: -/// - The head of the one of the equivalent classes, or -/// - Doesn't have an equivalent column. -/// -/// This way; once we normalize an expression according to equivalence properties, -/// it can thereafter safely be used for ordering equivalence normalization. -fn get_updated_right_ordering_equivalent_class( - join_type: &JoinType, - right_oeq_class: &OrderingEquivalentClass, - left_columns_len: usize, - join_eq_properties: &EquivalenceProperties, -) -> Result { - match join_type { - // In these modes, indices of the right schema should be offset by - // the left table size. - JoinType::Inner | JoinType::Left | JoinType::Full | JoinType::Right => { - let right_oeq_class = right_oeq_class.add_offset(left_columns_len)?; - return Ok( - right_oeq_class.normalize_with_equivalence_properties(join_eq_properties) - ); - } - _ => {} - }; - Ok(right_oeq_class.normalize_with_equivalence_properties(join_eq_properties)) -} - -/// Calculate ordering equivalence properties for the given join operation. -pub fn combine_join_ordering_equivalence_properties( - join_type: &JoinType, - left: &Arc, - right: &Arc, - schema: SchemaRef, - maintains_input_order: &[bool], - probe_side: Option, - join_eq_properties: EquivalenceProperties, -) -> Result { - let mut new_properties = OrderingEquivalenceProperties::new(schema); - let left_columns_len = left.schema().fields.len(); - let left_oeq_properties = left.ordering_equivalence_properties(); - let right_oeq_properties = right.ordering_equivalence_properties(); - // All joins have 2 children - assert_eq!(maintains_input_order.len(), 2); - let left_maintains = maintains_input_order[0]; - let right_maintains = maintains_input_order[1]; - match (left_maintains, right_maintains) { - (true, true) => return plan_err!("Cannot maintain ordering of both sides"), - (true, false) => { - new_properties.extend(left_oeq_properties.oeq_class().cloned()); - // In this special case, right side ordering can be prefixed with left side ordering. - if let ( - Some(JoinSide::Left), - // right side have an ordering - Some(_), - JoinType::Inner, - Some(oeq_class), - ) = ( - probe_side, - right.output_ordering(), - join_type, - right_oeq_properties.oeq_class(), - ) { - let left_output_ordering = left.output_ordering().unwrap_or(&[]); - - let updated_right_oeq = get_updated_right_ordering_equivalent_class( - join_type, - oeq_class, - left_columns_len, - &join_eq_properties, - )?; - - // Right side ordering equivalence properties should be prepended with - // those of the left side while constructing output ordering equivalence - // properties since stream side is the left side. - // - // If the right table ordering equivalences contain `b ASC`, and the output - // ordering of the left table is `a ASC`, then the ordering equivalence `b ASC` - // for the right table should be converted to `a ASC, b ASC` before it is added - // to the ordering equivalences of the join. - let updated_right_oeq_class = updated_right_oeq - .prefix_ordering_equivalent_class_with_existing_ordering( - left_output_ordering, - &join_eq_properties, - ); - new_properties.extend(Some(updated_right_oeq_class)); - } - } - (false, true) => { - let updated_right_oeq = right_oeq_properties - .oeq_class() - .map(|right_oeq_class| { - get_updated_right_ordering_equivalent_class( - join_type, - right_oeq_class, - left_columns_len, - &join_eq_properties, - ) - }) - .transpose()?; - new_properties.extend(updated_right_oeq); - // In this special case, left side ordering can be prefixed with right side ordering. - if let ( - Some(JoinSide::Right), - // left side have an ordering - Some(_), - JoinType::Inner, - Some(left_oeq_class), - ) = ( - probe_side, - left.output_ordering(), - join_type, - left_oeq_properties.oeq_class(), - ) { - let right_output_ordering = right.output_ordering().unwrap_or(&[]); - let right_output_ordering = - add_offset_to_lex_ordering(right_output_ordering, left_columns_len)?; - - // Left side ordering equivalence properties should be prepended with - // those of the right side while constructing output ordering equivalence - // properties since stream side is the right side. - // - // If the right table ordering equivalences contain `b ASC`, and the output - // ordering of the left table is `a ASC`, then the ordering equivalence `b ASC` - // for the right table should be converted to `a ASC, b ASC` before it is added - // to the ordering equivalences of the join. - let updated_left_oeq_class = left_oeq_class - .prefix_ordering_equivalent_class_with_existing_ordering( - &right_output_ordering, - &join_eq_properties, - ); - new_properties.extend(Some(updated_left_oeq_class)); - } - } - (false, false) => {} - } - Ok(new_properties) -} - /// Information about the index and placement (left or right) of the columns #[derive(Debug, Clone)] pub struct ColumnIndex { @@ -1930,84 +1706,6 @@ mod tests { Ok(()) } - #[test] - fn test_get_updated_right_ordering_equivalence_properties() -> Result<()> { - let join_type = JoinType::Inner; - - let options = SortOptions::default(); - let right_oeq_class = OrderingEquivalentClass::new( - vec![ - PhysicalSortExpr { - expr: Arc::new(Column::new("x", 0)), - options, - }, - PhysicalSortExpr { - expr: Arc::new(Column::new("y", 1)), - options, - }, - ], - vec![vec![ - PhysicalSortExpr { - expr: Arc::new(Column::new("z", 2)), - options, - }, - PhysicalSortExpr { - expr: Arc::new(Column::new("w", 3)), - options, - }, - ]], - ); - - let left_columns_len = 4; - - let fields: Fields = ["a", "b", "c", "d", "x", "y", "z", "w"] - .into_iter() - .map(|name| Field::new(name, DataType::Int32, true)) - .collect(); - - let mut join_eq_properties = - EquivalenceProperties::new(Arc::new(Schema::new(fields))); - join_eq_properties - .add_equal_conditions((&Column::new("a", 0), &Column::new("x", 4))); - join_eq_properties - .add_equal_conditions((&Column::new("d", 3), &Column::new("w", 7))); - - let result = get_updated_right_ordering_equivalent_class( - &join_type, - &right_oeq_class, - left_columns_len, - &join_eq_properties, - )?; - - let expected = OrderingEquivalentClass::new( - vec![ - PhysicalSortExpr { - expr: Arc::new(Column::new("a", 0)), - options, - }, - PhysicalSortExpr { - expr: Arc::new(Column::new("y", 5)), - options, - }, - ], - vec![vec![ - PhysicalSortExpr { - expr: Arc::new(Column::new("z", 6)), - options, - }, - PhysicalSortExpr { - expr: Arc::new(Column::new("d", 3)), - options, - }, - ]], - ); - - assert_eq!(result.head(), expected.head()); - assert_eq!(result.others(), expected.others()); - - Ok(()) - } - #[test] fn test_calculate_join_output_ordering() -> Result<()> { let options = SortOptions::default(); From 635fb53d124680b224b24f24c60dfe7b455f8541 Mon Sep 17 00:00:00 2001 From: Mustafa Akur Date: Tue, 24 Oct 2023 11:45:00 +0300 Subject: [PATCH 2/3] Resolve linter errors --- datafusion/physical-expr/src/equivalence.rs | 5 ----- datafusion/physical-plan/src/joins/utils.rs | 8 ++------ 2 files changed, 2 insertions(+), 11 deletions(-) diff --git a/datafusion/physical-expr/src/equivalence.rs b/datafusion/physical-expr/src/equivalence.rs index e9cd7ab68dcc..39cfd16cc109 100644 --- a/datafusion/physical-expr/src/equivalence.rs +++ b/datafusion/physical-expr/src/equivalence.rs @@ -27,7 +27,6 @@ use arrow_schema::Fields; use datafusion_common::tree_node::{Transformed, TreeNode}; use datafusion_common::{plan_err, DataFusionError, JoinSide, JoinType, Result}; -use datafusion_expr::UserDefinedLogicalNode; use itertools::izip; use std::collections::{HashMap, HashSet}; use std::hash::Hash; @@ -392,10 +391,6 @@ impl EquivalentClass { std::iter::once(&self.head).chain(self.others.iter()) } - pub fn into_iter(self) -> impl Iterator { - std::iter::once(self.head).chain(self.others.into_iter()) - } - pub fn len(&self) -> usize { self.others.len() + 1 } diff --git a/datafusion/physical-plan/src/joins/utils.rs b/datafusion/physical-plan/src/joins/utils.rs index 4d5e87956211..eafe26d92658 100644 --- a/datafusion/physical-plan/src/joins/utils.rs +++ b/datafusion/physical-plan/src/joins/utils.rs @@ -25,10 +25,7 @@ use std::usize; use crate::joins::hash_join_utils::{build_filter_input_order, SortedFilterExpr}; use crate::metrics::{self, ExecutionPlanMetricsSet, MetricBuilder}; -use crate::{ - ColumnStatistics, EquivalenceProperties, ExecutionPlan, Partitioning, SchemaRef, - Statistics, -}; +use crate::{ColumnStatistics, ExecutionPlan, Partitioning, Statistics}; use arrow::array::{ downcast_array, new_null_array, Array, BooleanBufferBuilder, UInt32Array, @@ -48,8 +45,7 @@ use datafusion_physical_expr::expressions::Column; use datafusion_physical_expr::intervals::{ExprIntervalGraph, Interval, IntervalBound}; use datafusion_physical_expr::utils::merge_vectors; use datafusion_physical_expr::{ - add_offset_to_lex_ordering, EquivalentClass, LexOrdering, LexOrderingRef, - OrderingEquivalenceProperties, OrderingEquivalentClass, PhysicalExpr, + add_offset_to_lex_ordering, LexOrdering, LexOrderingRef, PhysicalExpr, PhysicalSortExpr, }; From 7c3502961d68479d533ec9db2f0f3f9b2b24178c Mon Sep 17 00:00:00 2001 From: Mustafa Akur Date: Tue, 24 Oct 2023 13:06:13 +0300 Subject: [PATCH 3/3] Simplifications, do not return Error in add_offset --- .../src/physical_optimizer/sort_pushdown.rs | 2 +- datafusion/physical-expr/src/equivalence.rs | 58 ++++++++++--------- datafusion/physical-expr/src/lib.rs | 8 +-- .../physical-plan/src/joins/hash_join.rs | 3 +- .../src/joins/sort_merge_join.rs | 3 +- datafusion/physical-plan/src/joins/utils.rs | 30 ++++------ 6 files changed, 47 insertions(+), 57 deletions(-) diff --git a/datafusion/core/src/physical_optimizer/sort_pushdown.rs b/datafusion/core/src/physical_optimizer/sort_pushdown.rs index 9b81ad3efb50..04782ee6951f 100644 --- a/datafusion/core/src/physical_optimizer/sort_pushdown.rs +++ b/datafusion/core/src/physical_optimizer/sort_pushdown.rs @@ -347,7 +347,7 @@ fn try_pushdown_requirements_to_join( smj.left().schema().fields.len(), &smj.maintains_input_order(), Some(SortMergeJoinExec::probe_side(&smj.join_type())), - )?; + ); Ok(ordering_satisfy_requirement( new_output_ordering.as_deref(), parent_required, diff --git a/datafusion/physical-expr/src/equivalence.rs b/datafusion/physical-expr/src/equivalence.rs index 39cfd16cc109..4fce6854138d 100644 --- a/datafusion/physical-expr/src/equivalence.rs +++ b/datafusion/physical-expr/src/equivalence.rs @@ -26,7 +26,7 @@ use arrow::datatypes::SchemaRef; use arrow_schema::Fields; use datafusion_common::tree_node::{Transformed, TreeNode}; -use datafusion_common::{plan_err, DataFusionError, JoinSide, JoinType, Result}; +use datafusion_common::{JoinSide, JoinType}; use itertools::izip; use std::collections::{HashMap, HashSet}; use std::hash::Hash; @@ -463,14 +463,14 @@ impl OrderingEquivalentClass { } /// Adds `offset` value to the index of each expression inside `self.head` and `self.others`. - pub fn add_offset(&self, offset: usize) -> Result { - let head = add_offset_to_lex_ordering(self.head(), offset)?; + pub fn add_offset(&self, offset: usize) -> OrderingEquivalentClass { + let head = add_offset_to_lex_ordering(self.head(), offset); let others = self .others() .iter() .map(|ordering| add_offset_to_lex_ordering(ordering, offset)) - .collect::>>()?; - Ok(OrderingEquivalentClass::new(head, others)) + .collect::>(); + OrderingEquivalentClass::new(head, others) } /// This function normalizes `OrderingEquivalenceProperties` according to `eq_properties`. @@ -983,19 +983,18 @@ fn get_updated_right_ordering_equivalent_class( right_oeq_class: &OrderingEquivalentClass, left_columns_len: usize, join_eq_properties: &EquivalenceProperties, -) -> Result { +) -> OrderingEquivalentClass { match join_type { // In these modes, indices of the right schema should be offset by // the left table size. JoinType::Inner | JoinType::Left | JoinType::Full | JoinType::Right => { - let right_oeq_class = right_oeq_class.add_offset(left_columns_len)?; - return Ok( - right_oeq_class.normalize_with_equivalence_properties(join_eq_properties) - ); + let right_oeq_class = right_oeq_class.add_offset(left_columns_len); + return right_oeq_class + .normalize_with_equivalence_properties(join_eq_properties); } _ => {} }; - Ok(right_oeq_class.normalize_with_equivalence_properties(join_eq_properties)) + right_oeq_class.normalize_with_equivalence_properties(join_eq_properties) } /// Calculate ordering equivalence properties for the given join operation. @@ -1007,7 +1006,7 @@ pub fn combine_join_ordering_equivalence_properties( maintains_input_order: &[bool], probe_side: Option, join_eq_properties: EquivalenceProperties, -) -> Result { +) -> OrderingEquivalenceProperties { let mut new_properties = OrderingEquivalenceProperties::new(schema); let left_columns_len = left_oeq_properties.schema().fields().len(); // All joins have 2 children @@ -1015,7 +1014,9 @@ pub fn combine_join_ordering_equivalence_properties( let left_maintains = maintains_input_order[0]; let right_maintains = maintains_input_order[1]; match (left_maintains, right_maintains) { - (true, true) => return plan_err!("Cannot maintain ordering of both sides"), + (true, true) => { + unreachable!("Cannot maintain ordering of both sides"); + } (true, false) => { // In this special case, right side ordering can be prefixed with left side ordering. if let ( @@ -1034,7 +1035,7 @@ pub fn combine_join_ordering_equivalence_properties( right_oeq_class, left_columns_len, &join_eq_properties, - )?; + ); // Right side ordering equivalence properties should be prepended with // those of the left side while constructing output ordering equivalence @@ -1064,17 +1065,15 @@ pub fn combine_join_ordering_equivalence_properties( } } (false, true) => { - let updated_right_oeq = right_oeq_properties - .oeq_class() - .map(|right_oeq_class| { + let updated_right_oeq = + right_oeq_properties.oeq_class().map(|right_oeq_class| { get_updated_right_ordering_equivalent_class( join_type, right_oeq_class, left_columns_len, &join_eq_properties, ) - }) - .transpose()?; + }); // In this special case, left side ordering can be prefixed with right side ordering. if let ( Some(JoinSide::Right), @@ -1116,7 +1115,7 @@ pub fn combine_join_ordering_equivalence_properties( } (false, false) => {} } - Ok(new_properties) + new_properties } /// This function searches for the slice `section` inside the slice `given`. @@ -1169,10 +1168,10 @@ fn prune_sort_reqs_with_constants( /// Adds the `offset` value to `Column` indices inside `expr`. This function is /// generally used during the update of the right table schema in join operations. -pub(crate) fn add_offset_to_expr( +pub fn add_offset_to_expr( expr: Arc, offset: usize, -) -> Result> { +) -> Arc { expr.transform_down(&|e| match e.as_any().downcast_ref::() { Some(col) => Ok(Transformed::Yes(Arc::new(Column::new( col.name(), @@ -1180,17 +1179,20 @@ pub(crate) fn add_offset_to_expr( )))), None => Ok(Transformed::No(e)), }) + .unwrap() + // Note that we can safely unwrap here since our transform always returns + // an `Ok` value. } /// Adds the `offset` value to `Column` indices inside `sort_expr.expr`. pub(crate) fn add_offset_to_sort_expr( sort_expr: &PhysicalSortExpr, offset: usize, -) -> Result { - Ok(PhysicalSortExpr { - expr: add_offset_to_expr(sort_expr.expr.clone(), offset)?, +) -> PhysicalSortExpr { + PhysicalSortExpr { + expr: add_offset_to_expr(sort_expr.expr.clone(), offset), options: sort_expr.options, - }) + } } /// Adds the `offset` value to `Column` indices for each `sort_expr.expr` @@ -1198,7 +1200,7 @@ pub(crate) fn add_offset_to_sort_expr( pub fn add_offset_to_lex_ordering( sort_exprs: LexOrderingRef, offset: usize, -) -> Result { +) -> LexOrdering { sort_exprs .iter() .map(|sort_expr| add_offset_to_sort_expr(sort_expr, offset)) @@ -1413,7 +1415,7 @@ mod tests { &right_oeq_class, left_columns_len, &join_eq_properties, - )?; + ); let expected = OrderingEquivalentClass::new( vec![ diff --git a/datafusion/physical-expr/src/lib.rs b/datafusion/physical-expr/src/lib.rs index e670380e59d2..977542bd8e66 100644 --- a/datafusion/physical-expr/src/lib.rs +++ b/datafusion/physical-expr/src/lib.rs @@ -54,10 +54,10 @@ pub use aggregate::groups_accumulator::{ pub use aggregate::AggregateExpr; pub use analysis::{analyze, AnalysisContext, ExprBoundaries}; pub use equivalence::{ - add_offset_to_lex_ordering, ordering_equivalence_properties_helper, - project_equivalence_properties, project_ordering_equivalence_properties, - EquivalenceProperties, EquivalentClass, OrderingEquivalenceProperties, - OrderingEquivalentClass, + add_offset_to_expr, add_offset_to_lex_ordering, + ordering_equivalence_properties_helper, project_equivalence_properties, + project_ordering_equivalence_properties, EquivalenceProperties, EquivalentClass, + OrderingEquivalenceProperties, OrderingEquivalentClass, }; pub use partitioning::{Distribution, Partitioning}; diff --git a/datafusion/physical-plan/src/joins/hash_join.rs b/datafusion/physical-plan/src/joins/hash_join.rs index 36076a69ea60..9aa776fe054c 100644 --- a/datafusion/physical-plan/src/joins/hash_join.rs +++ b/datafusion/physical-plan/src/joins/hash_join.rs @@ -148,7 +148,7 @@ impl HashJoinExec { left_schema.fields.len(), &Self::maintains_input_order(*join_type), Some(Self::probe_side()), - )?; + ); Ok(HashJoinExec { left, @@ -389,7 +389,6 @@ impl ExecutionPlan for HashJoinExec { Some(Self::probe_side()), self.equivalence_properties(), ) - .unwrap() } fn children(&self) -> Vec> { diff --git a/datafusion/physical-plan/src/joins/sort_merge_join.rs b/datafusion/physical-plan/src/joins/sort_merge_join.rs index 1f32401d9044..759149a64d9f 100644 --- a/datafusion/physical-plan/src/joins/sort_merge_join.rs +++ b/datafusion/physical-plan/src/joins/sort_merge_join.rs @@ -142,7 +142,7 @@ impl SortMergeJoinExec { left_schema.fields.len(), &Self::maintains_input_order(join_type), Some(Self::probe_side(&join_type)), - )?; + ); let schema = Arc::new(build_join_schema(&left_schema, &right_schema, &join_type).0); @@ -306,7 +306,6 @@ impl ExecutionPlan for SortMergeJoinExec { Some(Self::probe_side(&self.join_type)), self.equivalence_properties(), ) - .unwrap() } fn children(&self) -> Vec> { diff --git a/datafusion/physical-plan/src/joins/utils.rs b/datafusion/physical-plan/src/joins/utils.rs index eafe26d92658..cf150ddf575f 100644 --- a/datafusion/physical-plan/src/joins/utils.rs +++ b/datafusion/physical-plan/src/joins/utils.rs @@ -36,17 +36,16 @@ use arrow::datatypes::{Field, Schema, SchemaBuilder}; use arrow::record_batch::{RecordBatch, RecordBatchOptions}; use datafusion_common::cast::as_boolean_array; use datafusion_common::stats::Precision; -use datafusion_common::tree_node::{Transformed, TreeNode}; use datafusion_common::{ - exec_err, plan_datafusion_err, plan_err, DataFusionError, JoinSide, JoinType, Result, + plan_datafusion_err, plan_err, DataFusionError, JoinSide, JoinType, Result, SharedResult, }; use datafusion_physical_expr::expressions::Column; use datafusion_physical_expr::intervals::{ExprIntervalGraph, Interval, IntervalBound}; use datafusion_physical_expr::utils::merge_vectors; use datafusion_physical_expr::{ - add_offset_to_lex_ordering, LexOrdering, LexOrderingRef, PhysicalExpr, - PhysicalSortExpr, + add_offset_to_expr, add_offset_to_lex_ordering, LexOrdering, LexOrderingRef, + PhysicalExpr, PhysicalSortExpr, }; use futures::future::{BoxFuture, Shared}; @@ -133,16 +132,7 @@ pub fn adjust_right_output_partitioning( Partitioning::Hash(exprs, size) => { let new_exprs = exprs .into_iter() - .map(|expr| { - expr.transform_down(&|e| match e.as_any().downcast_ref::() { - Some(col) => Ok(Transformed::Yes(Arc::new(Column::new( - col.name(), - left_columns_len + col.index(), - )))), - None => Ok(Transformed::No(e)), - }) - .unwrap() - }) + .map(|expr| add_offset_to_expr(expr, left_columns_len)) .collect::>(); Partitioning::Hash(new_exprs, size) } @@ -178,7 +168,7 @@ pub fn calculate_join_output_ordering( left_columns_len: usize, maintains_input_order: &[bool], probe_side: Option, -) -> Result> { +) -> Option { // All joins have 2 children: assert_eq!(maintains_input_order.len(), 2); let left_maintains = maintains_input_order[0]; @@ -187,13 +177,13 @@ pub fn calculate_join_output_ordering( // In the case below, right ordering should be offseted with the left // side length, since we append the right table to the left table. JoinType::Inner | JoinType::Left | JoinType::Right | JoinType::Full => { - add_offset_to_lex_ordering(right_ordering, left_columns_len)? + add_offset_to_lex_ordering(right_ordering, left_columns_len) } _ => right_ordering.to_vec(), }; let output_ordering = match (left_maintains, right_maintains) { (true, true) => { - return exec_err!("Cannot maintain ordering of both sides"); + unreachable!("Cannot maintain ordering of both sides"); } (true, false) => { // Special case, we can prefix ordering of right side with the ordering of left side. @@ -222,9 +212,9 @@ pub fn calculate_join_output_ordering( } } // Doesn't maintain ordering, output ordering is None. - (false, false) => return Ok(None), + (false, false) => return None, }; - Ok((!output_ordering.is_empty()).then_some(output_ordering)) + (!output_ordering.is_empty()).then_some(output_ordering) } /// Information about the index and placement (left or right) of the columns @@ -1794,7 +1784,7 @@ mod tests { left_columns_len, maintains_input_order, probe_side - )?, + ), expected[i] ); }