Improving optimizer performance by eliminating unnecessary sort and distribution passes, add more SymmetricHashJoin improvements #5754

Merged: 28 commits, Apr 4, 2023

Conversation

@metesynnada metesynnada commented Mar 27, 2023

Which issue does this PR close?

Closes #5715.

Rationale for this change

The current implementation of SymmetricHashJoin requires order information before the PipelineFixer. This limitation results in unnecessary sort and distribution enforcement requirements that impact the optimizer's performance. To overcome this issue, we have revamped SymmetricHashJoin to eliminate the need for order information before PipelineFixer.

What changes are included in this PR?

We have modified SymmetricHashJoin to function without requiring order information. The new implementation does not raise errors without order or filter information, though pruning is not supported without order information. Furthermore, we have set the required_input_ordering API to None, enabling the source to provide order information for piped executions. With this approach, we can genuinely remove the dependency on EnforceSorting and EnforceDistribution from PipelineFixer.

Are these changes tested?

Yes.

I encountered an erratic deadlock in the unbounded_file_with_symmetric_join test due to the current nature of the list_files_for_scan API in ListingTable. This API currently opens the file for metadata verification using the head() API in object_store, which can be problematic while handling FIFO files. However, the upcoming release of object_store will eliminate the need to open a file for metadata retrieval, resolving this issue. For now, I will temporarily bypass this test to focus on other aspects of the PR.

Are there any user-facing changes?

A new configuration option (allow_symmetric_joins_without_pruning) lets the user restrict SymmetricHashJoin to inputs where pruning is possible, i.e. inputs with ordering and filter information. The option defaults to true, so there are no breaking changes in existing use cases.
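
As an illustration of how such an option can gate the join choice, here is a minimal, self-contained Rust sketch. It is not the PipelineFixer code from this PR: `JoinInputs`, `JoinChoice`, and `choose_join` are hypothetical names, and both the "keep the ordinary hash join unless both sides are unbounded" rule and the `NotPipelineable` fallback are assumptions made for the example.

```rust
/// Hypothetical summary of what a planner knows about a join's two inputs.
#[derive(Debug, Clone, Copy)]
struct JoinInputs {
    left_unbounded: bool,
    right_unbounded: bool,
    /// True when ordering and filter information make pruning possible.
    pruning_possible: bool,
}

/// Hypothetical outcome of the join selection step.
#[derive(Debug, PartialEq)]
enum JoinChoice {
    HashJoin,
    SymmetricHashJoin,
    /// Assumed fallback: no pipeline-friendly join is permitted.
    NotPipelineable,
}

/// Toy decision rule: the flag only matters when both sides are unbounded
/// and the planner could not establish that pruning is possible.
fn choose_join(inputs: JoinInputs, allow_symmetric_joins_without_pruning: bool) -> JoinChoice {
    if !(inputs.left_unbounded && inputs.right_unbounded) {
        // Assumption: with at least one bounded side, keep the ordinary hash join.
        return JoinChoice::HashJoin;
    }
    if inputs.pruning_possible || allow_symmetric_joins_without_pruning {
        JoinChoice::SymmetricHashJoin
    } else {
        JoinChoice::NotPipelineable
    }
}
```

With the default value (true), two unbounded inputs without ordering still get a symmetric hash join in this sketch; setting the flag to false limits it to the case where pruning is possible.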

@github-actions github-actions bot added core Core DataFusion crate physical-expr Physical Expressions sqllogictest SQL Logic Tests (.slt) labels Mar 27, 2023
@metesynnada metesynnada changed the title Performance/remove enforcesorting Improving optimizer performance by eliminating unnecessary sort and distribution requirements Mar 27, 2023
@metesynnada metesynnada marked this pull request as draft March 29, 2023 06:55
@metesynnada metesynnada marked this pull request as ready for review April 2, 2023 19:35
@metesynnada
Contributor Author

Hi @alamb, I think it is ready for review. I hope it will contribute to the query planning performance.

@alamb
Contributor

alamb commented Apr 2, 2023

Thanks @metesynnada -- I will put this on my review list for tomorrow

Contributor

@alamb alamb left a comment

Thank you @metesynnada -- I looked at this PR and the structure looks good to me.

I am not familiar with the pipeline fixer code, so I did not review that portion. @mustafasrepo can you please review that code (if you have not already done so)? If it is good with you, I think we'll be good to merge this PR.

I had some small comments, but nothing that is required in my opinion.

@@ -280,6 +280,10 @@ config_namespace! {
/// using the provided `target_partitions` level
pub repartition_joins: bool, default = true

/// Should DataFusion allow symmetric hash joins for unbounded data sources even when
/// its inputs do not have any ordering or filtering
pub allow_symmetric_joins_without_pruning: bool, default = true
Contributor

I don't understand how a symmetric hash join could generate correct results when the inputs don't have any ordering 🤔 Maybe we can add some additional comments about under what circumstances one would enable / disable this option.

Contributor

SHJ will always produce correct results, but it will use twice as much memory (assuming inputs are of the same size) for no gain except pipelining.

Some more explanation about this option: it is not always possible to detect with full accuracy whether pruning may occur. The system may conclude that pruning is not possible when it actually is. Therefore, one would enable this option when one has a priori knowledge that the data does lend itself to pruning.
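
The point above (results stay correct without ordering, while pruning only saves memory) can be sketched with a toy join in plain Rust. This is not the SymmetricHashJoinExec implementation: `Row`, `ToySymmetricHashJoin`, `run`, and the time-window filter `|l.ts - r.ts| <= window` are all assumptions chosen for illustration, and the pruning rule is only valid when both inputs arrive in ascending `ts` order.

```rust
use std::collections::HashMap;

#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
struct Row {
    key: u32,
    ts: i64,
}

#[derive(Clone, Copy)]
enum Side {
    Left,
    Right,
}

/// Toy symmetric hash join: every row is stored in its own side's table
/// and immediately probed against the other side's table.
struct ToySymmetricHashJoin {
    left: HashMap<u32, Vec<Row>>,
    right: HashMap<u32, Vec<Row>>,
    window: i64,
    prune: bool,
}

impl ToySymmetricHashJoin {
    fn new(window: i64, prune: bool) -> Self {
        Self { left: HashMap::new(), right: HashMap::new(), window, prune }
    }

    /// Consumes one row and returns the (left, right) matches it produces.
    fn push(&mut self, side: Side, row: Row) -> Vec<(Row, Row)> {
        let (window, prune) = (self.window, self.prune);
        let (own, other) = match side {
            Side::Left => (&mut self.left, &mut self.right),
            Side::Right => (&mut self.right, &mut self.left),
        };
        own.entry(row.key).or_default().push(row);
        if prune {
            // Valid only for ascending `ts`: later rows on this side have
            // ts >= row.ts, so older rows on the other side cannot match again.
            let cutoff = row.ts - window;
            for rows in other.values_mut() {
                rows.retain(|r| r.ts >= cutoff);
            }
            other.retain(|_, rows| !rows.is_empty());
        }
        let mut out = Vec::new();
        if let Some(candidates) = other.get(&row.key) {
            for &cand in candidates {
                if (cand.ts - row.ts).abs() <= window {
                    out.push(match side {
                        Side::Left => (row, cand),
                        Side::Right => (cand, row),
                    });
                }
            }
        }
        out
    }

    /// Number of rows currently held in both hash tables.
    fn buffered_rows(&self) -> usize {
        self.left.values().chain(self.right.values()).map(Vec::len).sum()
    }
}

/// Feeds the two inputs alternately; returns the sorted matches and the
/// number of rows still buffered at the end.
fn run(left: &[Row], right: &[Row], window: i64, prune: bool) -> (Vec<(Row, Row)>, usize) {
    let mut join = ToySymmetricHashJoin::new(window, prune);
    let mut out = Vec::new();
    for i in 0..left.len().max(right.len()) {
        if let Some(&l) = left.get(i) {
            out.extend(join.push(Side::Left, l));
        }
        if let Some(&r) = right.get(i) {
            out.extend(join.push(Side::Right, r));
        }
    }
    out.sort();
    (out, join.buffered_rows())
}
```

Running `run` with `prune = false` on ordered or unordered inputs yields the same matches, but both hash tables keep every row; with `prune = true` on ordered inputs the matches are unchanged and the buffers stay small.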

Contributor

Thank you -- this explanation and the updated comments help to clarify

@@ -1293,9 +1293,6 @@ impl SessionState {
// repartitioning and local sorting steps to meet distribution and ordering requirements.
// Therefore, it should run before EnforceDistribution and EnforceSorting.
Arc::new(JoinSelection::new()),
// Enforce sort before PipelineFixer
Contributor

👍

@alamb alamb changed the title Improving optimizer performance by eliminating unnecessary sort and distribution requirements Improving optimizer performance by eliminating unnecessary sort and distribution passes, add more SymmetricHashJoin improvements Apr 3, 2023
@mingmwang
Contributor

Can we have different physical optimizer lists for plans with/without unbounded sources?
And I think the bounded/unbounded property should be an attribute or method for Source Operators only.

@metesynnada
Contributor Author

Can we have a different physical optimizers list for the plans with/without unbounded sources?

I think this would cause problems while we are using bounded and unbounded sources together in the same query.

And I think the bound/unbounded source should be an attribute or method for Source Operators only.

Assigning the responsibility to each ExecutionPlan to determine whether its input is unbounded or not, similar to order/distribution information, seems to be the optimal strategy for unifying unbounded and bounded execution. This approach maintains a separation of concerns and empowers us to make atomic decisions with our best effort.

Attempting to solve this problem globally may lead to inflexible design patterns and technical debt. Nonetheless, we currently have the capability to optimize and handle complex queries with a combination of both unbounded and bounded sources, which is a robust solution.
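
The local approach described above can be sketched as a recursive check in which each plan node derives its own boundedness from its children. This is a simplified, hypothetical model rather than DataFusion's ExecutionPlan API: the `Plan` enum, the `unbounded_output` function, and the per-operator rules (for example, that a sort or a hash-join build side cannot consume an unbounded input) are assumptions made for illustration.

```rust
/// Hypothetical toy plan tree; not DataFusion's ExecutionPlan trait.
enum Plan {
    BoundedScan,
    UnboundedScan,
    Filter(Box<Plan>),
    Sort(Box<Plan>),
    SymmetricHashJoin(Box<Plan>, Box<Plan>),
    HashJoin { build: Box<Plan>, probe: Box<Plan> },
}

/// Each node decides locally, from its children's answers, whether its
/// output is unbounded. An `Err` means the node cannot run on that input.
fn unbounded_output(plan: &Plan) -> Result<bool, String> {
    match plan {
        Plan::BoundedScan => Ok(false),
        Plan::UnboundedScan => Ok(true),
        // A filter streams rows through, so it inherits its input's flag.
        Plan::Filter(input) => unbounded_output(input),
        // Assumption: a full sort must see all rows before emitting any.
        Plan::Sort(input) => {
            if unbounded_output(input)? {
                Err("Sort cannot run on an unbounded input".to_string())
            } else {
                Ok(false)
            }
        }
        // A symmetric join accepts any mix of bounded and unbounded inputs.
        Plan::SymmetricHashJoin(left, right) => {
            let l = unbounded_output(left)?;
            let r = unbounded_output(right)?;
            Ok(l || r)
        }
        // Assumption: the build side must be fully consumed before probing.
        Plan::HashJoin { build, probe } => {
            if unbounded_output(build)? {
                Err("HashJoin build side cannot be unbounded".to_string())
            } else {
                unbounded_output(probe)
            }
        }
    }
}
```

Because the decision is made per node, a single query can combine bounded and unbounded sources, and only the specific operator that cannot pipeline reports an error.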

@alamb
Contributor

alamb commented Apr 3, 2023

Can we have different physical optimizers list for the plans with/without unbounded sources?

This would make sense to me if there are different query plans / decisions that would be made in the two modes. It seems like the decision can be made locally at the moment by inspecting the plans / plan nodes, as suggested by @metesynnada

@mingmwang do you have some ideas about when a global mode would be more beneficial?

@ozankabak
Contributor

ozankabak commented Apr 3, 2023

One cannot achieve unified processing (where you can freely use streams and tables together) with the global mode route. With all the streaming improvements coming in, the fact that DataFusion can do this transparently with just standard SQL is an attractive differentiator and innovation IMO.

@mustafasrepo
Contributor

@alamb I have reviewed this PR in our internal repo. LGTM!

Contributor

@alamb alamb left a comment

Thank you @metesynnada and @ozankabak and @mustafasrepo -- I will plan to merge this PR tomorrow unless there are any other comments

@alamb alamb merged commit d6c2233 into apache:main Apr 4, 2023
Labels
core Core DataFusion crate physical-expr Physical Expressions sqllogictest SQL Logic Tests (.slt)
Development

Successfully merging this pull request may close these issues.

Improve optimizer performance by eliminating unnecessary sort and distribution requirements
5 participants