-
Notifications
You must be signed in to change notification settings - Fork 1.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: support nested loop join with the initial version #4562
Conversation
} | ||
|
||
#[cfg(test)] | ||
mod tests { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
there are som duplicated test code in the hashjoin
and cross join
.
I will refactor and clean up them in the followup pr
731229e
to
18c6152
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That's great👍👍
I prepare to review this PR carefully in a few days.
I will review this PR tomorrow. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I took a quick scan through this code -- looks quite cool @liukun4515
Given @mingmwang and @jackwener plan to review I won't put this one on my deep review queue, unless there is something specific you would like me to look at
cc @Dandandan as I think he enjoys Join implementations
I wonder if it makes sense to also make use of |
I would be in favor of this proposal (as a follow on PR) if possible. Joins are already (necessarily) complicated, so the more we can do to keep their complexity in check the better |
Yes, it make sense. I think CrossJoinExec should be a form of |
datafusion/core/tests/sql/joins.rs
Outdated
async fn error_cross_join() -> Result<()> { | ||
let test_repartition_joins = vec![true, false]; | ||
for repartition_joins in test_repartition_joins { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You can rename this UT's name to something else.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
change to left_join_with_nonequal_condition
if session_state.config.target_partitions() > 1 | ||
if join_on.is_empty() { | ||
// there is no equal join condition, use the nested loop join | ||
// TODO optimize the plan, and use the config of `target_partitions` and `repartition_joins` |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think for NLJ, there is no partitioned version.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A partitioned version is actually similar to CrossJoin/CartesianProduct.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In the left out
join type, the distribution of the join is
vec![
Distribution::UnspecifiedDistribution,
Distribution::SinglePartition,
]
can we partitioned the left side before NLJ exec to accelerate the execution?
fn distribution_from_join_type(join_type: &JoinType) -> Vec<Distribution> { | ||
match join_type { | ||
JoinType::Inner | JoinType::Left | JoinType::LeftSemi | JoinType::LeftAnti => { | ||
// need the left data, and the right should be one partition | ||
vec![ | ||
Distribution::UnspecifiedDistribution, | ||
Distribution::SinglePartition, | ||
] | ||
} | ||
JoinType::Right | JoinType::RightSemi | JoinType::RightAnti => { | ||
// need the right data, and the left should be one partition | ||
vec![ | ||
Distribution::SinglePartition, | ||
Distribution::UnspecifiedDistribution, | ||
] | ||
} | ||
JoinType::Full => { | ||
// need the left and right data, and the left and right should be one partition | ||
vec![Distribution::SinglePartition, Distribution::SinglePartition] | ||
} | ||
} | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The required distribution logic for the nested loop join is not consistent with the other physical join types(HashJoinExec::CollectLeft, CrossJoinExec), those existing joins do not consider the join types. For CrossJoinExec and HashJoinExec::CollectLeft, they always collect left. And there is a JoinSelection rule to adjust/reorder the joins sides to make sure the left side is the smaller side.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Especially for FullOut join, if we enforce the both input sides coalesced to the single partition, we might encounter performance issue.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In the current execution model, we need to decide the distribution by the join type
pub(crate) fn need_produce_result_in_final(join_type: JoinType) -> bool { | ||
matches!( | ||
join_type, | ||
JoinType::Left | JoinType::LeftAnti | JoinType::LeftSemi | JoinType::Full | ||
) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would suggest to add more comments to those methods. Originally they are only used by HashJoin and they are private methods. Now they are used both by HashJoin and NestedLoopJoin.
pub(crate) fn get_final_indices( | ||
left_bit_map: &BooleanBufferBuilder, | ||
join_type: JoinType, | ||
) -> (UInt64Array, UInt32Array) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same as above. Please add more comments to explain more clearly what's the expected input and what are the outputs.
/// right side | ||
pub(crate) right: Arc<dyn ExecutionPlan>, | ||
/// Filters which are applied while finding matching rows | ||
pub(crate) filter: Option<JoinFilter>, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am confused about this.
In my humble opinion, look like that JoinFilter
in HashJoin
isn't condition
but predicate
which is used for filtering result of hashjoin.
But JoinFilter
here is as condition
.
I don't know if my opinion is wrong.
If it's right, I recommend to name condition
instead of filter
, and add comment for it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think the naming of JoinFilter
is consistent between HashJoinExec
and NestedLoopJoinExec
. And in the logical Join
plan, it is also called filter
.
// right side | ||
let right_side = if left_is_single_partition { | ||
self.right.execute(partition, context)? | ||
} else { | ||
// the distribution of right is `SinglePartition` | ||
self.right.execute(0, context)? | ||
}; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is it correct here ? For FullOut join, both the left input and right input are single partition now.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, if the join type is Full Join
, the left and right is single partition.
fn execute(
&self,
partition: usize,
context: Arc<TaskContext>,
) -> Result<SendableRecordBatchStream> {
The parameter of partition
must be 0
.
The parameter of partition
is triggered by left side, if the left side is multi partition and have 4 partition,
the value of partition
will be 0
,1
,2
,3
.
The implementation is right, but from your comments, I find it will make reviewer confused. I will add a new method to check if right is single partition.
let left_is_single_partition = self.is_single_partition_for_left(); | ||
// left side | ||
let left_fut = if left_is_single_partition { | ||
self.left_fut.once(|| { | ||
// just one partition for the left side, and the first partition is all of data for left | ||
load_left_specified_partition(0, self.left.clone(), context.clone()) | ||
}) | ||
} else { | ||
// the distribution of left is not single partition, just need the specified partition for left | ||
OnceFut::new(load_left_specified_partition( | ||
partition, | ||
self.left.clone(), | ||
context.clone(), | ||
)) | ||
}; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not sure about how to design for different kind partition.
I need to investigate about this.
In the implementation of Join, this is join type in current codebase:
In the logical phase, we split the join to I think we can fill a new issue to discuss it. I prefer merge cc @Dandandan |
I think CrossJoin is still required as a physical join plan implementation, because for Cross Join we can have different Partitioning/Shuffle strategies(M * N). Of course we can merge all the logic to NestedLoopJoin, but this will make the NestedLoopJoin's partitioning logic very complex(which is already quite complex). |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM, thanks @liukun4515 .
will merge this pr tomorrow, if there are no other comments. cc @mingmwang @alamb |
Merging this one in so it doesn't accumulate conflicts. Thanks for all the work @liukun4515, @mingmwang and @jackwener |
Benchmark runs are scheduled for baseline = 2792113 and contender = fddb3d3. fddb3d3 is a master commit associated with this PR. Results will be available as each benchmark for each run completes. |
Which issue does this PR close?
Closes #4363
fix bug: join plan with nonequal condition will be converted to the NLJ instead of the error cross join
Rationale for this change
What changes are included in this PR?
Are these changes tested?
Are there any user-facing changes?