NestedLoopJoin will panic when right child contains RepartitionExec #5022

Closed
ygf11 opened this issue Jan 22, 2023 · 10 comments · Fixed by #5156
Labels
bug Something isn't working

Comments

@ygf11
Contributor

ygf11 commented Jan 22, 2023

Describe the bug
While working on #4866, I found that some NestedLoopJoin queries panic.

# create tables -- t.csv is an empty file.
❯ CREATE EXTERNAL TABLE t1 (t1_id INT, t1_name text, t1_int INT) STORED AS CSV LOCATION 't.csv';
❯ CREATE EXTERNAL TABLE t2 (t2_id INT, t2_name text, t2_int INT) STORED AS CSV LOCATION 't.csv';
# set target_partitions
❯ set datafusion.execution.target_partitions = 4;
❯ explain select * from t1 inner join t2 on t1.t1_id > t2.t2_id where t1.t1_id > 10 and t2.t2_int > 10;
+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type     | plan                                                                                                                                                                  |
+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan  | Projection: t1.t1_id, t1.t1_name, t1.t1_int, t2.t2_id, t2.t2_name, t2.t2_int                                                                                          |
|               |   Inner Join:  Filter: t1.t1_id > t2.t2_id                                                                                                                            |
|               |     Filter: t1.t1_id > Int32(10)                                                                                                                                      |
|               |       TableScan: t1 projection=[t1_id, t1_name, t1_int], partial_filters=[t1.t1_id > Int32(10)]                                                                       |
|               |     Filter: t2.t2_int > Int32(10)                                                                                                                                     |
|               |       TableScan: t2 projection=[t2_id, t2_name, t2_int], partial_filters=[t2.t2_int > Int32(10)]                                                                      |
| physical_plan | ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as t1_name, t1_int@2 as t1_int, t2_id@3 as t2_id, t2_name@4 as t2_name, t2_int@5 as t2_int]                         |
|               |   NestedLoopJoinExec: join_type=Inner, filter=BinaryExpr { left: Column { name: "t1_id", index: 0 }, op: Gt, right: Column { name: "t2_id", index: 1 } }              |
|               |     CoalesceBatchesExec: target_batch_size=8192                                                                                                                       |
|               |       FilterExec: t1_id@0 > 10                                                                                                                                        |
|               |         RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1                                                                                          |
|               |           CsvExec: files={1 group: [[home/work/tools/datafusion-test-data/join-context/t1.csv]]}, has_header=false, limit=None, projection=[t1_id, t1_name, t1_int]   |
|               |     CoalescePartitionsExec                                                                                                                                            |
|               |       CoalesceBatchesExec: target_batch_size=8192                                                                                                                     |
|               |         FilterExec: t2_int@2 > 10                                                                                                                                     |
|               |           RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 -- panic in RepartitionExec                                                                                          |
|               |             CsvExec: files={1 group: [[home/work/tools/datafusion-test-data/join-context/t1.csv]]}, has_header=false, limit=None, projection=[t2_id, t2_name, t2_int] |
|               |                                                                                                                                                                       |
+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------+

This SQL query panics:

select * from t1 inner join t2 on t1.t1_id > t2.t2_id where t1.t1_id > 10 and t2.t2_int > 10;
thread 'tokio-runtime-worker' panicked at 'partition not used yet', /home/work/arrow-datafusion/datafusion/core/src/physical_plan/repartition/mod.rs:426:14
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace
thread 'tokio-runtime-worker' panicked at 'partition not used yet', /home/work/arrow-datafusion/datafusion/core/src/physical_plan/repartition/mod.rs:426:14
thread 'tokio-runtime-worker' panicked at 'partition not used yet', /home/work/arrow-datafusion/datafusion/core/src/physical_plan/repartition/mod.rs:426:14
thread 'tokio-runtime-worker' panicked at 'partition not used yet', /home/work/arrow-datafusion/datafusion/core/src/physical_plan/repartition/mod.rs:426:14
thread 'tokio-runtime-worker' panicked at 'partition not used yet', /home/work/arrow-datafusion/datafusion/core/src/physical_plan/repartition/mod.rs:426:14
thread 'tokio-runtime-worker' panicked at 'partition not used yet', /home/work/arrow-datafusion/datafusion/core/src/physical_plan/repartition/mod.rs:426:14
thread 'tokio-runtime-worker' panicked at 'partition not used yet', /home/work/arrow-datafusion/datafusion/core/src/physical_plan/repartition/mod.rs:426:14
thread 'tokio-runtime-worker' panicked at 'partition not used yet', /home/work/arrow-datafusion/datafusion/core/src/physical_plan/repartition/mod.rs:426:14
thread 'tokio-runtime-worker' panicked at 'partition not used yet', /home/work/arrow-datafusion/datafusion/core/src/physical_plan/repartition/mod.rs:426:14
0 rows in set. Query took 0.015 seconds.

To Reproduce
See above.

Expected behavior
The join should work.

It seems that NestedLoopJoin may execute the same partition of its right child more than once when the join has multiple output partitions. That is fine most of the time, but it panics when the right child contains a RepartitionExec.

@liukun4515
Contributor

@ygf11 Did you find the root cause of the panic?

It seems that NestedLoopJoin may execute the same partition of its right child more than once when the join has multiple output partitions. That is fine most of the time, but it panics when the right child contains a RepartitionExec.

In your example, the left side is partitioned and the right side is required to be a single partition.

The output partitioning should be consistent with the left side.

The single right partition will be executed more than once. Is it not allowed to be executed multiple times? That may be why it panics. @alamb

@ygf11
Contributor Author

ygf11 commented Jan 28, 2023

The single right partition will be executed more than once.

@liukun4515, that is the cause.

We can see in NestedLoopJoinExec:
https://github.com/apache/arrow-datafusion/blob/a3219457d19e3a51c137088f230e79495f6ac550/datafusion/core/src/physical_plan/joins/nested_loop_join.rs#L205-L213

If the right side is a single partition, partition 0 is executed more than once.

@liukun4515
Contributor

liukun4515 commented Jan 28, 2023

RepartitionExec

Yes, we reached the same conclusion, but I don't know why RepartitionExec cannot be executed twice for the same partition.

Do other physical operators have the same behavior? @alamb

@liukun4515
Contributor

RepartitionExec holds a shared state:

/// Inner state that is initialized when the first output stream is created.
state: Arc<Mutex<RepartitionExecState>>,

This state is shared between multiple runtime/coroutine contexts, so once one coroutine has executed partition 1, no other coroutine can execute partition 1 again.
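To make the one-shot behavior concrete, here is a minimal sketch using only the standard library. It is not the DataFusion code: `OneShotRepartition`, `SharedState`, and the use of `std::sync::mpsc` channels are assumptions chosen for illustration. The idea is that the shared state owns one receiver per output partition and moves it out on the first `execute` call, so a second call for the same partition finds nothing left.

```rust
use std::collections::HashMap;
use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::{Arc, Mutex};

/// Hypothetical stand-in for the shared repartition state: one receiver per
/// output partition, handed out at most once.
struct SharedState {
    receivers: HashMap<usize, Receiver<i32>>,
}

/// Hypothetical operator; clones share the same state through the `Arc`.
#[derive(Clone)]
struct OneShotRepartition {
    state: Arc<Mutex<SharedState>>,
}

impl OneShotRepartition {
    /// Creates `n` output partitions and returns the senders that feed them.
    fn new(n: usize) -> (Self, Vec<Sender<i32>>) {
        let mut receivers = HashMap::new();
        let mut senders = Vec::new();
        for partition in 0..n {
            let (tx, rx) = channel();
            senders.push(tx);
            receivers.insert(partition, rx);
        }
        let state = Arc::new(Mutex::new(SharedState { receivers }));
        (Self { state }, senders)
    }

    /// Hands out the receiver for `partition`. The first call returns `Some`;
    /// any later call for the same partition returns `None`, because the
    /// receiver was moved out of the shared map.
    fn execute(&self, partition: usize) -> Option<Receiver<i32>> {
        self.state.lock().unwrap().receivers.remove(&partition)
    }
}
```

In this sketch the second `execute(0)` returns `None`. The log in this issue suggests the real operator panics with 'partition not used yet' at the equivalent point instead.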

@ygf11
Contributor Author

ygf11 commented Jan 29, 2023

Do other physical operators have the same behavior?

I don't know whether any other physical plan has the same behavior.
Either way, executing the same partition more than once is probably not a good idea because it wastes a lot of time. A better way is to cache the result (OnceFut) or to execute the partition only once.

NestedLoopJoinExec currently builds the left side in some cases and the right side in others, but both cases use the same logic. I think it would be better to give build-left and build-right separate join logic, so that each partition of the non-build side is executed only once.

The example in this issue is a build-right case, but NestedLoopJoinStream handles it with the build-left logic.

I think that may be another cause. @liukun4515 @alamb
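The caching idea can be sketched with the standard library's `OnceLock`. This is only an analogy for OnceFut, not its implementation: `CachedSide`, `get_or_load`, and the `executions` counter are hypothetical names, and the sketch is synchronous where the real code is async.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, OnceLock};

/// Hypothetical cache for the output of a single-partition child.
struct CachedSide {
    rows: OnceLock<Arc<Vec<i32>>>,
    /// Counts how many times the child was actually executed.
    executions: AtomicUsize,
}

impl CachedSide {
    fn new() -> Self {
        Self {
            rows: OnceLock::new(),
            executions: AtomicUsize::new(0),
        }
    }

    /// Returns the cached rows, running `load` only on the first call.
    /// Later callers get a cheap `Arc` clone of the same data.
    fn get_or_load(&self, load: impl FnOnce() -> Vec<i32>) -> Arc<Vec<i32>> {
        self.rows
            .get_or_init(|| {
                self.executions.fetch_add(1, Ordering::SeqCst);
                Arc::new(load())
            })
            .clone()
    }
}
```

If every output partition of the join calls `get_or_load`, the child is executed once no matter how many output partitions there are.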

@ygf11
Contributor Author

ygf11 commented Jan 29, 2023

NestedLoopJoinExec currently builds the left side in some cases and the right side in others, but both cases use the same logic. I think it would be better to give build-left and build-right separate join logic, so that each partition of the non-build side is executed only once.

Let me explain more.

The main difference is the way of iteration.
For build-left, left has a single partition and right has multiple partitions. The iteration looks like:

// Given a partition number - x 
// execute right-partition-x
for batch in right-partition-x {
   join(batch, left-data-(single partition)) 
}

For build-right, left has multiple partitions and right has a single partition. The iteration should look like:

// Given a partition number - x 
// execute left-partition-x
for batch in left-partition-x {
   join(batch, right-data(single partition)) 
}

If we use the build-left logic for a build-right plan (the example in this issue), then:

// Given a partition number - x
// execute right-partition-0 (the single right partition is executed many times)
for batch in right-partition-0 {
   join(batch, left-data(partition-x)) 
}
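The three iteration shapes above can be compared in a small runnable sketch. It is a simplified model, not the DataFusion implementation: `Child`, `build_left_logic`, and `build_right_logic` are hypothetical names, rows are plain integers, and the join condition `l > r` mirrors `t1_id > t2_id` from the example query. The execution counters show how often each partition is executed.

```rust
use std::cell::Cell;

/// Hypothetical child plan: `partitions[i]` holds the rows of partition `i`,
/// and `executions[i]` counts how often that partition was executed.
struct Child {
    partitions: Vec<Vec<i32>>,
    executions: Vec<Cell<usize>>,
}

impl Child {
    fn new(partitions: Vec<Vec<i32>>) -> Self {
        let executions = partitions.iter().map(|_| Cell::new(0)).collect();
        Self { partitions, executions }
    }

    /// Records one execution of `partition` and returns its rows.
    fn execute(&self, partition: usize) -> &[i32] {
        let count = &self.executions[partition];
        count.set(count.get() + 1);
        &self.partitions[partition]
    }
}

/// Build-left logic applied to a build-right plan: every output partition
/// `x` re-executes right partition 0.
fn build_left_logic(left: &Child, right: &Child) -> Vec<(i32, i32)> {
    let mut out = Vec::new();
    for x in 0..left.partitions.len() {
        let left_rows = left.execute(x);
        for &r in right.execute(0) {
            out.extend(left_rows.iter().filter(|&&l| l > r).map(|&l| (l, r)));
        }
    }
    out
}

/// Build-right logic: the single right partition is executed once up front,
/// then each left partition streams against the loaded rows.
fn build_right_logic(left: &Child, right: &Child) -> Vec<(i32, i32)> {
    let right_rows = right.execute(0);
    let mut out = Vec::new();
    for x in 0..left.partitions.len() {
        for &l in left.execute(x) {
            out.extend(right_rows.iter().filter(|&&r| l > r).map(|&r| (l, r)));
        }
    }
    out
}
```

With four left partitions and one right partition, `build_left_logic` executes right partition 0 four times, while `build_right_logic` executes it once. An operator that allows only one execution per partition would only work with the second shape.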

@liukun4515
Contributor

Do other physical operators have the same behavior?

I don't know whether any other physical plan has the same behavior. Either way, executing the same partition more than once is probably not a good idea because it wastes a lot of time. A better way is to cache the result (OnceFut) or to execute the partition only once.

NestedLoopJoinExec currently builds the left side in some cases and the right side in others, but both cases use the same logic. I think it would be better to give build-left and build-right separate join logic, so that each partition of the non-build side is executed only once.

The example in this issue is a build-right case, but NestedLoopJoinStream handles it with the build-left logic.

I think that may be another cause. @liukun4515 @alamb

I think build side and probe side are hash join concepts.

For the NLJ, we need the outer side (outer table) and the inner side (inner table).

  for outer-row in outer-table
    for inner-row in inner-table
       check-join

The inner table will be traversed many times in the most basic implementation.

In the current implementation, the left table is the outer table, the right table is the inner table.

For an inner join with multiple left partitions and a single right partition:

for left-partition-x in left multi partition:
   join(load-left(left-partition-x), load-right(single-right-partition))

I think this iteration matches the NLJ algorithm.
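For reference, the basic row-at-a-time algorithm in the pseudocode above can be written as a short generic function. This is a textbook sketch over in-memory slices, not the batch-based DataFusion operator; `nested_loop_join` and its parameters are hypothetical names.

```rust
/// Textbook nested loop join: every outer row is checked against every
/// inner row, so the inner table is scanned once per outer row.
fn nested_loop_join<L: Copy, R: Copy>(
    outer: &[L],
    inner: &[R],
    predicate: impl Fn(L, R) -> bool,
) -> Vec<(L, R)> {
    let mut out = Vec::new();
    for &o in outer {
        for &i in inner {
            if predicate(o, i) {
                out.push((o, i));
            }
        }
    }
    out
}
```

Calling it with `|l, r| l > r` as the predicate corresponds to the `t1_id > t2_id` join filter from the example query.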

@liukun4515
Contributor

I think we can change the function distribution_from_join_type from

        JoinType::Inner | JoinType::Left | JoinType::LeftSemi | JoinType::LeftAnti => {
            // need the left data, and the right should be one partition
            vec![
                Distribution::UnspecifiedDistribution,
                Distribution::SinglePartition,
            ]
        }

to

        JoinType::Inner | JoinType::Left | JoinType::LeftSemi | JoinType::LeftAnti => {
            // need the left data, and the right should be one partition
            vec![
                Distribution::SinglePartition,
                Distribution::SinglePartition,
            ]
        }

temporarily, if we cannot support executing the same partition of a physical operator multiple times.

This will not block your fix for issue #4866.

cc @ygf11

@ygf11
Contributor Author

ygf11 commented Feb 2, 2023

Thanks for the response, @liukun4515. I think I have found a way to fix this bug.

In the current implementation, the left table is the outer table, the right table is the inner table.

IMO, the relationship between inner/outer table and left/right table is not fixed; it is decided by the required distribution (join type).
For right/right-semi/right-anti joins:

  • left is the single partition side.
  • right is the multiple partition side.
  • the output partition count is the same as the right table's.

The algorithm will be like:

for x in 0..output-partition-count:
   join(right-partition(x), single-left-partition)

We can see that the left table is visited more than once, while each right partition is visited only once.
That means that in this nested loop join, right is the outer table side and left is the inner table side (which differs from what you said).

The inner table will be traversed many times in the most basic implementation.

Yes, visiting a relation many times is a common operation.
In DataFusion, other physical plans have a similar requirement. CrossJoinExec, for example, works well with OnceAsync.

Currently the implementation always loads and caches (OnceAsync) the left table, which is not correct, because the left table may be the outer table side. We should always load and cache (OnceAsync) the inner table data.

I think the queries will succeed after that.
I created a new PR, #5156, to fix this.
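As a rough sketch of that rule, the side to load and cache can be derived from the join type, following the required distributions discussed in this thread. The enums below are local stand-ins written for this example, not the DataFusion types, and `inner_side` is a hypothetical helper; only the join types mentioned in this thread are covered.

```rust
/// Stand-in for the join types discussed in this thread (not the real enum).
#[derive(Clone, Copy, Debug, PartialEq)]
enum JoinType {
    Inner,
    Left,
    LeftSemi,
    LeftAnti,
    Right,
    RightSemi,
    RightAnti,
}

#[derive(Clone, Copy, Debug, PartialEq)]
enum Side {
    Left,
    Right,
}

/// Returns the single-partition side, i.e. the inner table that should be
/// loaded once and cached. The other side is the outer table and decides
/// the output partition count.
fn inner_side(join_type: JoinType) -> Side {
    match join_type {
        // The right child is required to be a single partition.
        JoinType::Inner | JoinType::Left | JoinType::LeftSemi | JoinType::LeftAnti => {
            Side::Right
        }
        // The left child is required to be a single partition.
        JoinType::Right | JoinType::RightSemi | JoinType::RightAnti => Side::Left,
    }
}
```

For the inner join in this issue, `inner_side` returns `Side::Right`, so the right child would be executed once and shared across all left partitions.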
