From 70fcf945971435f0f43e3eee8696d4326ed56d60 Mon Sep 17 00:00:00 2001 From: Deepak Cherian Date: Mon, 11 Nov 2024 15:40:51 -0700 Subject: [PATCH] More API sketching --- .../icechunk-python/distributed-writes.md | 87 +++++++++++++++++- icechunk-python/python/icechunk/dask.py | 8 +- icechunk-python/python/icechunk/xarray.py | 89 ++++++++++++++----- 3 files changed, 158 insertions(+), 26 deletions(-) diff --git a/docs/docs/icechunk-python/distributed-writes.md b/docs/docs/icechunk-python/distributed-writes.md index 14f15c336..2ad6c3800 100644 --- a/docs/docs/icechunk-python/distributed-writes.md +++ b/docs/docs/icechunk-python/distributed-writes.md @@ -2,7 +2,7 @@ !!! warning - Using Xarray, Dask, and Icechunk requires `icechunk>=FOO`, `dask>=FOO`, and `xarray>=2024.11.0`. + Using Xarray, Dask, and Icechunk requires `icechunk>=FOO`, `dask>=2024.11.0`, and `xarray>=2024.11.0`. First let's start a distributed Client and create an IcechunkStore. @@ -103,7 +103,8 @@ writer = XarrayDatasetWriter(ds, store=icechunk_store) Write metadata for arrays in one step. This "initializes" the store but has not written an real values yet. ```python -writer.write_metadata(group="new2", mode="w") +writer.for_write(group="new2", mode="w") # default mode="w-"? +writer.write_metadata() ``` Write an in-memory arrays to the store: ```python @@ -114,10 +115,90 @@ and any in-memory arrays with the desired values. Finally execute a write of all lazy arrays. ```python -writer.write_lazy() # eagerly write dask arrays +writer.write_lazy() ``` Finally commit your changes! ```python icechunk_store.commit("wrote an Xarray dataset!") ``` + +## Modifying existing stores + +Confusions re: appends and region writes: +1. Do existing coordinate variables get overwritten? +2. Do existing attributes get overwritten? +3. Can I write new variables? +4. With appends, can I update only a subset of arrays in the group with `append_dim`? + +Proposal: +1. 
Appends along a dimension, and region writes are conceptually similar and never allow changing existing array metadata.
+2. To add a new variable to the store, or modify existing metadata, use a different entrypoint: `writer.for_modify`.
+
+API Qs:
+1. `write_metadata`: Is it clear that this *creates* new Zarr arrays, and will *resize* any array
+   that is being appended to?
+2. Do people like writing new variables AND appending along a dimension at the same time?
+
+### Overwriting the _whole_ store
+
+```python
+writer = XarrayDatasetWriter(ds, store=icechunk_store)
+writer.for_create_or_overwrite(
+    group="new2", # will set mode="w"
+)
+```
+
+### Overwrite, or add new variables
+```python
+writer = XarrayDatasetWriter(ds, store=icechunk_store)
+writer = writer.for_modify(
+    group="new2", # will set mode="a"
+)
+# allows writing new vars, updating attributes for existing arrays
+writer.write_metadata()
+# will overwrite any pre-existing variables
+writer.write_eager()
+# will overwrite any pre-existing variables
+writer.write_lazy()
+```
+
+### Adding new variables only
+```python
+writer = XarrayDatasetWriter(ds, store=icechunk_store)
+writer = writer.for_modify(
+    group="new2", # will set mode="a"
+)
+# modifies the dataset to be written, by dropping any pre-existing vars
+writer = writer.drop_existing_vars()
+writer.write_metadata()
+writer.write_eager()
+writer.write_lazy()
+```
+
+
+### Appending along a dimension
+
+```python
+writer = XarrayDatasetWriter(ds, store=icechunk_store)
+writer = writer.for_append(
+    group="new2", append_dim="time" # will set mode="a"
+)
+# this will resize the appropriate arrays.
+# should it overwrite existing metadata? 
+writer.write_metadata() +# This will write the updated coordinate value for `append_dim` (if any) +writer.write_eager() +# now effectively execute a region write for the appended piece +writer.write_lazy() +``` + +### Writing to a region of existing arrays +```python +writer = XarrayDatasetWriter(ds, store=icechunk_store) +# will raise for any variables not overlapping with region. +writer.for_region(region="auto") +writer.write_metadata() +writer.write_eager() +writer.write_lazy() +``` diff --git a/icechunk-python/python/icechunk/dask.py b/icechunk-python/python/icechunk/dask.py index 16e0a2787..25c889ad3 100644 --- a/icechunk-python/python/icechunk/dask.py +++ b/icechunk-python/python/icechunk/dask.py @@ -12,6 +12,7 @@ overload, ) +import dask import dask.array import zarr from dask import config @@ -21,11 +22,14 @@ from dask.delayed import Delayed from dask.highlevelgraph import HighLevelGraph from icechunk.distributed import extract_store, merge_stores - +from packaging import Version from icechunk import IcechunkStore SimpleGraph: TypeAlias = Mapping[tuple[str, int], tuple[Any, ...]] +def _assert_correct_dask_version() -> None: + if Version(dask.__version__) < Version("2024.11.0"): + raise ValueError(f"This requires dask>=2024.11.0 but you have {dask.__version__}. 
Please upgrade.") def store_dask( store: IcechunkStore, @@ -114,6 +118,8 @@ def stateful_store_reduce( split_every: int | None = None, **kwargs: Any, ) -> IcechunkStore | Delayed: + _assert_correct_dask_version() + split_every = split_every or config.get("split_every", 8) layers: MutableMapping[str, SimpleGraph] = {} diff --git a/icechunk-python/python/icechunk/xarray.py b/icechunk-python/python/icechunk/xarray.py index 2c928baa4..3c3e7e720 100644 --- a/icechunk-python/python/icechunk/xarray.py +++ b/icechunk-python/python/icechunk/xarray.py @@ -71,6 +71,9 @@ class XarrayDatasetWriter: dataset: Dataset = field(repr=False) store: IcechunkStore = field(kw_only=True) + safe_chunks: bool = field(kw_only=True, default=True) + write_empty_chunks: bool = field(kw_only=True, default=True) + _initialized: bool = field(default=False, repr=False) xarray_store: ZarrStore = field(init=False, repr=False) @@ -82,6 +85,33 @@ def __post_init__(self) -> None: f"Please pass in an IcechunkStore. Received {type(self.store)!r} instead." ) + def _open_group(self, *, group, mode: ZarrWriteModes, append_dim: Hashable | None, region, encoding) -> None: + # from xarray.backends.zarr import _choose_default_mode + + # concrete_mode: ZarrWriteModes = _choose_default_mode( + # mode=mode, append_dim=append_dim, region=region + # ) + + self.xarray_store = ZarrStore.open_group( + store=self.store, + group=group, + mode=mode, + zarr_format=3, + append_dim=append_dim, + write_region=region, + safe_chunks=self.safe_chunks, + write_empty=self.write_empty_chunks, + synchronizer=None, + consolidated=False, + consolidate_on_close=False, + zarr_version=None, + ) + + if encoding is None: + encoding = {} + self.xarray_store._validate_encoding(encoding) + + def write_metadata( self, *, @@ -178,41 +208,56 @@ def write_metadata( in with ``region``, use the `XarrayDatasetWriter` directly. 
""" from xarray.backends.api import _validate_dataset_names, dump_to_store - from xarray.backends.zarr import _choose_default_mode # validate Dataset keys, DataArray names _validate_dataset_names(self.dataset) - concrete_mode: ZarrWriteModes = _choose_default_mode( - mode=mode, append_dim=append_dim, region=region + # This writes the metadata (zarr.json) for all arrays + # This also will resize arrays for any appends + self.writer = LazyArrayWriter() + dump_to_store(self.dataset, self.xarray_store, self.writer, encoding=encoding) # type: ignore[no-untyped-call] + + self._initialized = True + + def for_write(self, *, group, mode="w-"): + self._open_group( + group=group, + mode=mode, + zarr_format=3, + append_dim=None, + write_region=None, ) - self.xarray_store = ZarrStore.open_group( - store=self.store, + def for_region(self, *, group, region) -> None: + self._open_group( group=group, - mode=concrete_mode, + mode="r+", zarr_format=3, - append_dim=append_dim, + append_dim=None, write_region=region, - safe_chunks=safe_chunks, - write_empty=write_empty_chunks, - synchronizer=None, - consolidated=False, - consolidate_on_close=False, - zarr_version=None, ) + # Xarray today raises an error if there are variables whose dimensions + # do not overlap with the region's dimensions + self.dataset = self.xarray_store._validate_and_autodetect_region(self.dataset) - if encoding is None: - encoding = {} - self.xarray_store._validate_encoding(encoding) + def for_append(self, *, group, append_dim) -> None: + self._open_group( + group=group, + mode="r+", + zarr_format=3, + append_dim=append_dim, + write_region=None, + ) + # TODO: raise if dataset has vars do not have the append_dim. 
- dataset = self.xarray_store._validate_and_autodetect_region(self.dataset) - # This writes the metadata (zarr.json) for all arrays - self.writer = LazyArrayWriter() - dump_to_store(dataset, self.xarray_store, self.writer, encoding=encoding) # type: ignore[no-untyped-call] - - self._initialized = True + def describe_change(self): + """ + This method will describe a list of changes. + 1. Arrays that will be appended to + 1. New arrays to be written. + """ + pass def write_eager(self) -> None: """