-
Notifications
You must be signed in to change notification settings - Fork 3.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Schedule dynamic filtering collecting task immediately #10868
Conversation
9da1326
to
697c6a1
Compare
plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestHiveDynamicPartitionPruningTest.java
Outdated
Show resolved
Hide resolved
plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestHiveDynamicPartitionPruningTest.java
Outdated
Show resolved
Hide resolved
plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestHiveDynamicPartitionPruningTest.java
Outdated
Show resolved
Hide resolved
697c6a1
to
30783f4
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The scheduler stuff and the tests look good to me. I don't understand the implications of the change to DynamicFilterService
.
...no-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergDynamicPartitionPruningTest.java
Show resolved
Hide resolved
core/trino-main/src/main/java/io/trino/execution/scheduler/SourcePartitionedScheduler.java
Show resolved
Hide resolved
core/trino-main/src/main/java/io/trino/execution/scheduler/SourcePartitionedScheduler.java
Outdated
Show resolved
Hide resolved
core/trino-main/src/main/java/io/trino/execution/scheduler/SourcePartitionedScheduler.java
Outdated
Show resolved
Hide resolved
core/trino-main/src/main/java/io/trino/execution/scheduler/SourcePartitionedScheduler.java
Outdated
Show resolved
Hide resolved
core/trino-main/src/main/java/io/trino/server/DynamicFilterService.java
Outdated
Show resolved
Hide resolved
30783f4
to
fa077da
Compare
fa077da
to
79eb0ef
Compare
ac @raunaqmorarka @dain . I've narrowed down when phased scheduler won't start join stage immediately to only non-fixed source stages. I've also improved tests with regards to source stage partitioning |
d4b6e8a
to
6dc46de
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
looks good
core/trino-main/src/main/java/io/trino/server/DynamicFilterService.java
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It doesn't feel like I really understand the problem. Added some questions.
It would be great if you could extract refactoring and improvements that are unrelated to have a commit that only does what's needed to address the problem.
Additionally if you could elaborate more on a problem in the commit message it would be great. Maybe you can provide an example of a query that triggers a deadlock with a description of what join distributions are, where are the stage boundaries and how dynamic filters interact with each other.
core/trino-main/src/main/java/io/trino/execution/scheduler/SourcePartitionedScheduler.java
Outdated
Show resolved
Hide resolved
core/trino-main/src/main/java/io/trino/server/DynamicFilterService.java
Outdated
Show resolved
Hide resolved
core/trino-main/src/main/java/io/trino/server/DynamicFilterService.java
Outdated
Show resolved
Hide resolved
core/trino-main/src/main/java/io/trino/execution/scheduler/policy/PhasedExecutionSchedule.java
Outdated
Show resolved
Hide resolved
333e82e
to
d50a4c8
Compare
@arhimondr I've simplified this PR (removed refactor), keeping just needed parts. i cannot answer some outdated comments
I've changed to logic to create collecting task if there is any lazy DF produced by stage. This is needed because there might be consumers of that lazy DF outside of stage.
There won't be extra collecting task for partitioned stages. Hence, if there are any lazy DFs produced by that stage, then stage needs to be scheduled without delay |
d50a4c8
to
7ca7808
Compare
3655c3b
to
30c9834
Compare
30c9834
to
0901a25
Compare
core/trino-main/src/main/java/io/trino/execution/scheduler/SqlQueryScheduler.java
Outdated
Show resolved
Hide resolved
core/trino-main/src/main/java/io/trino/execution/scheduler/StageScheduler.java
Show resolved
Hide resolved
In case of plan J1 / \ J2 S3 / \ S1 S2 It might happen that dynamic filtering evaluation order is: S3 => S2 => S1 With phased scheduler source stage consisting of (J1, J2, S1) won't be scheduled until stages running S3 and S2 have finished split enumaration. However, it might happen that S2 is waiting for dynamic filters produced for S3. In that case, S2 will never complete because DFs for S3 are collected in stage (J1, J2, S1) which won't be scheduled until all S2 split are enumerated. This commit makes scheduling of DF collecting task immediately which will prevent queries from deadlock.
0901a25
to
bce823e
Compare
Thanks for review |
In case of plan
J1
/
J2 S3
/
S1 S2
It might happen that that dynamic filtering dependencies are:
S3 => S3 => S1
With phased scheduler source stage consisting of
(J1, J2, S1) won't be scheduler until stages with S3 and S2
have finished split enumaration. However, it might happen
that S2 is waiting for dynamic filters for S3. In that case,
S3 will never complete because DFs for S3 are collected in
stage (J1, J2, S1).
This commit makes scheduling of DF collecting task immediately
which will prevent queries from deadlock.