Support Beam as an executor for recipe pipelines #169
Comments
It's not so much that we don't want the dependency. It's more that XArray-Beam contains higher-level abstractions that we are not able to take advantage of inside pangeo_forge_recipes. The execution model is extremely simple, as you can see from the serial functional executor (pangeo-forge-recipes/pangeo_forge_recipes/recipes/base.py, lines 97 to 111 at 6fb5a0a).
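For readers without the repo open, the serial pattern referenced there boils down to a loop over opaque stage functions. A minimal sketch (the `Stage` model here is a hypothetical simplification for illustration, not the actual pangeo_forge_recipes code):

```python
from dataclasses import dataclass
from typing import Callable, Iterable, Optional


@dataclass
class Stage:
    """Hypothetical simplification of a pipeline stage: an opaque
    function, optionally mapped over an iterable of inputs."""
    function: Callable
    iterable: Optional[Iterable] = None


def execute_serially(stages: Iterable[Stage]) -> None:
    """Run stages in order. The executor never inspects what the stage
    functions do, which is why it needs no knowledge of Xarray."""
    for stage in stages:
        if stage.iterable is None:
            stage.function()
        else:
            for item in stage.iterable:
                stage.function(item)
```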
The Prefect executor is not much more complicated. The only difference is that it can parallelize certain stages (pangeo-forge-recipes/pangeo_forge_recipes/recipes/base.py, lines 156 to 175 at 6fb5a0a).
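That stage-level parallelism can be illustrated without Prefect: run stages sequentially, but fan each mapped stage out across workers. A sketch using a thread pool (a hypothetical simplification; the real executor builds a Prefect Flow instead):

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, Iterable, Optional, Tuple

# Each stage is an opaque (function, inputs) pair; inputs of None means
# the stage runs exactly once. Hypothetical model, not the real API.
StageSpec = Tuple[Callable, Optional[Iterable]]


def execute_with_stage_parallelism(
    stages: Iterable[StageSpec], max_workers: int = 4
) -> None:
    """Stages run in order, but items within a mapped stage run concurrently."""
    for function, inputs in stages:
        if inputs is None:
            function()
        else:
            with ThreadPoolExecutor(max_workers=max_workers) as pool:
                # list() drains the iterator, forcing all items to finish
                # before the next stage starts -- an inter-stage barrier.
                list(pool.map(function, inputs))
```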
The details of what is happening inside those functions are completely hidden from the executor. The executor knows nothing about Xarray, and indeed, we will eventually have recipe classes that don't use Xarray at all. For a Beam executor, you would just want to replicate this pattern with a … cc @shoyer
That makes total sense to me. Thank you for the examples. If no one else is slated to work on this, I'm happy to take it on, especially with help from you and the community.
As a starting point, I am working on a PR in Rechunker to extend the …
@alxmrs, your question is raising a broader issue that @pangeo-forge/dev-team needs to consider.
As part of a quest to make the executors more efficient, we have been considering deprecating the rechunker / pipelines dependency and directly implementing the execution within pangeo_forge_recipes (see pangeo-forge-recipes/pangeo_forge_recipes/recipes/base.py, lines 156 to 175 at b9f74be).
But I'm no longer sure that was a necessary step. I would be very curious to hear @TomAugspurger's opinion on whether that is still needed after the refactor in #160. Specifically, is there any significant difference between the graph produced by

```python
delayed = recipe.to_dask()
```

vs.

```python
from rechunker.executors import DaskPipelineExecutor

pipelines = recipe.to_pipelines()
executor = DaskPipelineExecutor()
delayed = executor.pipelines_to_plan(pipelines)
```

? If there is no significant difference, then I would go ahead as you proposed and add the Beam pipelines executor to Rechunker. In the meantime, I am going to make a PR that removes our custom executors and goes back to using the rechunker-based ones (but without changing our API). We can discuss the question further there.
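One way to answer that question empirically is to count the tasks in each resulting graph. A sketch of such a check (assumes dask is installed; `recipe`, `pipelines`, and `executor` refer to the names in the snippet above and are not defined here):

```python
import dask


def graph_size(delayed_obj) -> int:
    """Number of tasks in a dask Delayed object's graph -- a rough proxy
    for comparing the plans produced by the two code paths."""
    return len(delayed_obj.__dask_graph__())
```

If `graph_size(recipe.to_dask())` and `graph_size(executor.pipelines_to_plan(pipelines))` come out similar for the same recipe, the two routes are producing essentially the same plan.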
We'll want to separate the API design discussion from the implementation discussion. API-wise, I think it's nice for users to have a … #160 primarily dropped the detour through … An easy-ish test is to run a recipe with many inputs using both …
I've been chipping away at this issue today, this time implementing … I have an explanation as to why any approach that allows side effects in function closures (which includes the current …) … Here is a naive implementation of a Beam executor in rechunker that uses multi-stage pipelines: alxmrs/rechunker@e036131. The Beam executor fails the test suite with the following error: …
Digging a bit deeper – the states of each stage in the tests are fundamentally unpicklable (see these docs). Beam requires serialization in order to fan out both transformations and collections. I think we need a new structure if we want to support pangeo-forge execution on Apache Beam. A naive idea I've started to explore (alxmrs/rechunker@129f4cd) is to create a "stateful" type of multi-stage pipeline, where recipe writers can include a shared context for each stage.
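The failure mode can be reproduced with nothing but the standard library. A sketch of a picklability probe (this uses stdlib `pickle`; Beam actually serializes with dill/cloudpickle, which handle more cases but still cannot serialize genuinely stateful objects such as open files or locks):

```python
import pickle
import threading


def is_picklable(obj) -> bool:
    """True if obj survives a pickle round-trip -- roughly what Beam
    needs in order to ship a stage function out to remote workers."""
    try:
        pickle.loads(pickle.dumps(obj))
        return True
    except Exception:
        return False


class StatefulStage:
    """Stands in for a stage whose state holds a live resource
    (an open file handle, a lock, an open Zarr store, ...)."""

    def __init__(self):
        self.lock = threading.Lock()  # a lock is not picklable

    def __call__(self, key):
        with self.lock:
            return key
```

A "stateful" pipeline along the lines proposed above would move such resources out of the serialized function and into a context that each worker reconstructs locally after deserialization.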
Further serialization discussion in #211 |
As discussed in today's meeting, our internal users would really like to execute Recipes on Beam (and Dataflow). What is the best way to support this functionality?
To this end, I mentioned using XArray-Beam within Pangeo in the meeting. @rabernat pointed out that this is probably not the best direction, since we don't necessarily want to introduce a dependency on that project.