-
Notifications
You must be signed in to change notification settings - Fork 1.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Improving optimizer performance by eliminating unnecessary sort and distribution passes, add more SymmetricHashJoin improvements #5754
Improving optimizer performance by eliminating unnecessary sort and distribution passes, add more SymmetricHashJoin improvements #5754
Conversation
…HJ code simplifications
Hi @alamb, I think it is ready for review. I hope it will contribute to the query planning performance. |
Thanks @metesynnada -- I will put this on my review list for tomorrow |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you @metesynnada -- I looked at this PR and the struture looks good to me.
I am not familiar with the pipeline fixer code so I did not review that portion. @mustafasrepo can you please review that code (if you have not already done so). If it is good with you I think we'll be good to merge this PR
I had some small comments but nothing that is required in my opinion
@@ -280,6 +280,10 @@ config_namespace! { | |||
/// using the provided `target_partitions` level | |||
pub repartition_joins: bool, default = true | |||
|
|||
/// Should DataFusion allow symmetric hash joins for unbounded data sources even when | |||
/// its inputs do not have any ordering or filtering | |||
pub allow_symmetric_joins_without_pruning: bool, default = true |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't understand how a symmetric hash join could generate correct results when the inputs don't have any ordering 🤔 Maybe we can add some additional comments about under what circumstances one would enable
/ disable this option.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
SHJ will always produce correct results, but it will use twice as much memory (assuming inputs are of the same size) for no gain except pipelining.
Some more explanation about this option: It is not always possible to detect 100% accurately whether pruning may occur or not -- the system may think pruning is not possible where it is actually possible. Therefore, one would enable this option if they have a-priori knowledge that data would indeed lend itself to pruning.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you -- this explanation and the updated comments help to clarify
@@ -1293,9 +1293,6 @@ impl SessionState { | |||
// repartitioning and local sorting steps to meet distribution and ordering requirements. | |||
// Therefore, it should run before EnforceDistribution and EnforceSorting. | |||
Arc::new(JoinSelection::new()), | |||
// Enforce sort before PipelineFixer |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍
Can we have different physical optimizers list for the plans with/without unbounded sources? |
I think this would cause problems while we are using bounded and unbounded sources together in the same query.
Assigning the responsibility to each ExecutionPlan to determine whether its input is unbounded or not, similar to order/distribution information, seems to be the optimal strategy for unifying unbounded and bounded execution. This approach maintains a separation of concerns and empowers us to make atomic decisions with our best effort. Attempting to solve this problem globally may lead to inflexible design patterns and technical debt. Nonetheless, we currently have the capability to optimize and handle complex queries with a combination of both unbounded and bounded sources, which is a robust solution. |
This would make sense to me if there are different query plans / decisions that would be made in the two modes. It seems like the decision can be made locally at the moment by inspecting the plans / plan nodes, as suggested by @metesynnada @mingmwang do you have some ideas about when a global mode would be more beneficial? |
One can not achieve unified processing (where you can freely use streams and tables together) with the global mode route. With all the streaming improvements coming in, the fact that Datafusion can do this transparently with just standard SQL is an attractive differentiator and innovation IMO. |
@alamb I have reviewed this PR, in our internal repo. This PR is LGTM!. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you @metesynnada and @ozankabak and @mustafasrepo -- I will plan to merge this PR tomorrow unless there are any other comments
Which issue does this PR close?
Closes #5715.
Rationale for this change
The current implementation of SymmetricHashJoin requires order information before the PipelineFixer. This limitation results in unnecessary sort and distribution enforcement requirements that impact the optimizer's performance. To overcome this issue, we have revamped
SymmetricHashJoin
to eliminate the need for order information before PipelineFixer.What changes are included in this PR?
We have modified SymmetricHashJoin to function without requiring order information. The new implementation does not raise errors without order or filter information, though pruning is not supported without order information. Furthermore, we have set the required_input_ordering API to None, enabling the source to provide order information for piped executions. With this approach, we can genuinely remove the dependency on EnforceSorting and EnforceDistribution from PipelineFixer.
Are these changes tested?
Yes.
I encountered an erratic deadlock in the
unbounded_file_with_symmetric_join
test due to the current nature of thelist_files_for_scan
API inListingTable
. This API currently opens the file for metadata verification using thehead()
API inobject_store
, which can be problematic while handling FIFO files. However, the upcoming release ofobject_store
will eliminate the need to open a file for metadata retrieval, resolving this issue. For now, I will temporarily bypass this test to focus on other aspects of the PR.Are there any user-facing changes?
A new configuration option (
allow_symmetric_joins_without_pruning
) allows the user to constrainSymmetricHashJoin
s to run only on ordered inputs—no breaking changes in existing use cases.