Apache Beam Executor #225

Merged Nov 16, 2021 (19 commits)

Changes from 17 commits

Commits
b63f440
Naive implementation of Beam executor.
alxmrs Nov 7, 2021
81aa14e
An improvement on the Beam executor; includes parallel mappable stages.
alxmrs Nov 7, 2021
c562e0f
Adding beam dependencies to CI conda environment files.
alxmrs Nov 7, 2021
c8ffecc
Updated change to pass precommit checks. Includes some refactoring fo…
alxmrs Nov 8, 2021
7d5a7b3
Update ci/py3.8.yml
rabernat Nov 8, 2021
3d693eb
Chose better variable name that didn't break Beam's type-checking. Ap…
alxmrs Nov 8, 2021
bbdb8b4
Merge remote-tracking branch 'origin/beam' into beam
alxmrs Nov 8, 2021
c7198b8
Update ci/py3.9.yml
alxmrs Nov 8, 2021
0026701
Merge branch 'master' of github.com:pangeo-forge/pangeo-forge-recipes…
alxmrs Nov 11, 2021
a4dafc5
Merge branch 'beam' of github.com:alxmrs/pangeo-forge-recipes into beam
alxmrs Nov 11, 2021
d9c5b2b
Remove apache-beam from py3.9 env (not supported).
alxmrs Nov 11, 2021
e3423db
Skip Beam pipeline tests if apache_beam is not installed in the test …
alxmrs Nov 11, 2021
e9f5077
Merge branch 'master' of github.com:pangeo-forge/pangeo-forge-recipes…
alxmrs Nov 11, 2021
97a60a5
Added reshuffle step to prevent operator fusion.
alxmrs Nov 11, 2021
c0c3d5a
Pipeline tests restructured to skip tests for Executors that have imp…
alxmrs Nov 11, 2021
a871809
Added `to_beam()` to base recipe.
alxmrs Nov 13, 2021
ca8afbb
Documented usage of `to_beam()`.
alxmrs Nov 14, 2021
27467d1
Better Executor fixture in tests: using import_module instead of impo…
alxmrs Nov 16, 2021
f52fece
Update release_notes.md
rabernat Nov 16, 2021
1 change: 1 addition & 0 deletions ci/py3.8.yml
@@ -4,6 +4,7 @@ channels:
dependencies:
- python=3.8
- aiohttp
- apache-beam
- black
- boto3
- cfgrib<0.9.9.0
16 changes: 16 additions & 0 deletions docs/recipe_user_guide/execution.md
@@ -104,3 +104,19 @@ flow.run()
```

By default the flow is run using Prefect's [LocalExecutor](https://docs.prefect.io/orchestration/flow_config/executors.html#localexecutor). See [executors](https://docs.prefect.io/orchestration/flow_config/executors.html) for more.

### Beam PTransform

You can convert your recipe to an Apache Beam [PTransform](https://beam.apache.org/documentation/programming-guide/#transforms)
for use within a [Pipeline](https://beam.apache.org/documentation/programming-guide/#creating-a-pipeline) by calling the
{meth}`BaseRecipe.to_beam` method. For example:

```{code-block} python
import apache_beam as beam

with beam.Pipeline() as p:
p | recipe.to_beam()
```

By default the pipeline runs using Beam's [DirectRunner](https://beam.apache.org/documentation/runners/direct/).
See [runners](https://beam.apache.org/documentation/#runners) for more.
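
A runner and other execution settings can also be passed explicitly through Beam's [PipelineOptions](https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options). The following is a minimal sketch (not part of this diff), assuming the same `recipe` object as above; `DirectRunner` is shown only as an illustration and any installed runner name could be substituted:

```{code-block} python
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# Select a runner explicitly; DirectRunner is already the default.
options = PipelineOptions(runner="DirectRunner")

with beam.Pipeline(options=options) as p:
    p | recipe.to_beam()
```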
80 changes: 80 additions & 0 deletions pangeo_forge_recipes/executors/beam.py
@@ -0,0 +1,80 @@
import itertools
from dataclasses import dataclass
from typing import Any, Iterable, List, Tuple, cast

import apache_beam as beam

from .base import Config, NoArgumentStageFunction, Pipeline, PipelineExecutor, Stage


def _no_arg_stage(last: int, *, current: int, fun: NoArgumentStageFunction, config: Config) -> int:
"""Execute a NoArgumentStageFunction, ensuring execution order."""
assert (last + 1) == current, f"stages are executing out of order! On step {current!r}."

fun(config=config)

return current


def _no_op(arg, config=None) -> None:
pass


@dataclass()
class _SingleArgumentStage(beam.PTransform):
"""Execute mappable stage in parallel."""

step: int
stage: Stage
config: Config

def prepare_stage(self, last: int) -> Iterable[Tuple[int, Any]]:
"""Propagate current stage to Mappables for parallel execution."""
assert (last + 1) == self.step, f"stages are executing out of order! On step {self.step!r}."
return zip(itertools.repeat(self.step), cast(Iterable, self.stage.mappable))

def exec_stage(self, last: int, arg: Any) -> int:
"""Execute stage function."""
assert last == self.step, f"stages are executing out of order! On step {self.step!r}."

self.stage.function(arg, config=self.config) # type: ignore

return self.step

def post_validate(self, last: List[int]) -> int:
"""Propagate step number for downstream stage validation."""
in_current_step = all((it == self.step for it in last))
assert in_current_step, f"stages are executing out of order! On step {self.step!r}."

return self.step

def expand(self, pcoll):
return (
pcoll
| "Prepare" >> beam.FlatMap(self.prepare_stage)
| beam.Reshuffle()
| "Execute" >> beam.MapTuple(self.exec_stage)
| beam.combiners.ToList()
| "Validate" >> beam.Map(self.post_validate)
)


class BeamPipelineExecutor(PipelineExecutor[beam.PTransform]):
@staticmethod
def compile(pipeline: Pipeline) -> beam.PTransform:
pcoll = beam.Create([-1])
for step, stage in enumerate(pipeline.stages):
if stage.mappable is not None:
pcoll |= stage.name >> _SingleArgumentStage(step, stage, pipeline.config)
else:
pcoll |= stage.name >> beam.Map(
_no_arg_stage, current=step, fun=stage.function, config=pipeline.config
)

return pcoll

@staticmethod
def execute(plan: beam.PTransform, *args, **kwargs):
"""Execute a plan. All args and kwargs are passed to a `apache_beam.Pipeline`."""
with beam.Pipeline(*args, **kwargs) as p:
p | plan
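
For context (not part of the diff), a minimal sketch of driving this executor directly, assuming `pipeline` is an `executors.base.Pipeline` of stages produced by a recipe compiler; `execute` forwards its arguments to `apache_beam.Pipeline`, so a runner can be selected there:

```python
from pangeo_forge_recipes.executors.beam import BeamPipelineExecutor

# `pipeline` is assumed to be an executors.base.Pipeline of stages.
# compile() turns it into a Beam PTransform; execute() runs it, forwarding
# extra arguments to apache_beam.Pipeline (e.g. the runner).
plan = BeamPipelineExecutor.compile(pipeline)
BeamPipelineExecutor.execute(plan, runner="DirectRunner")
```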
5 changes: 5 additions & 0 deletions pangeo_forge_recipes/recipes/base.py
@@ -27,6 +27,11 @@ def to_prefect(self):

return PrefectPipelineExecutor.compile(self._compiler())

def to_beam(self):
from pangeo_forge_recipes.executors.beam import BeamPipelineExecutor

return BeamPipelineExecutor.compile(self._compiler())


RecipeCompiler = Callable[[BaseRecipe], Pipeline]

2 changes: 1 addition & 1 deletion setup.cfg
@@ -60,7 +60,7 @@ max-line-length = 100

[isort]
known_first_party=pangeo_forge_recipes
known_third_party=aiohttp,click,dask,fsspec,fsspec_reference_maker,mypy_extensions,numpy,pandas,prefect,pytest,pytest_lazyfixture,setuptools,xarray,yaml,zarr
known_third_party=aiohttp,apache_beam,click,dask,fsspec,fsspec_reference_maker,mypy_extensions,numpy,pandas,prefect,pytest,pytest_lazyfixture,setuptools,xarray,yaml,zarr
multi_line_output=3
include_trailing_comma=True
force_grid_wrap=0
23 changes: 17 additions & 6 deletions tests/test_pipelines.py
@@ -6,9 +6,6 @@
from pytest_lazyfixture import lazy_fixture

from pangeo_forge_recipes.executors.base import Pipeline, Stage
from pangeo_forge_recipes.executors.dask import DaskPipelineExecutor
from pangeo_forge_recipes.executors.function import FunctionPipelineExecutor
from pangeo_forge_recipes.executors.prefect import PrefectPipelineExecutor


@pytest.fixture
@@ -50,9 +47,23 @@ def func1(arg, config=None):
return pipeline, config, tmp


@pytest.mark.parametrize(
"Executor", [FunctionPipelineExecutor, PrefectPipelineExecutor, DaskPipelineExecutor],
@pytest.fixture(
scope="session",
params=[
("pangeo_forge_recipes.executors.dask", "DaskPipelineExecutor"),
("pangeo_forge_recipes.executors.function", "FunctionPipelineExecutor"),
("pangeo_forge_recipes.executors.prefect", "PrefectPipelineExecutor"),
("pangeo_forge_recipes.executors.beam", "BeamPipelineExecutor"),
],
)
def Executor(request):
try:
module = pytest.importorskip(request.param[0])
return getattr(module, request.param[1])
except (AttributeError, ImportError):
pytest.skip()


@pytest.mark.parametrize(
"pipeline_config_tmpdir",
[lazy_fixture("pipeline_no_config"), lazy_fixture("pipeline_with_config")],
@@ -69,4 +80,4 @@ def test_pipeline(pipeline_config_tmpdir, Executor):
f"{prefix}func1_b.log",
f"{prefix}func1_3.log",
]:
assert tmpdir.join(fname).check(file=True)
assert tmpdir.join(fname).check(file=True), f"File not found in temp directory: {fname}."