Merge pull request #721 from pangeo-forge/append-mode
Append mode for StoreToZarr
cisaacstern authored Apr 3, 2024
2 parents fe03650 + c75a274 commit 30b9bb2
Showing 10 changed files with 203 additions and 18 deletions.
15 changes: 15 additions & 0 deletions docs/composition/styles.md
@@ -32,6 +32,21 @@ If using the {class}`pangeo_forge_recipes.transforms.ConsolidateDimensionCoordinate
```

```{note}
{class}`pangeo_forge_recipes.transforms.StoreToZarr` supports appending to existing Zarr stores
via the optional `append_dim` keyword argument. This option functions nearly identically to the
`append_dim` kwarg of
[`xarray.Dataset.to_zarr`](https://docs.xarray.dev/en/latest/generated/xarray.Dataset.to_zarr.html);
the two differences are that Pangeo Forge automatically introspects the inputs in your
{class}`FilePattern <pangeo_forge_recipes.patterns.FilePattern>` to determine how the existing Zarr
store dimensions need to be resized, and that writes are parallelized via Apache Beam. Apart from
checking that the named `append_dim` already exists in the dataset to which you are appending,
this option does not ensure logical consistency (e.g. contiguity) of the appended data. When using
this option, it is therefore up to you, the user, to ensure that the inputs provided in the
{doc}`file pattern <file_patterns>` for the appending recipe are limited to those which you want to
append.
```
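
As a minimal sketch of an appending recipe (mirroring the end-to-end test for this feature), the pipeline below assumes a hypothetical `pattern_to_append` {class}`FilePattern <pangeo_forge_recipes.patterns.FilePattern>` containing only the new inputs, and a `target_root` under which the existing store named `"store"` was previously written:

```python
import apache_beam as beam

from pangeo_forge_recipes.transforms import OpenWithXarray, StoreToZarr

# `pattern_to_append` and `target_root` are hypothetical placeholders:
# the pattern holds ONLY the inputs to append, and `target_root` is the
# root (path or FSSpecTarget) containing the existing "store" Zarr store.
with beam.Pipeline() as p:
    (
        p
        | beam.Create(pattern_to_append.items())
        | OpenWithXarray()
        | StoreToZarr(
            target_root=target_root,
            store_name="store",
            combine_dims=pattern_to_append.combine_dim_keys,
            append_dim="time",  # must be a CONCAT dimension present in the existing store
        )
    )
```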


## Open with Kerchunk, write to virtual Zarr

10 changes: 8 additions & 2 deletions pangeo_forge_recipes/aggregation.py
@@ -287,13 +287,19 @@ def schema_to_zarr(
attrs: Optional[Dict[str, str]] = None,
consolidated_metadata: Optional[bool] = True,
encoding: Optional[Dict] = None,
append_dim: Optional[str] = None,
) -> zarr.storage.FSStore:
"""Initialize a zarr group based on a schema."""
if append_dim:
# if appending, only keep schema for coordinate to append. if we don't drop other
# coords, we may end up overwriting existing data on the `ds.to_zarr` call below.
schema["coords"] = {k: v for k, v in schema["coords"].items() if k == append_dim}
ds = schema_to_template_ds(schema, specified_chunks=target_chunks, attrs=attrs)
# using mode="w" makes this function idempotent
# using mode="w" makes this function idempotent when not appending
ds.to_zarr(
target_store,
mode="w",
append_dim=append_dim,
mode=("a" if append_dim else "w"),
compute=False,
consolidated=consolidated_metadata,
encoding=encoding,
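
To see the effect of the `mode=("a" if append_dim else "w")` switch in isolation, here is a rough standalone sketch using plain `xarray` (assuming `zarr` and `dask` are installed; the dataset and store path are made up). With `compute=False`, dask-backed variables are only laid out, while the metadata and the append dimension are resized immediately:

```python
import numpy as np
import pandas as pd
import xarray as xr


def template(start: str) -> xr.Dataset:
    # small dask-backed dataset with a 10-day time coordinate
    time = pd.date_range(start, periods=10, freq="D")
    ds = xr.Dataset({"foo": ("time", np.zeros(len(time)))}, coords={"time": time})
    return ds.chunk({"time": 10})


store = "example.zarr"  # hypothetical local path

# initial write: mode="w" overwrites any existing store, which keeps this step idempotent
template("2010-01-01").to_zarr(store, mode="w", compute=False)

# append: mode="a" plus append_dim resizes the time dimension without rewriting existing data
template("2010-01-11").to_zarr(store, mode="a", append_dim="time", compute=False)

print(xr.open_zarr(store).sizes["time"])  # 20
```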
9 changes: 7 additions & 2 deletions pangeo_forge_recipes/patterns.py
@@ -63,18 +63,23 @@ class MergeDim(CombineDim):
operation: ClassVar[CombineOp] = CombineOp.MERGE


def augment_index_with_start_stop(position: Position, item_lens: List[int]) -> IndexedPosition:
def augment_index_with_start_stop(
position: Position,
item_lens: List[int],
append_offset: int = 0,
) -> IndexedPosition:
"""Take an index _without_ start / stop and add them based on the lens defined in sequence_lens.
:param index: The ``DimIndex`` instance to augment.
:param item_lens: A list of integer lengths for all items in the sequence.
:param append_offset: If appending, the length of the existing ``append_dim``.
"""

if position.indexed:
raise ValueError("This position is already indexed")
start = sum(item_lens[: position.value])
dimsize = sum(item_lens)
return IndexedPosition(start, dimsize=dimsize)
return IndexedPosition(start + append_offset, dimsize=dimsize + append_offset)


class AutoName(Enum):
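
For a concrete sense of the offset arithmetic, this sketch (mirroring the updated unit test in `tests/test_patterns.py`) shows how `append_offset` shifts both the start index and the dimension size of the resulting `IndexedPosition`:

```python
from pangeo_forge_recipes.patterns import (
    IndexedPosition,
    Position,
    augment_index_with_start_stop,
)

item_lens = [2, 2, 3, 2, 2]  # per-input lengths along the concat dimension

# no append: the third input starts at 2 + 2 = 4, and the full dimension has size 11
assert augment_index_with_start_stop(Position(2), item_lens) == IndexedPosition(4, dimsize=11)

# appending to a store whose concat dimension already has length 10 shifts both values by 10
assert augment_index_with_start_stop(Position(2), item_lens, append_offset=10) == IndexedPosition(
    14, dimsize=21
)
```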
33 changes: 28 additions & 5 deletions pangeo_forge_recipes/transforms.py
@@ -301,21 +301,26 @@ class IndexItems(beam.PTransform):
"""Augment dataset indexes with information about start and stop position."""

schema: beam.PCollection
append_offset: int = 0

@staticmethod
def index_item(item: Indexed[T], schema: XarraySchema) -> Indexed[T]:
def index_item(item: Indexed[T], schema: XarraySchema, append_offset: int) -> Indexed[T]:
index, ds = item
new_index = Index()
for dimkey, dimval in index.items():
if dimkey.operation == CombineOp.CONCAT:
item_len_dict = schema["chunks"][dimkey.name]
item_lens = [item_len_dict[n] for n in range(len(item_len_dict))]
dimval = augment_index_with_start_stop(dimval, item_lens)
dimval = augment_index_with_start_stop(dimval, item_lens, append_offset)
new_index[dimkey] = dimval
return new_index, ds

def expand(self, pcoll: beam.PCollection):
return pcoll | beam.Map(self.index_item, schema=beam.pvalue.AsSingleton(self.schema))
return pcoll | beam.Map(
self.index_item,
schema=beam.pvalue.AsSingleton(self.schema),
append_offset=self.append_offset,
)


@dataclass
@@ -341,13 +346,15 @@ class PrepareZarrTarget(beam.PTransform):
then falling out of sync with coordinates if
ConsolidateDimensionCoordinates() is applied to the output of
StoreToZarr().
:param append_dim: Optional name of the dimension to append to.
"""

target: str | FSSpecTarget
target_chunks: Dict[str, int] = field(default_factory=dict)
attrs: Dict[str, str] = field(default_factory=dict)
consolidated_metadata: Optional[bool] = True
encoding: Optional[dict] = field(default_factory=dict)
append_dim: Optional[str] = None

def expand(self, pcoll: beam.PCollection) -> beam.PCollection:
if isinstance(self.target, str):
@@ -362,6 +369,7 @@ def expand(self, pcoll: beam.PCollection) -> beam.PCollection:
attrs=self.attrs,
encoding=self.encoding,
consolidated_metadata=False,
append_dim=self.append_dim,
)
return initialized_target

@@ -641,8 +649,8 @@ class StoreToZarr(beam.PTransform, ZarrWriterMixin):
out https://github.com/jbusecke/dynamic_chunks
:param dynamic_chunking_fn_kwargs: Optional keyword arguments for ``dynamic_chunking_fn``.
:param attrs: Extra group-level attributes to inject into the dataset.
:param encoding: Dictionary encoding for xarray.to_zarr().
:param append_dim: Optional name of the dimension to append to.
"""

# TODO: make it so we don't have to explicitly specify combine_dims
@@ -657,17 +665,31 @@ class StoreToZarr(beam.PTransform, ZarrWriterMixin):
dynamic_chunking_fn_kwargs: Optional[dict] = field(default_factory=dict)
attrs: Dict[str, str] = field(default_factory=dict)
encoding: Optional[dict] = field(default_factory=dict)
append_dim: Optional[str] = None

def __post_init__(self):
if self.target_chunks and self.dynamic_chunking_fn:
raise ValueError("Passing both `target_chunks` and `dynamic_chunking_fn` not allowed.")

self._append_offset = 0
if self.append_dim:
logger.warn(
"When `append_dim` is given, StoreToZarr is NOT idempotent. Successive deployment "
"with the same inputs will append duplicate data to the existing store."
)
dim = [d for d in self.combine_dims if d.name == self.append_dim]
assert dim, f"Append dim not in {self.combine_dims=}."
assert dim[0].operation == CombineOp.CONCAT, "Append dim operation must be CONCAT."
existing_ds = xr.open_dataset(self.get_full_target().get_mapper(), engine="zarr")
assert self.append_dim in existing_ds, "Append dim must be in existing dataset."
self._append_offset = len(existing_ds[self.append_dim])

def expand(
self,
datasets: beam.PCollection[Tuple[Index, xr.Dataset]],
) -> beam.PCollection[zarr.storage.FSStore]:
schema = datasets | DetermineSchema(combine_dims=self.combine_dims)
indexed_datasets = datasets | IndexItems(schema=schema)
indexed_datasets = datasets | IndexItems(schema=schema, append_offset=self._append_offset)
target_chunks = (
self.target_chunks
if not self.dynamic_chunking_fn
@@ -684,6 +706,7 @@ def expand(
target_chunks=target_chunks,
attrs=self.attrs,
encoding=self.encoding,
append_dim=self.append_dim,
)
n_target_stores = rechunked_datasets | StoreDatasetFragments(target_store=target_store)
singleton_target_store = (
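
Outside of Beam, the offset computed in `__post_init__` is simply the current length of the append dimension in the existing store; a rough standalone equivalent (with a made-up store path) looks like:

```python
import xarray as xr

append_dim = "time"
existing_ds = xr.open_dataset("existing_store.zarr", engine="zarr")  # hypothetical path
assert append_dim in existing_ds, "Append dim must be in existing dataset."

# every appended input's start index (and the target dimsize) is shifted by this offset
append_offset = len(existing_ds[append_dim])
```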
27 changes: 27 additions & 0 deletions tests/conftest.py
@@ -276,6 +276,11 @@ def daily_xarray_dataset():
return make_ds(nt=10)


@pytest.fixture(scope="session")
def daily_xarray_datasets_to_append():
return make_ds(nt=10, start="2010-01-01"), make_ds(nt=10, start="2010-01-11")


@pytest.fixture(scope="session")
def daily_xarray_dataset_with_coordinateless_dimension(daily_xarray_dataset):
"""
@@ -295,6 +300,23 @@ def netcdf_local_paths_sequential_1d(daily_xarray_dataset, tmpdir_factory):
)


@pytest.fixture(scope="session")
def netcdf_local_paths_sequential_1d_to_append(
daily_xarray_datasets_to_append,
tmpdir_factory,
):
return [
make_local_paths(
ds,
tmpdir_factory,
"D",
split_up_files_by_day,
file_type="netcdf4",
)
for ds in daily_xarray_datasets_to_append
]


@pytest.fixture(scope="session")
def netcdf3_local_paths_sequential_1d(daily_xarray_dataset, tmpdir_factory):
return make_local_paths(
@@ -448,6 +470,11 @@ def netcdf_local_paths_sequential_with_coordinateless_dimension(
# FilePattern fixtures ----------------------------------------------------------------------------


@pytest.fixture(scope="session")
def netcdf_local_file_patterns_to_append(netcdf_local_paths_sequential_1d_to_append):
return [make_file_pattern(paths) for paths in netcdf_local_paths_sequential_1d_to_append]


@pytest.fixture(scope="session")
def netcdf_local_file_pattern_sequential(netcdf_local_paths_sequential):
return make_file_pattern(netcdf_local_paths_sequential)
4 changes: 2 additions & 2 deletions tests/data_generation.py
@@ -3,13 +3,13 @@
import xarray as xr


def make_ds(nt=10, non_dim_coords=False):
def make_ds(nt=10, non_dim_coords=False, start="2010-01-01"):
"""Return a synthetic random xarray dataset."""
np.random.seed(2)
# TODO: change nt to 11 in order to catch the edge case where
# items_per_input does not evenly divide the length of the sequence dimension
ny, nx = 18, 36
time = pd.date_range(start="2010-01-01", periods=nt, freq="D")
time = pd.date_range(start=start, periods=nt, freq="D")
lon = (np.arange(nx) + 0.5) * 360 / nx
lon_attrs = {"units": "degrees_east", "long_name": "longitude"}
lat = (np.arange(ny) + 0.5) * 180 / ny
40 changes: 40 additions & 0 deletions tests/test_aggregation.py
@@ -8,7 +8,9 @@
dataset_to_schema,
determine_target_chunks,
schema_to_template_ds,
schema_to_zarr,
)
from pangeo_forge_recipes.storage import FSSpecTarget

from .data_generation import make_ds

@@ -190,3 +192,41 @@ def test_concat_accumulator():
assert (
merge_accumulator.schema["data_vars"]["bar"] == merge_accumulator.schema["data_vars"]["BAR"]
)


def test_schema_to_zarr(daily_xarray_dataset: xr.Dataset, tmp_target: FSSpecTarget):
target_store = tmp_target.get_mapper()
schema = dataset_to_schema(daily_xarray_dataset)
schema_to_zarr(
schema=schema,
target_store=target_store,
target_chunks={},
attrs={},
consolidated_metadata=False,
encoding=None,
append_dim=None,
)
ds = xr.open_dataset(target_store, engine="zarr")
assert len(ds.time) == len(daily_xarray_dataset.time)
assert len(ds.lon) == len(daily_xarray_dataset.lon)
assert len(ds.lat) == len(daily_xarray_dataset.lat)


def test_schema_to_zarr_append_mode(
daily_xarray_datasets_to_append: tuple[xr.Dataset, xr.Dataset],
tmp_target: FSSpecTarget,
):
"""Tests dimension resizing for append."""

ds0, ds1 = daily_xarray_datasets_to_append
target_store = tmp_target.get_mapper()

schema_ds0 = dataset_to_schema(ds0)
schema_to_zarr(schema=schema_ds0, append_dim=None, target_store=target_store)
ds0_zarr = xr.open_dataset(target_store, engine="zarr")
assert len(ds0_zarr.time) == len(ds0.time)

schema_ds1 = dataset_to_schema(ds1)
schema_to_zarr(schema=schema_ds1, append_dim="time", target_store=target_store)
appended_zarr = xr.open_dataset(target_store, engine="zarr")
assert len(appended_zarr.time) == len(ds0.time) + len(ds1.time)
50 changes: 50 additions & 0 deletions tests/test_end_to_end.py
@@ -83,6 +83,56 @@ def test_xarray_zarr_subpath(
xr.testing.assert_equal(ds.load(), daily_xarray_dataset)


def test_xarray_zarr_append(
daily_xarray_datasets_to_append,
netcdf_local_file_patterns_to_append,
tmp_target,
):
ds0_fixture, ds1_fixture = daily_xarray_datasets_to_append
pattern0, pattern1 = netcdf_local_file_patterns_to_append
assert pattern0.combine_dim_keys == pattern1.combine_dim_keys

# these kws are reused across both initial and append pipelines
common_kws = dict(
target_root=tmp_target,
store_name="store",
combine_dims=pattern0.combine_dim_keys,
)
store_path = os.path.join(tmp_target.root_path, "store")
# build an initial zarr store, to which we will append
options = PipelineOptions(runtime_type_check=False)
# we run two pipelines in this test, so instantiate them separately to
# avoid any state being shared between them
with TestPipeline(options=options) as p0:
(
p0
| "CreateInitial" >> beam.Create(pattern0.items())
| "OpenInitial" >> OpenWithXarray()
| "StoreInitial" >> StoreToZarr(**common_kws)
)

# make sure the initial zarr store looks good
initial_actual = xr.open_dataset(store_path, engine="zarr")
assert len(initial_actual.time) == 10
xr.testing.assert_equal(initial_actual.load(), ds0_fixture)

# now append to it. the two differences here are
# passing `pattern1` in `Create` and `append_dim="time"` in `StoreToZarr`
with TestPipeline(options=options) as p1:
(
p1
| "CreateAppend" >> beam.Create(pattern1.items())
| "OpenAppend" >> OpenWithXarray()
| "StoreAppend" >> StoreToZarr(append_dim="time", **common_kws)
)

# now see if we have appended to time dimension as intended
append_actual = xr.open_dataset(store_path, engine="zarr")
assert len(append_actual.time) == 20
append_expected = xr.concat([ds0_fixture, ds1_fixture], dim="time")
xr.testing.assert_equal(append_actual.load(), append_expected)


@pytest.mark.parametrize("output_file_name", ["reference.json", "reference.parquet"])
def test_reference_netcdf(
daily_xarray_dataset,
7 changes: 4 additions & 3 deletions tests/test_patterns.py
@@ -205,8 +205,9 @@ def test_setting_file_types(file_type_value):
"position,start",
[(0, 0), (1, 2), (2, 4), (3, 7), (4, 9)],
)
def test_augment_index_with_start_stop(position, start):
@pytest.mark.parametrize("append_offset", [0, 5, 500])
def test_augment_index_with_start_stop(position, start, append_offset):
dk = Position(position)
expected = IndexedPosition(start, dimsize=11)
actual = augment_index_with_start_stop(dk, [2, 2, 3, 2, 2])
expected = IndexedPosition(start + append_offset, dimsize=11 + append_offset)
actual = augment_index_with_start_stop(dk, [2, 2, 3, 2, 2], append_offset)
assert actual == expected
