Prevent operator fusion for every stage of the Beam executor. #240

Merged on Nov 29, 2021 (2 commits)

Contributor

@alxmrs alxmrs commented Nov 18, 2021

In an end-to-end experiment, I noticed that stages were still being fused. This caused errors to occur, namely that the Zarr metadata file was not created before one of the later steps (I think `store_chunk`) that needed it.

By adding a `beam.Reshuffle()` step to the end of every stage, we ensure that fusion doesn't occur and that stages do execute serially.
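
To make the failure mode concrete, here is a toy model in plain Python. It is not Beam code and not the executor's implementation: `run_fused`, `run_with_barriers`, `make_stage` and the stage name `write_metadata` are hypothetical names chosen for illustration. A fused run pushes each element through every stage before touching the next element, while a barrier between stages (the role `beam.Reshuffle()` plays in this PR) forces a stage to finish for all elements first.

```python
from typing import Callable, List

Stage = Callable[[int], int]


def make_stage(name: str, log: List[str]) -> Stage:
    """Build a pass-through stage that records when it runs (hypothetical helper)."""
    def stage(item: int) -> int:
        log.append(f"{name}:{item}")
        return item
    return stage


def run_fused(items: List[int], stages: List[Stage]) -> List[int]:
    """Fused execution: each element goes through all stages back to back."""
    out = []
    for item in items:
        for stage in stages:
            item = stage(item)
        out.append(item)
    return out


def run_with_barriers(items: List[int], stages: List[Stage]) -> List[int]:
    """Serial execution: materialize each stage's output before the next stage starts."""
    for stage in stages:
        items = [stage(item) for item in items]  # the list acts as the barrier
    return items


def trace(runner: Callable[[List[int], List[Stage]], List[int]]) -> List[str]:
    """Run two stages over two elements and return the order of events."""
    log: List[str] = []
    stages = [make_stage("write_metadata", log), make_stage("store_chunk", log)]
    runner([0, 1], stages)
    return log


fused_log = trace(run_fused)
serial_log = trace(run_with_barriers)
print(fused_log)   # ['write_metadata:0', 'store_chunk:0', 'write_metadata:1', 'store_chunk:1']
print(serial_log)  # ['write_metadata:0', 'write_metadata:1', 'store_chunk:0', 'store_chunk:1']
```

In the fused trace, `store_chunk` runs for element 0 before the earlier stage has finished for element 1, which is the kind of interleaving that breaks a step depending on the previous stage being complete.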
@rabernat
Contributor

Thanks a lot for this PR Alex! 🎉

> In an end-to-end experiment, I noticed that stages were still being fused.

It's great you caught this. It also exposes the fact that the Beam executor is not fully covered by the tests. Over in #238, I have been refactoring the test suite so that all of the recipe integration tests will get run with all the executors, eventually including beam. I'd like to finish that first and see if it catches this same bug.

If so, we can merge #238 first and then you can rebase this, thereby showing that it fixes the bug.

If not, we will need to add a test for this.

So stand by for a few days.

@rabernat
Contributor

It looks like #238 did NOT catch the bug related to task ordering and fusion with the Beam executor.

Therefore it would be great to add to or modify the pipeline tests in such a way that we defend against this potential error.

@alxmrs
Contributor Author

alxmrs commented Nov 21, 2021

> Therefore it would be great to add to or modify the pipeline tests in such a way that we defend against this potential error.

I've been investigating ways to do this. Fusion is an implementation detail of the Beam runner (a type of executor). The documentation on this subject suggests that the proper way to test these details is to deploy a lo-fi pipeline on GCP within CI. To check whether the Dataflow service fuses steps, we could, for example, validate the JSON returned from a CLI. Is this an acceptable approach to take?

There's a contingency I'm also considering: updating the assert system that is in place so that it breaks the pipeline if ordering properties are violated, maybe by using stateful processing.
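
As a rough illustration of that contingency, the sketch below shows the kind of ordering check I have in mind, in plain Python rather than Beam. `StageOrderGuard` and `order_is_valid` are hypothetical names, and the in-memory counter is an assumption that only works in a single process; on a distributed runner the count would need to live somewhere shared, which is where stateful processing might come in.

```python
from typing import Dict, List, Tuple


class StageOrderGuard:
    """Fail if an element enters stage n+1 before stage n handled every element."""

    def __init__(self, expected_per_stage: int) -> None:
        self.expected = expected_per_stage
        self.done: Dict[int, int] = {}  # stage index -> finished element count

    def enter(self, stage: int) -> None:
        # Stage 0 has no predecessor; later stages need the previous one complete.
        if stage > 0 and self.done.get(stage - 1, 0) < self.expected:
            raise AssertionError(
                f"stage {stage} started before stage {stage - 1} finished"
            )

    def finish(self, stage: int) -> None:
        self.done[stage] = self.done.get(stage, 0) + 1


def order_is_valid(events: List[Tuple[int, int]], n_items: int) -> bool:
    """Replay (stage, item) events and report whether the guard accepts them."""
    guard = StageOrderGuard(n_items)
    try:
        for stage, _item in events:
            guard.enter(stage)
            guard.finish(stage)
    except AssertionError:
        return False
    return True


# Stage 0 completes for both items before stage 1 starts: accepted.
serial_ok = order_is_valid([(0, 0), (0, 1), (1, 0), (1, 1)], n_items=2)
# Item 0 reaches stage 1 while item 1 is still waiting for stage 0: rejected.
fused_ok = order_is_valid([(0, 0), (1, 0), (0, 1), (1, 1)], n_items=2)
print(serial_ok, fused_ok)  # True False
```

A check like this would turn silent fusion into a hard failure in tests, without depending on how a particular runner reports its execution graph.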

Added a comment explaining why the reshuffle is necessary, along with a link to the relevant documentation. Stages now have more specific names (e.g. link "Start").