From 910f740a8cf1cdbc2f23799ac08201896d2ebcd2 Mon Sep 17 00:00:00 2001
From: Alex Merose
Date: Tue, 12 Apr 2022 14:42:41 -0700
Subject: [PATCH] Experiment: How HDFReferenceRecipe would look as a Beam
 Pipeline.

This is a prototype for using Apache Beam for the internal (and
external?) data model of Pangeo Forge Recipes. Here, I demo how
HDFReferenceRecipe could be structured into modular components via
composite Beam transforms.

xref: #256
---
 examples/cmip6_ref_beam.py                    | 31 ++++++++++++
 pangeo_forge_recipes/patterns.py              | 24 +++++++++
 .../recipes/reference_hdf_zarr.py             | 49 ++++++++++++++++---
 3 files changed, 96 insertions(+), 8 deletions(-)
 create mode 100644 examples/cmip6_ref_beam.py

diff --git a/examples/cmip6_ref_beam.py b/examples/cmip6_ref_beam.py
new file mode 100644
index 00000000..d1d609ad
--- /dev/null
+++ b/examples/cmip6_ref_beam.py
@@ -0,0 +1,31 @@
+import sys
+import typing as t
+import apache_beam as beam
+import s3fs
+
+from pangeo_forge_recipes.patterns import pattern_from_file_sequence
+from pangeo_forge_recipes.recipes.reference_hdf_zarr import HDFReferenceRecipe
+
+BASE_PATH = 's3://esgf-world/CMIP6/OMIP/NOAA-GFDL/GFDL-CM4/omip1/r1i1p1f1/Omon/thetao/gr/v20180701/'
+
+
+def run(pipeline_args: t.List[str]) -> None:
+    # Define pattern
+    fs = s3fs.S3FileSystem(anon=True)
+    all_paths = fs.ls(BASE_PATH)
+    pattern = pattern_from_file_sequence(['s3://' + path for path in all_paths], 'time')
+
+    # Create Recipe
+    rec = HDFReferenceRecipe(
+        pattern,
+        xarray_open_kwargs={"decode_coords": "all"},
+        netcdf_storage_options={"anon": True}
+    )
+
+    with beam.Pipeline(argv=pipeline_args) as p:
+        p | rec.to_beam()
+
+
+if __name__ == '__main__':
+    run(sys.argv[1:])
+
diff --git a/pangeo_forge_recipes/patterns.py b/pangeo_forge_recipes/patterns.py
index 872516aa..2790766f 100644
--- a/pangeo_forge_recipes/patterns.py
+++ b/pangeo_forge_recipes/patterns.py
@@ -1,6 +1,9 @@
 """
 Filename / URL patterns.
""" +import dataclasses + +import apache_beam as beam import inspect from dataclasses import dataclass, field, replace from enum import Enum @@ -216,6 +219,27 @@ def items(self): yield key, self[key] +@dataclasses.dataclass +class OpenPattern(beam.PTransform): + + file_pattern: FilePattern + + def expand(self, pcoll): + return pcoll | beam.Create(self.file_pattern.items()) + + +@dataclasses.dataclass +class ChunkKeys(beam.PTransform): + def expand(self, pcoll): + return pcoll | beam.MapTuple(lambda key, fname: key) + + +@dataclasses.dataclass +class FileNames(beam.PTransform): + def expand(self, pcoll): + return pcoll | beam.MapTuple(lambda key, fname: fname) + + def pattern_from_file_sequence(file_list, concat_dim, nitems_per_file=None, **kwargs): """Convenience function for creating a FilePattern from a list of files.""" diff --git a/pangeo_forge_recipes/recipes/reference_hdf_zarr.py b/pangeo_forge_recipes/recipes/reference_hdf_zarr.py index 5ce28496..d4faf8c9 100644 --- a/pangeo_forge_recipes/recipes/reference_hdf_zarr.py +++ b/pangeo_forge_recipes/recipes/reference_hdf_zarr.py @@ -3,17 +3,18 @@ import json import os from dataclasses import dataclass, field -from typing import Hashable, Iterable, Optional +from typing import Hashable, Iterable, Optional, Dict, List +import apache_beam as beam import fsspec import yaml from fsspec_reference_maker.combine import MultiZarrToZarr +from .base import BaseRecipe, FilePatternMixin from ..executors.base import Pipeline, Stage -from ..patterns import Index +from ..patterns import Index, OpenPattern, FileNames from ..reference import create_hdf5_reference, unstrip_protocol from ..storage import FSSpecTarget, MetadataTarget, file_opener -from .base import BaseRecipe, FilePatternMixin ChunkKey = Index @@ -26,18 +27,30 @@ def no_op(*_, **__) -> None: def scan_file(chunk_key: ChunkKey, config: HDFReferenceRecipe): assert config.metadata_cache is not None, "metadata_cache is required" fname = config.file_pattern[chunk_key] + reference = scan_file_pure(fname, config) ref_fname = os.path.basename(fname + ".json") + config.metadata_cache[ref_fname] = reference + + +def scan_file_pure(fname: str, config: HDFReferenceRecipe) -> Dict: with file_opener(fname, **config.netcdf_storage_options) as fp: protocol = getattr(getattr(fp, "fs", None), "protocol", None) # make mypy happy if protocol is None: raise ValueError("Couldn't determine protocol") target_url = unstrip_protocol(fname, protocol) - config.metadata_cache[ref_fname] = create_hdf5_reference(fp, target_url, fname) + return create_hdf5_reference(fp, url=target_url, fname=fname) def finalize(config: HDFReferenceRecipe): - assert config.target is not None, "target is required" assert config.metadata_cache is not None, "metadata_cache is required" + files = list( + config.metadata_cache.getitems(list(config.metadata_cache.get_mapper())).values() + ) # returns dicts from remote + finalize_pure(files, config) + + +def finalize_pure(files: List[Dict], config: HDFReferenceRecipe) -> None: + assert config.target is not None, "target is required" remote_protocol = fsspec.utils.get_protocol(next(config.file_pattern.items())[1]) concat_args = config.xarray_concat_args.copy() if "dim" in concat_args: @@ -47,9 +60,6 @@ def finalize(config: HDFReferenceRecipe): ) concat_args["dim"] = config.file_pattern.concat_dims[0] # there should only be one - files = list( - config.metadata_cache.getitems(list(config.metadata_cache.get_mapper())).values() - ) # returns dicts from remote if len(files) == 1: out = files[0] else: @@ 
@@ -100,6 +110,26 @@ def hdf_reference_recipe_compiler(recipe: HDFReferenceRecipe) -> Pipeline:
     return Pipeline(stages=stages, config=recipe)
 
 
+@dataclass
+class ScanFiles(beam.PTransform):
+    config: "HDFReferenceRecipe"
+
+    def expand(self, pcoll):
+        return pcoll | beam.Map(scan_file_pure, config=self.config)
+
+
+@dataclass
+class WriteZarrReference(beam.PTransform):
+    config: "HDFReferenceRecipe"
+
+    def expand(self, pcoll):
+        return (
+            pcoll
+            | beam.combiners.ToList()
+            | beam.Map(finalize_pure, config=self.config)
+        )
+
+
 @dataclass
 class HDFReferenceRecipe(BaseRecipe, FilePatternMixin):
     """
@@ -168,3 +198,6 @@ def _validate_file_pattern(self):
 
     def iter_inputs(self) -> Iterable[Hashable]:
         yield from self.file_pattern
+
+    def to_beam(self):
+        return OpenPattern(self.file_pattern) | FileNames() | ScanFiles(self) | WriteZarrReference(self)
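
--
Aside (not part of the patch): a minimal sketch of what the composite
chain returned by to_beam() expands to in plain Beam primitives,
assuming the `pattern` and `rec` objects from examples/cmip6_ref_beam.py
above. It uses only the standard apache_beam API plus the pure functions
added in this patch; the comments name the composite transform each step
corresponds to.

    import apache_beam as beam

    with beam.Pipeline() as p:
        (
            p
            | beam.Create(pattern.items())             # OpenPattern: emit (ChunkKey, fname) pairs
            | beam.MapTuple(lambda key, fname: fname)  # FileNames: keep only the file URLs
            | beam.Map(scan_file_pure, config=rec)     # ScanFiles: one reference dict per file
            | beam.combiners.ToList()                  # gather all reference dicts into one list
            | beam.Map(finalize_pure, config=rec)      # WriteZarrReference: MultiZarrToZarr + write
        )

Because the example script forwards sys.argv to beam.Pipeline(argv=...),
standard Beam pipeline flags such as --runner=DirectRunner pass straight
through to the runner.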