Improve optimizer performance by eliminating unnecessary sort and distribution passes; add more SymmetricHashJoin improvements #5754

Merged 28 commits on Apr 4, 2023

Commits (28):
b25bc46  Increase optimizer performance (metesynnada, Mar 22, 2023)
b5eb32b  Config added. (metesynnada, Mar 22, 2023)
575a6d7  Simplifications and comment improvements (ozankabak, Mar 23, 2023)
05f768c  More simplifications (ozankabak, Mar 23, 2023)
90d82df  Revamping tests for unbounded-unbounded cases. (metesynnada, Mar 23, 2023)
8886457  Review code (metesynnada, Mar 24, 2023)
36d450e  Move SHJ suitability from PipelineFixer to PipelineChecker, further S… (ozankabak, Mar 25, 2023)
5df5e05  Merge branch 'main' into performance/remove-enforcesorting (ozankabak, Mar 27, 2023)
119a870  Added logging on tests and ensure timeout (metesynnada, Mar 28, 2023)
a83a284  Robust fifo writing in case of slow executions (metesynnada, Mar 29, 2023)
8799000  Update fifo.rs (metesynnada, Mar 29, 2023)
02bd036  Update fifo.rs (metesynnada, Mar 29, 2023)
05497ef  Update fifo.rs (metesynnada, Mar 29, 2023)
311a891  Update fifo.rs (metesynnada, Mar 29, 2023)
b051efb  Get rid of locks (metesynnada, Mar 29, 2023)
c0ecbe4  Try exact one batch size (metesynnada, Mar 30, 2023)
e6ab621  Update fifo.rs (metesynnada, Mar 30, 2023)
7cd6b1e  Update fifo.rs (metesynnada, Mar 30, 2023)
9fa0c71  Update fifo.rs (metesynnada, Mar 30, 2023)
0a2d35f  Merge branch 'main' into performance/remove-enforcesorting (metesynnada, Mar 30, 2023)
c9bece5  Ignore FIFO test (metesynnada, Mar 30, 2023)
417e2f1  Merge branch 'main' into performance/remove-enforcesorting (metesynnada, Apr 2, 2023)
331851b  Update config.rs (metesynnada, Apr 3, 2023)
f962643  Config update (metesynnada, Apr 3, 2023)
2c15d4e  Update config.rs (metesynnada, Apr 3, 2023)
363960f  Update configs.md (metesynnada, Apr 3, 2023)
61c3434  Update config (metesynnada, Apr 3, 2023)
7d32c12  Update symmetric_hash_join.rs (metesynnada, Apr 3, 2023)
9 changes: 9 additions & 0 deletions datafusion/common/src/config.rs
@@ -280,6 +280,15 @@ config_namespace! {
/// using the provided `target_partitions` level
pub repartition_joins: bool, default = true

/// Should DataFusion allow symmetric hash joins for unbounded data sources even when
/// its inputs do not have any ordering or filtering? If the flag is not enabled,
/// the SymmetricHashJoin operator will be unable to prune its internal buffers,
/// resulting in certain join types - such as Full, Left, LeftAnti, LeftSemi, Right,
/// RightAnti, and RightSemi - being produced only at the end of the execution.
/// This is not typical in stream processing. Additionally, without proper design for
/// long-running execution, all types of joins may encounter out-of-memory errors.
pub allow_symmetric_joins_without_pruning: bool, default = true
Review comment (Contributor):
I don't understand how a symmetric hash join could generate correct results when the inputs don't have any ordering 🤔 Maybe we can add some additional comments about under what circumstances one would enable/disable this option.

Reply (Contributor):

SHJ will always produce correct results, but it will use twice as much memory (assuming inputs are of the same size) for no gain except pipelining.

Some more explanation about this option: It is not always possible to detect 100% accurately whether pruning may occur or not -- the system may think pruning is not possible where it is actually possible. Therefore, one would enable this option if they have a-priori knowledge that data would indeed lend itself to pruning.

Reply (Contributor):

Thank you -- this explanation and the updated comments help to clarify


/// When set to true, file groups will be repartitioned to achieve maximum parallelism.
/// Currently supported only for Parquet format in which case
/// multiple row groups from the same file may be read concurrently. If false then each
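To make the discussion above concrete, here is a hypothetical usage sketch, not code from this PR: it disables the new flag and shows which streaming joins the planner then accepts. The table names, the streaming registration of `orders` and `shipments`, and their ascending `ts` columns are all assumed for illustration.

    use datafusion::prelude::*;

    #[tokio::main]
    async fn main() -> datafusion::error::Result<()> {
        // Hypothetical: turn off the flag introduced by this PR.
        let config = SessionConfig::new().set_bool(
            "datafusion.optimizer.allow_symmetric_joins_without_pruning",
            false,
        );
        let ctx = SessionContext::with_config(config);
        // Assume `orders` and `shipments` are registered as unbounded
        // (streaming) tables whose `ts` columns have a known ascending order.

        // Prunable: the two-sided range condition on `ts` supports interval
        // analysis, so the join can discard buffered rows that can no longer
        // match, and physical planning succeeds.
        let prunable = ctx
            .sql(
                "SELECT * FROM orders o JOIN shipments s \
                 ON o.id = s.id \
                 AND s.ts > o.ts - INTERVAL '1' MINUTE \
                 AND s.ts < o.ts + INTERVAL '1' MINUTE",
            )
            .await?;
        let _plan = prunable.create_physical_plan().await?; // accepted

        // Not prunable: a bare equi-join gives the operator nothing to prune
        // on; with the flag off, the PipelineChecker rejects it at plan time.
        let rejected = ctx
            .sql("SELECT * FROM orders o JOIN shipments s ON o.id = s.id")
            .await?;
        assert!(rejected.create_physical_plan().await.is_err());
        Ok(())
    }

With the flag at its default of true, the second plan would instead be accepted and run without pruning, trading unbounded memory growth for pipelining, as the review thread above describes.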
3 changes: 0 additions & 3 deletions datafusion/core/src/execution/context.rs
@@ -1293,9 +1293,6 @@ impl SessionState {
// repartitioning and local sorting steps to meet distribution and ordering requirements.
// Therefore, it should run before EnforceDistribution and EnforceSorting.
Arc::new(JoinSelection::new()),
// Enforce sort before PipelineFixer
Review comment (Contributor):
👍

Arc::new(EnforceDistribution::new()),
Arc::new(EnforceSorting::new()),
// If the query is processing infinite inputs, the PipelineFixer rule applies the
// necessary transformations to make the query runnable (if it is not already runnable).
// If the query can not be made runnable, the rule emits an error with a diagnostic message.
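For context, a sketch of the resulting rule order in the default physical optimizer, abridged to the rules visible in this diff (the surrounding rules and exact construction are elided):

    // Abridged from SessionState's physical optimizer rules after this change:
    Arc::new(JoinSelection::new()),       // runs before the enforcement rules
    Arc::new(EnforceDistribution::new()),
    Arc::new(EnforceSorting::new()),
    // ... followed by the PipelineFixer rule, which transforms queries over
    // infinite inputs into runnable plans where possible.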
105 changes: 49 additions & 56 deletions datafusion/core/src/physical_optimizer/pipeline_checker.rs
@@ -22,8 +22,12 @@
use crate::config::ConfigOptions;
use crate::error::Result;
use crate::physical_optimizer::PhysicalOptimizerRule;
use crate::physical_plan::joins::SymmetricHashJoinExec;
use crate::physical_plan::{with_new_children_if_necessary, ExecutionPlan};
use datafusion_common::config::OptimizerOptions;
use datafusion_common::tree_node::{Transformed, TreeNode, VisitRecursion};
use datafusion_common::DataFusionError;
use datafusion_physical_expr::intervals::{check_support, is_datatype_supported};
use std::sync::Arc;

/// The PipelineChecker rule rejects non-runnable query plans that use
@@ -42,10 +46,11 @@ impl PhysicalOptimizerRule for PipelineChecker {
fn optimize(
&self,
plan: Arc<dyn ExecutionPlan>,
_config: &ConfigOptions,
config: &ConfigOptions,
) -> Result<Arc<dyn ExecutionPlan>> {
let pipeline = PipelineStatePropagator::new(plan);
let state = pipeline.transform_up(&check_finiteness_requirements)?;
let state = pipeline
.transform_up(&|p| check_finiteness_requirements(p, &config.optimizer))?;
Ok(state.plan)
}

@@ -128,16 +133,39 @@ impl TreeNode for PipelineStatePropagator {
/// This function propagates finiteness information and rejects any plan with
/// pipeline-breaking operators acting on infinite inputs.
pub fn check_finiteness_requirements(
input: PipelineStatePropagator,
mut input: PipelineStatePropagator,
optimizer_options: &OptimizerOptions,
) -> Result<Transformed<PipelineStatePropagator>> {
let plan = input.plan;
let children = input.children_unbounded;
plan.unbounded_output(&children).map(|value| {
Transformed::Yes(PipelineStatePropagator {
plan,
unbounded: value,
children_unbounded: children,
if let Some(exec) = input.plan.as_any().downcast_ref::<SymmetricHashJoinExec>() {
if !(optimizer_options.allow_symmetric_joins_without_pruning
|| (exec.check_if_order_information_available()? && is_prunable(exec)))
{
const MSG: &str = "Join operation cannot operate on a non-prunable stream without enabling \
the 'allow_symmetric_joins_without_pruning' configuration flag";
return Err(DataFusionError::Plan(MSG.to_owned()));
}
}
input
.plan
.unbounded_output(&input.children_unbounded)
.map(|value| {
input.unbounded = value;
Transformed::Yes(input)
})
}

/// This function returns whether a given symmetric hash join is amenable to
/// data pruning. For this to be possible, it needs to have a filter where
/// all involved [`PhysicalExpr`]s, [`Operator`]s and data types support
/// interval calculations.
fn is_prunable(join: &SymmetricHashJoinExec) -> bool {
join.filter().map_or(false, |filter| {
check_support(filter.expression())
&& filter
.schema()
.fields()
.iter()
.all(|f| is_datatype_supported(f.data_type()))
})
}

@@ -154,27 +182,19 @@ mod sql_tests {
source_types: (SourceType::Unbounded, SourceType::Bounded),
expect_fail: false,
};

let test2 = BinaryTestCase {
source_types: (SourceType::Unbounded, SourceType::Unbounded),
expect_fail: true,
};
let test3 = BinaryTestCase {
source_types: (SourceType::Bounded, SourceType::Unbounded),
expect_fail: true,
};
let test4 = BinaryTestCase {
let test3 = BinaryTestCase {
source_types: (SourceType::Bounded, SourceType::Bounded),
expect_fail: false,
};
let case = QueryCase {
sql: "SELECT t2.c1 FROM left as t1 LEFT JOIN right as t2 ON t1.c1 = t2.c1"
.to_string(),
cases: vec![
Arc::new(test1),
Arc::new(test2),
Arc::new(test3),
Arc::new(test4),
],
cases: vec![Arc::new(test1), Arc::new(test2), Arc::new(test3)],
error_operator: "Join Error".to_string(),
};

@@ -189,26 +209,17 @@
expect_fail: true,
};
let test2 = BinaryTestCase {
source_types: (SourceType::Unbounded, SourceType::Unbounded),
expect_fail: true,
};
let test3 = BinaryTestCase {
source_types: (SourceType::Bounded, SourceType::Unbounded),
expect_fail: false,
};
let test4 = BinaryTestCase {
let test3 = BinaryTestCase {
source_types: (SourceType::Bounded, SourceType::Bounded),
expect_fail: false,
};
let case = QueryCase {
sql: "SELECT t2.c1 FROM left as t1 RIGHT JOIN right as t2 ON t1.c1 = t2.c1"
.to_string(),
cases: vec![
Arc::new(test1),
Arc::new(test2),
Arc::new(test3),
Arc::new(test4),
],
cases: vec![Arc::new(test1), Arc::new(test2), Arc::new(test3)],
error_operator: "Join Error".to_string(),
};

@@ -223,26 +234,17 @@
expect_fail: false,
};
let test2 = BinaryTestCase {
source_types: (SourceType::Unbounded, SourceType::Unbounded),
expect_fail: true,
};
let test3 = BinaryTestCase {
source_types: (SourceType::Bounded, SourceType::Unbounded),
expect_fail: false,
};
let test4 = BinaryTestCase {
let test3 = BinaryTestCase {
source_types: (SourceType::Bounded, SourceType::Bounded),
expect_fail: false,
};
let case = QueryCase {
sql: "SELECT t2.c1 FROM left as t1 JOIN right as t2 ON t1.c1 = t2.c1"
.to_string(),
cases: vec![
Arc::new(test1),
Arc::new(test2),
Arc::new(test3),
Arc::new(test4),
],
cases: vec![Arc::new(test1), Arc::new(test2), Arc::new(test3)],
error_operator: "Join Error".to_string(),
};

@@ -257,26 +259,17 @@
expect_fail: true,
};
let test2 = BinaryTestCase {
source_types: (SourceType::Unbounded, SourceType::Unbounded),
expect_fail: true,
};
let test3 = BinaryTestCase {
source_types: (SourceType::Bounded, SourceType::Unbounded),
expect_fail: true,
};
let test4 = BinaryTestCase {
let test3 = BinaryTestCase {
source_types: (SourceType::Bounded, SourceType::Bounded),
expect_fail: false,
};
let case = QueryCase {
sql: "SELECT t2.c1 FROM left as t1 FULL JOIN right as t2 ON t1.c1 = t2.c1"
.to_string(),
cases: vec![
Arc::new(test1),
Arc::new(test2),
Arc::new(test3),
Arc::new(test4),
],
cases: vec![Arc::new(test1), Arc::new(test2), Arc::new(test3)],
error_operator: "Join Error".to_string(),
};

@@ -321,7 +314,7 @@ mod sql_tests {
FROM test
LIMIT 5".to_string(),
cases: vec![Arc::new(test1), Arc::new(test2)],
error_operator: "Sort Error".to_string()
error_operator: "Window Error".to_string()
};

case.run().await?;
@@ -344,7 +337,7 @@
SUM(c9) OVER(ORDER BY c9 ASC ROWS BETWEEN 1 PRECEDING AND UNBOUNDED FOLLOWING) as sum1
FROM test".to_string(),
cases: vec![Arc::new(test1), Arc::new(test2)],
error_operator: "Sort Error".to_string()
error_operator: "Window Error".to_string()
};
case.run().await?;
Ok(())