Support Beam as an executor for recipe pipelines #169

Closed
alxmrs opened this issue Jul 19, 2021 · 7 comments · Fixed by #225

Comments

@alxmrs
Contributor

alxmrs commented Jul 19, 2021

As discussed in today's meeting, our internal users would really like to execute Recipes on Beam (and Dataflow). What is the best way to support this functionality?

To this end, I mentioned using XArray-Beam within Pangeo in the meeting. @rabernat pointed out that this is probably not the best direction, since we don't necessarily want to introduce a dependency on that project.

@rabernat
Contributor

rabernat commented Jul 20, 2021

To this end, I mentioned using XArray-Beam within Pangeo in the meeting. @rabernat pointed out that this is probably not the best direction, since we don't necessarily want to introduce a dependency on that project.

It's not so much that we don't want the dependency. It's more that XArray-Beam contains higher-level abstractions that we are not able to take advantage of inside pangeo_forge_recipes. The execution model is extremely simple, as you can see from the serial function executor:

def to_function(self) -> Callable[[], None]:
    """
    Translate the recipe to a Python function for execution.
    """

    def pipeline():
        # TODO: allow recipes to customize which stages to run
        for input_key in self.iter_inputs():
            self.cache_input(input_key)
        self.prepare_target()
        for chunk_key in self.iter_chunks():
            self.store_chunk(chunk_key)
        self.finalize_target()

    return pipeline
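
For example, a recipe compiled this way is executed just by calling the returned function (recipe here stands in for any recipe instance):

run = recipe.to_function()
run()  # runs cache_input, prepare_target, store_chunk, and finalize_target in order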

The Prefect executor is not much more complicated. The only difference is that it can parallelize certain stages (mapping over a sequence of inputs or chunks).

def to_prefect(self):
    """Compile the recipe to a Prefect.Flow object."""
    from prefect import Flow, task, unmapped

    # TODO: allow recipes to customize which stages to run
    cache_input_task = task(self.cache_input, name="cache_input")
    prepare_target_task = task(self.prepare_target, name="prepare_target")
    store_chunk_task = task(self.store_chunk, name="store_chunk")
    finalize_target_task = task(self.finalize_target, name="finalize_target")

    with Flow("pangeo-forge-recipe") as flow:
        cache_task = cache_input_task.map(input_key=list(self.iter_inputs()))
        upstream_tasks = [cache_task]
        prepare_task = prepare_target_task(upstream_tasks=upstream_tasks)
        store_task = store_chunk_task.map(
            chunk_key=list(self.iter_chunks()), upstream_tasks=[unmapped(prepare_task)],
        )
        _ = finalize_target_task(upstream_tasks=[store_task])
    return flow

The details of what is happening inside those functions are completely hidden from the executor. The executor knows nothing about Xarray, and indeed, we will eventually have recipe classes that don't use Xarray at all.

For a Beam executor, you would just want to replicate this pattern with a to_beam() method, which returns a beam.PTransform for the recipe. The Beam executor for rechunker would be a reasonable template to start with.
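
For concreteness, here is a minimal sketch of what such a to_beam() could look like, mirroring the stage ordering of the Prefect executor above. The RecipeTransform class, the step names, and the side-input wiring are illustrative assumptions, not a finished design:

import apache_beam as beam


def to_beam(self) -> beam.PTransform:
    """Compile the recipe to a composite Beam PTransform (sketch only)."""
    recipe = self

    class RecipeTransform(beam.PTransform):
        def expand(self, pbegin):
            # Cache all inputs in parallel, then collapse to a single element
            # so that prepare_target runs exactly once, after caching is done.
            prepared = (
                pbegin.pipeline
                | "inputs" >> beam.Create(list(recipe.iter_inputs()))
                | "cache_input" >> beam.Map(recipe.cache_input)
                | "wait_for_cache" >> beam.combiners.ToList()
                | "prepare_target" >> beam.Map(lambda _: recipe.prepare_target())
            )
            stored = (
                pbegin.pipeline
                | "chunks" >> beam.Create(list(recipe.iter_chunks()))
                # The side input on prepared makes every store_chunk wait for prepare_target.
                | "store_chunk" >> beam.Map(
                    lambda key, _ready: recipe.store_chunk(key),
                    _ready=beam.pvalue.AsSingleton(prepared),
                )
                | "wait_for_store" >> beam.combiners.ToList()
            )
            # Note: the recipe and its stage methods must be picklable for Beam
            # to ship these callables to workers.
            return stored | "finalize_target" >> beam.Map(lambda _: recipe.finalize_target())

    return RecipeTransform()

A user would then run it as p | recipe.to_beam() inside a beam.Pipeline, on whatever runner (DirectRunner, Dataflow, etc.) they choose.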

cc @shoyer

@alxmrs
Contributor Author

alxmrs commented Jul 20, 2021

It's more that XArray-Beam contains higher-level abstractions ... The executor knows nothing about Xarray, and indeed, we will eventually have recipe classes that don't use Xarray at all.

That makes total sense to me. Thank you for the examples.

If no one else is slated to work on this I'm happy to take it on, especially with help from yourself and the community.

@alxmrs
Contributor Author

alxmrs commented Jul 21, 2021

As a starting point, I am working on a PR in Rechunker to extend the ParallelPipelines model to a Beam executor (guided by pangeo-data/rechunker#77). How does this sound? If I understand correctly how the projects are interrelated, this should make implementing to_beam() straightforward.

@rabernat
Contributor

rabernat commented Jul 21, 2021

@alxmrs, your question is raising a broader issue that @pangeo-forge/dev-team needs to consider.

I am working on a PR in Rechunker to extend the ParallelPipelines model to the Beam executor... If I understand well how the projects are interrelated, this should make implementing to_beam() straightforward.

As part of a quest to make the executors more efficient, we have been considering deprecating the rechunker / pipelines dependency and directly implementing the execution within the pangeo_forge_recipes.recipes.Base class. That's what you can see here, for example:

def to_prefect(self):
    """Compile the recipe to a Prefect.Flow object."""
    from prefect import Flow, task, unmapped

    # TODO: allow recipes to customize which stages to run
    cache_input_task = task(self.cache_input, name="cache_input")
    prepare_target_task = task(self.prepare_target, name="prepare_target")
    store_chunk_task = task(self.store_chunk, name="store_chunk")
    finalize_target_task = task(self.finalize_target, name="finalize_target")

    with Flow("pangeo-forge-recipe") as flow:
        cache_task = cache_input_task.map(input_key=list(self.iter_inputs()))
        upstream_tasks = [cache_task]
        prepare_task = prepare_target_task(upstream_tasks=upstream_tasks)
        store_task = store_chunk_task.map(
            chunk_key=list(self.iter_chunks()), upstream_tasks=[unmapped(prepare_task)],
        )
        _ = finalize_target_task(upstream_tasks=[store_task])
    return flow

But I'm no longer sure that was a necessary step.

I would be very curious to hear @TomAugspurger's opinion on whether that is still needed after the refactor in #160. Specifically, is there any significant difference between the graph produced by

delayed = recipe.to_dask()

vs

pipelines = recipe.to_pipelines()
from rechunker.executors import DaskPipelineExecutor 
executor = DaskPipelineExecutor(pipelines)
delayed = executor.pipelines_to_plan(pipelines)

???

If there is no significant difference, then I would go ahead as you proposed and add the Beam pipelines executor to Rechunker.

In the meantime, I am going to make a PR that removes our custom executors and goes back to using the rechunker-based ones (but without changing our API). We can discuss the question further there.

@TomAugspurger
Contributor

TomAugspurger commented Aug 2, 2021

We'll want to separate the API design discussion from the implementation discussion. API-wise, I think it's nice for users to have a .to_<executor>() method. Ideally, that would be implemented as <Executor>.pipelines_to_plan(self.to_pipelines()).compute().
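
A minimal sketch of that wrapper pattern, mirroring the rechunker snippet quoted above (the executor constructor signature is taken from that snippet, not verified against rechunker), and returning the plan rather than calling .compute() on it, to match how recipe.to_dask() is used above:

def to_dask(self):
    """Compile the recipe to a Dask graph by delegating to the rechunker executor."""
    from rechunker.executors import DaskPipelineExecutor

    pipelines = self.to_pipelines()
    executor = DaskPipelineExecutor(pipelines)
    return executor.pipelines_to_plan(pipelines)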

#160 primarily dropped the detour through pipelines_to_plan() to expedite the implementation. I think that could be brought back with a bit of effort, possibly involving PRs to both pangeo-forge-recipes and rechunker. Actually, glancing at the final implementation, it's possible that the Dask graph generated by DaskExecutor.pipelines_to_plan(recipe.to_pipelines()) will be (essentially) equivalent to the output of recipe.to_dask().

An easy-ish test is to run a recipe with many inputs using both .to_pipelines() and pipelines_to_plan. IIRC, the OISST recipe showed memory issues on the scheduler pretty quickly.
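
A rough sketch of comparing the two graphs described above, assuming a recipe object as in the earlier snippets; graph size is only a proxy for the scheduler memory problem, but it is quick to eyeball:

from rechunker.executors import DaskPipelineExecutor

pipelines = recipe.to_pipelines()
executor = DaskPipelineExecutor(pipelines)

graph_a = recipe.to_dask().__dask_graph__()
graph_b = executor.pipelines_to_plan(pipelines).__dask_graph__()
print(len(graph_a), len(graph_b))  # wildly different sizes would hint at a real difference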

@alxmrs
Contributor Author

alxmrs commented Sep 11, 2021

I've been chipping away at this issue today, this time implementing to_beam() in Rechunker (IIRC, in offline discussion this was the recommended way to proceed).

I have an explanation as to why any approach that allows side effects in function closures (which includes the current MultiStagePipeline model in rechunker) won't work for Beam executors.

Here is a naive implementation of a beam executor in rechunker that uses multi-stage pipelines: alxmrs/rechunker@e036131

The Beam executor fails the test suite with the following error:

>   lambda x: dumps(x, protocol), pickle.loads)
E   AttributeError: Can't pickle local object 'example_pipeline.<locals>.func0'

Digging a bit deeper – the states of each stage in the tests are fundamentally unpicklable (see these docs). Beam requires serialization in order to fan out both transformations and collections.
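
A stripped-down illustration of the failure mode, separate from the rechunker test suite itself: locally defined closures cannot be pickled, and Beam pickles every callable it ships to workers.

import pickle


def example_pipeline():
    state = {"count": 0}

    def func0():
        # Local closure capturing mutable state from the enclosing scope.
        state["count"] += 1

    return func0


stage = example_pipeline()
pickle.dumps(stage)
# AttributeError: Can't pickle local object 'example_pipeline.<locals>.func0'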

I think we need a new structure if we want to support pangeo-forge execution on Apache Beam.

A naive idea I've started to explore (alxmrs/rechunker@129f4cd) is creating a "stateful" type of multi-stage pipeline, where recipe writers can include a shared context for each stage.
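
To make that concrete, the rough shape I have in mind (hypothetical names, not the code in that branch) is stages defined at module level that take an explicit, serializable context instead of capturing state in a closure:

from dataclasses import dataclass


@dataclass(frozen=True)
class StageContext:
    # Shared, picklable state handed explicitly to every stage.
    target_url: str
    cache_url: str


def cache_input(input_key, context: StageContext):
    # Defined at module scope, so pickle.dumps(cache_input) succeeds and Beam
    # can ship it to workers along with the context.
    ...


def store_chunk(chunk_key, context: StageContext):
    ...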

@alxmrs
Contributor Author

alxmrs commented Sep 28, 2021

Further serialization discussion in #211
