Improving optimizer performance by eliminating unnecessary sort and distribution passes, add more SymmetricHashJoin improvements (apache#5754)

* Increase optimizer performance

* Config added.

* Simplifications and comment improvements

* More simplifications

* Revamping tests for unbounded-unbounded cases.

* Review code

* Move SHJ suitability from PipelineFixer to PipelineChecker, further SHJ code simplifications

* Added logging on tests and ensure timeout

* Robust fifo writing in case of slow executions

* Update fifo.rs

* Update fifo.rs

* Update fifo.rs

* Update fifo.rs

* Get rid of locks

* Try exactly one batch size

* Update fifo.rs

* Update fifo.rs

* Update fifo.rs

* Ignore FIFO test

* Update config.rs

* Config update

* Update config.rs

* Update configs.md

* Update config

* Update symmetric_hash_join.rs

---------

Co-authored-by: Mehmet Ozan Kabak <[email protected]>
metesynnada and ozankabak authored Apr 4, 2023
1 parent 56a5adc commit d6c2233
Showing 13 changed files with 916 additions and 719 deletions.
9 changes: 9 additions & 0 deletions datafusion/common/src/config.rs
@@ -280,6 +280,15 @@ config_namespace! {
         /// using the provided `target_partitions` level
         pub repartition_joins: bool, default = true
 
+        /// Should DataFusion allow symmetric hash joins for unbounded data sources even when
+        /// its inputs do not have any ordering or filtering? If the flag is not enabled,
+        /// the SymmetricHashJoin operator will be unable to prune its internal buffers,
+        /// resulting in certain join types - such as Full, Left, LeftAnti, LeftSemi, Right,
+        /// RightAnti, and RightSemi - being produced only at the end of the execution.
+        /// This is not typical in stream processing. Additionally, without proper design for
+        /// long-running execution, all types of joins may encounter out-of-memory errors.
+        pub allow_symmetric_joins_without_pruning: bool, default = true
+
         /// When set to true, file groups will be repartitioned to achieve maximum parallelism.
         /// Currently supported only for Parquet format in which case
         /// multiple row groups from the same file may be read concurrently. If false then each
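As a usage note, the new option lives under the `datafusion.optimizer.` key prefix and can be toggled per session. A minimal sketch using the standard `SessionConfig` builder (the strict-streaming scenario is illustrative, not part of this diff):

    use datafusion::prelude::{SessionConfig, SessionContext};

    fn main() {
        // Hedged sketch: in a strict streaming deployment, disable the flag so
        // that a symmetric hash join which cannot prune its buffers is rejected
        // at plan time instead of growing without bound at runtime.
        let config = SessionConfig::new().set_bool(
            "datafusion.optimizer.allow_symmetric_joins_without_pruning",
            false,
        );
        let _ctx = SessionContext::with_config(config);
    }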
3 changes: 0 additions & 3 deletions datafusion/core/src/execution/context.rs
@@ -1301,9 +1301,6 @@ impl SessionState {
             // repartitioning and local sorting steps to meet distribution and ordering requirements.
             // Therefore, it should run before EnforceDistribution and EnforceSorting.
             Arc::new(JoinSelection::new()),
-            // Enforce sort before PipelineFixer
-            Arc::new(EnforceDistribution::new()),
-            Arc::new(EnforceSorting::new()),
             // If the query is processing infinite inputs, the PipelineFixer rule applies the
             // necessary transformations to make the query runnable (if it is not already runnable).
             // If the query can not be made runnable, the rule emits an error with a diagnostic message.
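With these two rules removed, distribution and ordering requirements are enforced only once, after JoinSelection and PipelineFixer have settled on stream-friendly operators. An abridged, hedged sketch of how the rule list reads after this change (not the verbatim source; neighboring rules are elided):

    // Abridged sketch of the physical optimizer pipeline after this commit.
    let rules: Vec<Arc<dyn PhysicalOptimizerRule + Send + Sync>> = vec![
        // Chooses join implementations (e.g. symmetric hash joins for
        // unbounded inputs), so it must run before any enforcement pass.
        Arc::new(JoinSelection::new()),
        // Rewrites plans over infinite sources into runnable form when possible.
        Arc::new(PipelineFixer::new()),
        // Distribution and ordering are now enforced exactly once, here,
        // rather than both before and after PipelineFixer.
        Arc::new(EnforceDistribution::new()),
        Arc::new(EnforceSorting::new()),
        // ... remaining rules, ending with PipelineChecker ...
    ];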
105 changes: 49 additions & 56 deletions datafusion/core/src/physical_optimizer/pipeline_checker.rs
@@ -22,8 +22,12 @@
 use crate::config::ConfigOptions;
 use crate::error::Result;
 use crate::physical_optimizer::PhysicalOptimizerRule;
+use crate::physical_plan::joins::SymmetricHashJoinExec;
 use crate::physical_plan::{with_new_children_if_necessary, ExecutionPlan};
+use datafusion_common::config::OptimizerOptions;
 use datafusion_common::tree_node::{Transformed, TreeNode, VisitRecursion};
+use datafusion_common::DataFusionError;
+use datafusion_physical_expr::intervals::{check_support, is_datatype_supported};
 use std::sync::Arc;

/// The PipelineChecker rule rejects non-runnable query plans that use
@@ -42,10 +46,11 @@ impl PhysicalOptimizerRule for PipelineChecker {
     fn optimize(
         &self,
         plan: Arc<dyn ExecutionPlan>,
-        _config: &ConfigOptions,
+        config: &ConfigOptions,
     ) -> Result<Arc<dyn ExecutionPlan>> {
         let pipeline = PipelineStatePropagator::new(plan);
-        let state = pipeline.transform_up(&check_finiteness_requirements)?;
+        let state = pipeline
+            .transform_up(&|p| check_finiteness_requirements(p, &config.optimizer))?;
         Ok(state.plan)
     }
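For illustration, the rule can also be driven directly. A sketch assuming an already-built `plan: Arc<dyn ExecutionPlan>` (the variable is hypothetical; the types are from this file):

    // Hedged sketch: run the checker the way the planner would.
    let config = ConfigOptions::new();
    let checked = PipelineChecker::new().optimize(plan, &config)?;
    // With allow_symmetric_joins_without_pruning = false, a symmetric hash
    // join whose filter cannot support interval analysis is rejected here
    // with a plan-time error instead of failing during execution.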

@@ -128,16 +133,39 @@ impl TreeNode for PipelineStatePropagator {
 /// This function propagates finiteness information and rejects any plan with
 /// pipeline-breaking operators acting on infinite inputs.
 pub fn check_finiteness_requirements(
-    input: PipelineStatePropagator,
+    mut input: PipelineStatePropagator,
+    optimizer_options: &OptimizerOptions,
 ) -> Result<Transformed<PipelineStatePropagator>> {
-    let plan = input.plan;
-    let children = input.children_unbounded;
-    plan.unbounded_output(&children).map(|value| {
-        Transformed::Yes(PipelineStatePropagator {
-            plan,
-            unbounded: value,
-            children_unbounded: children,
-        })
-    })
+    if let Some(exec) = input.plan.as_any().downcast_ref::<SymmetricHashJoinExec>() {
+        if !(optimizer_options.allow_symmetric_joins_without_pruning
+            || (exec.check_if_order_information_available()? && is_prunable(exec)))
+        {
+            const MSG: &str = "Join operation cannot operate on a non-prunable stream without enabling \
+                the 'allow_symmetric_joins_without_pruning' configuration flag";
+            return Err(DataFusionError::Plan(MSG.to_owned()));
+        }
+    }
+    input
+        .plan
+        .unbounded_output(&input.children_unbounded)
+        .map(|value| {
+            input.unbounded = value;
+            Transformed::Yes(input)
+        })
 }

+/// This function returns whether a given symmetric hash join is amenable to
+/// data pruning. For this to be possible, it needs to have a filter where
+/// all involved [`PhysicalExpr`]s, [`Operator`]s and data types support
+/// interval calculations.
+fn is_prunable(join: &SymmetricHashJoinExec) -> bool {
+    join.filter().map_or(false, |filter| {
+        check_support(filter.expression())
+            && filter
+                .schema()
+                .fields()
+                .iter()
+                .all(|f| is_datatype_supported(f.data_type()))
+    })
+}
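To make the pruning criterion concrete, here is a hedged sketch of what `check_support` and `is_datatype_supported` accept, using a simple `a > 10` filter over an `Int32` column (module paths as of this commit, written against the APIs imported above; not a doc-tested example):

    use std::sync::Arc;
    use arrow::datatypes::{DataType, Field, Schema};
    use datafusion_common::ScalarValue;
    use datafusion_expr::Operator;
    use datafusion_physical_expr::expressions::{col, BinaryExpr, Literal};
    use datafusion_physical_expr::intervals::{check_support, is_datatype_supported};
    use datafusion_physical_expr::PhysicalExpr;

    fn main() {
        // `a > 10` over an Int32 column: the comparison operator and the data
        // type both support interval arithmetic, so a join filter of this shape
        // would let a symmetric hash join prune its internal buffers.
        let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
        let a = col("a", &schema).unwrap();
        let ten = Arc::new(Literal::new(ScalarValue::Int32(Some(10))));
        let expr: Arc<dyn PhysicalExpr> = Arc::new(BinaryExpr::new(a, Operator::Gt, ten));
        assert!(check_support(&expr));
        assert!(is_datatype_supported(&DataType::Int32));
    }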

@@ -154,27 +182,19 @@ mod sql_tests {
             source_types: (SourceType::Unbounded, SourceType::Bounded),
             expect_fail: false,
         };

         let test2 = BinaryTestCase {
-            source_types: (SourceType::Unbounded, SourceType::Unbounded),
-            expect_fail: true,
-        };
-        let test3 = BinaryTestCase {
             source_types: (SourceType::Bounded, SourceType::Unbounded),
             expect_fail: true,
         };
-        let test4 = BinaryTestCase {
+        let test3 = BinaryTestCase {
             source_types: (SourceType::Bounded, SourceType::Bounded),
             expect_fail: false,
         };
         let case = QueryCase {
             sql: "SELECT t2.c1 FROM left as t1 LEFT JOIN right as t2 ON t1.c1 = t2.c1"
                 .to_string(),
-            cases: vec![
-                Arc::new(test1),
-                Arc::new(test2),
-                Arc::new(test3),
-                Arc::new(test4),
-            ],
+            cases: vec![Arc::new(test1), Arc::new(test2), Arc::new(test3)],
             error_operator: "Join Error".to_string(),
         };

@@ -189,26 +209,17 @@ mod sql_tests {
             expect_fail: true,
         };
         let test2 = BinaryTestCase {
-            source_types: (SourceType::Unbounded, SourceType::Unbounded),
-            expect_fail: true,
-        };
-        let test3 = BinaryTestCase {
             source_types: (SourceType::Bounded, SourceType::Unbounded),
             expect_fail: false,
         };
-        let test4 = BinaryTestCase {
+        let test3 = BinaryTestCase {
             source_types: (SourceType::Bounded, SourceType::Bounded),
             expect_fail: false,
         };
         let case = QueryCase {
             sql: "SELECT t2.c1 FROM left as t1 RIGHT JOIN right as t2 ON t1.c1 = t2.c1"
                 .to_string(),
-            cases: vec![
-                Arc::new(test1),
-                Arc::new(test2),
-                Arc::new(test3),
-                Arc::new(test4),
-            ],
+            cases: vec![Arc::new(test1), Arc::new(test2), Arc::new(test3)],
             error_operator: "Join Error".to_string(),
         };

@@ -223,26 +234,17 @@ mod sql_tests {
             expect_fail: false,
         };
         let test2 = BinaryTestCase {
-            source_types: (SourceType::Unbounded, SourceType::Unbounded),
-            expect_fail: true,
-        };
-        let test3 = BinaryTestCase {
             source_types: (SourceType::Bounded, SourceType::Unbounded),
             expect_fail: false,
         };
-        let test4 = BinaryTestCase {
+        let test3 = BinaryTestCase {
             source_types: (SourceType::Bounded, SourceType::Bounded),
             expect_fail: false,
         };
         let case = QueryCase {
             sql: "SELECT t2.c1 FROM left as t1 JOIN right as t2 ON t1.c1 = t2.c1"
                 .to_string(),
-            cases: vec![
-                Arc::new(test1),
-                Arc::new(test2),
-                Arc::new(test3),
-                Arc::new(test4),
-            ],
+            cases: vec![Arc::new(test1), Arc::new(test2), Arc::new(test3)],
             error_operator: "Join Error".to_string(),
         };

@@ -257,26 +259,17 @@ mod sql_tests {
             expect_fail: true,
         };
         let test2 = BinaryTestCase {
-            source_types: (SourceType::Unbounded, SourceType::Unbounded),
-            expect_fail: true,
-        };
-        let test3 = BinaryTestCase {
             source_types: (SourceType::Bounded, SourceType::Unbounded),
             expect_fail: true,
         };
-        let test4 = BinaryTestCase {
+        let test3 = BinaryTestCase {
             source_types: (SourceType::Bounded, SourceType::Bounded),
             expect_fail: false,
         };
         let case = QueryCase {
             sql: "SELECT t2.c1 FROM left as t1 FULL JOIN right as t2 ON t1.c1 = t2.c1"
                 .to_string(),
-            cases: vec![
-                Arc::new(test1),
-                Arc::new(test2),
-                Arc::new(test3),
-                Arc::new(test4),
-            ],
+            cases: vec![Arc::new(test1), Arc::new(test2), Arc::new(test3)],
             error_operator: "Join Error".to_string(),
         };

@@ -321,7 +314,7 @@ mod sql_tests {
                    FROM test
                    LIMIT 5".to_string(),
             cases: vec![Arc::new(test1), Arc::new(test2)],
-            error_operator: "Sort Error".to_string()
+            error_operator: "Window Error".to_string()
         };

case.run().await?;
@@ -344,7 +337,7 @@ mod sql_tests {
                    SUM(c9) OVER(ORDER BY c9 ASC ROWS BETWEEN 1 PRECEDING AND UNBOUNDED FOLLOWING) as sum1
                    FROM test".to_string(),
             cases: vec![Arc::new(test1), Arc::new(test2)],
-            error_operator: "Sort Error".to_string()
+            error_operator: "Window Error".to_string()
         };
case.run().await?;
Ok(())
[Diffs for the remaining 10 changed files are not shown.]
