Skip to content

Commit

Permalink
Update join_selection.rs (apache#13893)
Browse files Browse the repository at this point in the history
  • Loading branch information
berkaysynnada authored Dec 24, 2024
1 parent 94f08ff commit 901a094
Showing 1 changed file with 13 additions and 6 deletions.
19 changes: 13 additions & 6 deletions datafusion/core/src/physical_optimizer/join_selection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ impl JoinSelection {
// TODO: We need performance tests for swapping Right Semi/Right Join to Left Semi/Left Join when the right side is smaller, but not much smaller.
// TODO: In PrestoSQL, the optimizer flips join sides only if one side is smaller than the other by more than SIZE_DIFFERENCE_THRESHOLD times (8 by default).
/// Checks statistics for join swap.
fn should_swap_join_order(
pub(crate) fn should_swap_join_order(
left: &dyn ExecutionPlan,
right: &dyn ExecutionPlan,
) -> Result<bool> {
Expand Down Expand Up @@ -108,7 +108,7 @@ fn supports_collect_by_thresholds(
}

/// Predicate that checks whether the given join type supports input swapping.
fn supports_swap(join_type: JoinType) -> bool {
pub(crate) fn supports_swap(join_type: JoinType) -> bool {
matches!(
join_type,
JoinType::Inner
Expand Down Expand Up @@ -176,7 +176,7 @@ fn swap_join_projection(
/// This function swaps the inputs of the given join operator.
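/// This function was made public so other downstream projects can use it
/// to construct `HashJoinExec` with the right side as the build side.
/// NOTE(review): the visibility is now `pub(crate)`, so downstream crates can no longer call it — confirm this is intended.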
pub fn swap_hash_join(
pub(crate) fn swap_hash_join(
hash_join: &HashJoinExec,
partition_mode: PartitionMode,
) -> Result<Arc<dyn ExecutionPlan>> {
Expand Down Expand Up @@ -222,7 +222,7 @@ pub fn swap_hash_join(
}

/// Swaps inputs of `NestedLoopJoinExec` and wraps it into `ProjectionExec` if required
fn swap_nl_join(join: &NestedLoopJoinExec) -> Result<Arc<dyn ExecutionPlan>> {
pub(crate) fn swap_nl_join(join: &NestedLoopJoinExec) -> Result<Arc<dyn ExecutionPlan>> {
let new_filter = swap_join_filter(join.filter());
let new_join_type = &swap_join_type(*join.join_type());

Expand Down Expand Up @@ -359,7 +359,7 @@ impl PhysicalOptimizerRule for JoinSelection {
/// `CollectLeft` mode is applicable. Otherwise, it will try to swap the join sides.
/// When the `ignore_threshold` is false, this function will also check left
/// and right sizes in bytes or rows.
fn try_collect_left(
pub(crate) fn try_collect_left(
hash_join: &HashJoinExec,
ignore_threshold: bool,
threshold_byte_size: usize,
Expand Down Expand Up @@ -421,7 +421,14 @@ fn try_collect_left(
}
}

fn partitioned_hash_join(hash_join: &HashJoinExec) -> Result<Arc<dyn ExecutionPlan>> {
/// Creates a partitioned hash join execution plan, swapping inputs if beneficial.
///
/// Checks if the join order should be swapped based on the join type and input statistics.
/// If swapping is optimal and supported, creates a swapped partitioned hash join; otherwise,
/// creates a standard partitioned hash join.
pub(crate) fn partitioned_hash_join(
hash_join: &HashJoinExec,
) -> Result<Arc<dyn ExecutionPlan>> {
let left = hash_join.left();
let right = hash_join.right();
if supports_swap(*hash_join.join_type()) && should_swap_join_order(&**left, &**right)?
Expand Down

0 comments on commit 901a094

Please sign in to comment.