
Schedule dynamic filtering collecting task immediately #10868

Merged
merged 2 commits into from
Feb 3, 2022

Conversation

sopel39
Member

@sopel39 sopel39 commented Jan 31, 2022

In case of plan

        J1
       /  \
     J2    S3
    /  \
   S1  S2

It might happen that the dynamic filtering evaluation order is:
S3 => S2 => S1

With the phased scheduler, the source stage consisting of
(J1, J2, S1) won't be scheduled until the stages running S3 and S2
have finished split enumeration. However, it might happen
that S2 is waiting for dynamic filters produced for S3.
In that case, S2 will never complete because the DFs for S3
are collected in stage (J1, J2, S1), which won't be scheduled
until all S2 splits are enumerated.

This commit schedules the DF collecting task immediately,
which prevents queries from deadlocking.
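The circular wait described above can be modeled as a small wait-for graph. The sketch below is illustrative only (the node names and the graph representation are assumptions, not Trino's scheduler code): an edge u -> v means "u cannot make progress until v completes", and scheduling the DF collecting task immediately removes the edge that closes the cycle.

```java
import java.util.*;

// Illustrative sketch, not Trino code: models the wait-for graph between
// stages. An edge u -> v means "u cannot proceed until v completes".
public class DeadlockSketch
{
    static boolean hasCycle(Map<String, List<String>> waitsFor)
    {
        Set<String> done = new HashSet<>();
        Set<String> inProgress = new HashSet<>();
        for (String node : waitsFor.keySet()) {
            if (dfs(node, waitsFor, inProgress, done)) {
                return true;
            }
        }
        return false;
    }

    static boolean dfs(String node, Map<String, List<String>> waitsFor, Set<String> inProgress, Set<String> done)
    {
        if (done.contains(node)) {
            return false;
        }
        if (!inProgress.add(node)) {
            return true; // back edge: circular wait
        }
        for (String dep : waitsFor.getOrDefault(node, List.of())) {
            if (dfs(dep, waitsFor, inProgress, done)) {
                return true;
            }
        }
        inProgress.remove(node);
        done.add(node);
        return false;
    }

    public static void main(String[] args)
    {
        // Before the fix: stage (J1,J2,S1) waits for S2's split enumeration,
        // while S2 waits for dynamic filters collected inside (J1,J2,S1).
        Map<String, List<String>> before = Map.of(
                "stage(J1,J2,S1)", List.of("S2 enumeration", "S3 enumeration"),
                "S2 enumeration", List.of("stage(J1,J2,S1)"),
                "S3 enumeration", List.of());
        System.out.println("before fix, deadlock: " + hasCycle(before)); // true

        // After the fix: the DF collecting task is scheduled immediately,
        // so S2 depends on it instead of on the whole deferred stage.
        Map<String, List<String>> after = Map.of(
                "stage(J1,J2,S1)", List.of("S2 enumeration", "S3 enumeration"),
                "S2 enumeration", List.of("DF collecting task"),
                "S3 enumeration", List.of(),
                "DF collecting task", List.of());
        System.out.println("after fix, deadlock: " + hasCycle(after)); // false
    }
}
```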

@sopel39 sopel39 requested review from dain and raunaqmorarka January 31, 2022 20:24
@cla-bot cla-bot bot added the cla-signed label Jan 31, 2022
@sopel39 sopel39 force-pushed the ks/df_schedule_imm branch 11 times, most recently from 9da1326 to 697c6a1 Compare February 1, 2022 10:03
Member

@dain dain left a comment


The scheduler stuff and the tests look good to me. I don't understand the implications of the change to DynamicFilterService.

@sopel39
Member Author

sopel39 commented Feb 1, 2022

Addressed comments @raunaqmorarka @dain. I've narrowed down the cases where the phased scheduler won't start a join stage immediately to non-fixed source stages only. I've also improved the tests with regard to source stage partitioning.

@sopel39 sopel39 force-pushed the ks/df_schedule_imm branch 4 times, most recently from d4b6e8a to 6dc46de Compare February 1, 2022 21:47
Member

@dain dain left a comment


looks good

Contributor

@arhimondr arhimondr left a comment


It doesn't feel like I really understand the problem. I've added some questions.

It would be great if you could extract the unrelated refactoring and improvements, so that one commit does only what's needed to address the problem.

Additionally, it would be great if you could elaborate more on the problem in the commit message. Maybe you can provide an example of a query that triggers a deadlock, with a description of what the join distributions are, where the stage boundaries are, and how the dynamic filters interact with each other.

@sopel39 sopel39 force-pushed the ks/df_schedule_imm branch 3 times, most recently from 333e82e to d50a4c8 Compare February 2, 2022 11:18
@sopel39
Member Author

sopel39 commented Feb 2, 2022

@arhimondr I've simplified this PR (removed the refactoring), keeping just the needed parts. I cannot answer some of the comments because they are now outdated.

This changes the logic from "if there are any dynamic filters produced and consumed within a stage" to "if there are any dynamic filters produced by a stage". Am I correct? Could you please explain why it is necessary? Is it needed to cover a mix of broadcast and partitioned joins?

I've changed the logic to create a collecting task if there is any lazy DF produced by the stage. This is needed because there might be consumers of that lazy DF outside of the stage.
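The predicate change described above can be sketched as follows (the method and parameter names are hypothetical, not Trino's actual DynamicFilterService API):

```java
import java.util.Set;

// Hypothetical sketch of the predicate change; names are illustrative.
public class CollectingTaskRule
{
    // Old rule: a collecting task was created only when some lazy dynamic
    // filter was both produced and consumed within the same stage.
    static boolean needsCollectingTaskOld(Set<String> producedLazyDfs, Set<String> consumedDfs)
    {
        return producedLazyDfs.stream().anyMatch(consumedDfs::contains);
    }

    // New rule: any lazy DF produced by the stage triggers a collecting task,
    // because its consumers may live in other stages.
    static boolean needsCollectingTaskNew(Set<String> producedLazyDfs)
    {
        return !producedLazyDfs.isEmpty();
    }

    public static void main(String[] args)
    {
        // df1 is produced here but consumed only by a scan in another stage
        Set<String> produced = Set.of("df1");
        Set<String> consumedLocally = Set.of();
        System.out.println("old rule: " + needsCollectingTaskOld(produced, consumedLocally)); // false
        System.out.println("new rule: " + needsCollectingTaskNew(produced)); // true
    }
}
```

Under the old rule, a stage whose lazy DF is consumed only elsewhere would never get a collecting task, starving the remote consumers.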

From what I understand, it is to prevent a non-source-distributed stage running a replicated join from being executed lazily. Is that correct? I wonder why it is necessary.

There won't be an extra collecting task for partitioned stages. Hence, if there are any lazy DFs produced by such a stage, the stage needs to be scheduled without delay.
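That scheduling consequence can be summarized with a small decision helper (a hypothetical sketch under the assumptions above, not the actual scheduler code):

```java
import java.util.Set;

// Illustrative decision helper; not Trino's scheduler API.
public class ScheduleDecision
{
    // Partitioned (fixed-distribution) stages get no separate collecting
    // task, so a partitioned stage that produces any lazy DF must be
    // scheduled without delay for its filters to ever be collected.
    static boolean scheduleWithoutDelay(boolean partitionedStage, Set<String> producedLazyDfs)
    {
        return partitionedStage && !producedLazyDfs.isEmpty();
    }

    public static void main(String[] args)
    {
        System.out.println(scheduleWithoutDelay(true, Set.of("df1"))); // true
        System.out.println(scheduleWithoutDelay(true, Set.of()));      // false
    }
}
```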

@sopel39 sopel39 force-pushed the ks/df_schedule_imm branch from d50a4c8 to 7ca7808 Compare February 2, 2022 11:27
@sopel39 sopel39 force-pushed the ks/df_schedule_imm branch 2 times, most recently from 3655c3b to 30c9834 Compare February 2, 2022 23:28
@sopel39 sopel39 requested a review from arhimondr February 2, 2022 23:30
@sopel39 sopel39 force-pushed the ks/df_schedule_imm branch from 30c9834 to 0901a25 Compare February 2, 2022 23:34
In case of plan
        J1
       /  \
     J2    S3
    /  \
   S1  S2

It might happen that the dynamic filtering evaluation order is:
S3 => S2 => S1

With the phased scheduler, the source stage consisting of
(J1, J2, S1) won't be scheduled until the stages running S3 and S2
have finished split enumeration. However, it might happen
that S2 is waiting for dynamic filters produced for S3.
In that case, S2 will never complete because the DFs for S3
are collected in stage (J1, J2, S1), which won't be scheduled
until all S2 splits are enumerated.

This commit schedules the DF collecting task immediately,
which prevents queries from deadlocking.
@sopel39 sopel39 force-pushed the ks/df_schedule_imm branch from 0901a25 to bce823e Compare February 2, 2022 23:56
@sopel39 sopel39 merged commit 8367998 into trinodb:master Feb 3, 2022
@sopel39 sopel39 deleted the ks/df_schedule_imm branch February 3, 2022 06:35
@sopel39 sopel39 mentioned this pull request Feb 3, 2022
@github-actions github-actions bot added this to the 370 milestone Feb 3, 2022
@sopel39
Member Author

sopel39 commented Feb 3, 2022

Thanks for the review!

5 participants