Skip to content

Commit

Permalink
More API sketching
Browse files Browse the repository at this point in the history
  • Loading branch information
dcherian committed Nov 11, 2024
1 parent 934c3dc commit 70fcf94
Show file tree
Hide file tree
Showing 3 changed files with 158 additions and 26 deletions.
87 changes: 84 additions & 3 deletions docs/docs/icechunk-python/distributed-writes.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

!!! warning

Using Xarray, Dask, and Icechunk requires `icechunk>=FOO`, `dask>=FOO`, and `xarray>=2024.11.0`.
Using Xarray, Dask, and Icechunk requires `icechunk>=FOO`, `dask>=2024.11.0`, and `xarray>=2024.11.0`.


First let's start a distributed Client and create an IcechunkStore.
Expand Down Expand Up @@ -103,7 +103,8 @@ writer = XarrayDatasetWriter(ds, store=icechunk_store)

Write metadata for arrays in one step. This "initializes" the store but has not written any real values yet.
```python
writer.write_metadata(group="new2", mode="w")
writer.for_write(group="new2", mode="w") # default mode="w-"?
writer.write_metadata()
```
Write any in-memory arrays to the store:
```python
Expand All @@ -114,10 +115,90 @@ and any in-memory arrays with the desired values.

Finally execute a write of all lazy arrays.
```python
writer.write_lazy() # eagerly write dask arrays
writer.write_lazy()
```

Finally commit your changes!
```python
icechunk_store.commit("wrote an Xarray dataset!")
```

## Modifying existing stores

Confusions re: appends and region writes:
1. Do existing coordinate variables get overwritten?
2. Do existing attributes get overwritten?
3. Can I write new variables?
4. With appends, can I update only a subset of arrays in the group with `append_dim`?

Proposal:
1. Appends along a dimension and region writes are conceptually similar, and never allow changing existing array metadata.
2. To add a new variable to the store, or modify existing metadata, use a different entrypoint; `writer.for_modifying`.

API Qs:
1. `write_metadata`: Is it clear that this *creates* new Zarr arrays, and will *resize* any array
that is being appended to?
2. Do people like writing new variables AND appending along a dimension at the same time?

### Overwriting the _whole_ store

```python
writer = XarrayDatasetWriter(ds, store=icechunk_store)
writer.for_create_or_overwrite(
group="new2", # will set mode="w"
)
```

### Overwrite, or add new variables
```python
writer = XarrayDatasetWriter(ds, store=icechunk_store)
writer = writer.for_modify(
group="new2", # will set mode="a"
)
# allows writing new vars, updating attributes for existing arrays
writer.write_metadata()
# will overwrite any pre-existing variables
writer.write_eager()
# will overwrite any pre-existing variables
writer.write_lazy()
```

### Adding new variables only
```python
writer = XarrayDatasetWriter(ds, store=icechunk_store)
writer = writer.for_modify(
group="new2", # will set mode="a"
)
# modifies the dataset to be written, by dropping any pre-existing vars
writer = writer.drop_existing_vars()
writer.write_metadata()
writer.write_eager()
writer.write_lazy()
```


### Appending along a dimension

```python
writer = XarrayDatasetWriter(ds, store=icechunk_store)
writer = writer.for_append(
group="new2", append_dim="time" # will set mode="a"
)
# this will resize the appropriate arrays.
# should it overwrite existing metadata?
writer.write_metadata()
# This will write the updated coordinate value for `append_dim` (if any)
writer.write_eager()
# now effectively execute a region write for the appended piece
writer.write_lazy()
```

### Writing to a region of existing arrays
```python
writer = XarrayDatasetWriter(ds, store=icechunk_store)
# will raise for any variables not overlapping with region.
writer.for_region(region="auto")
writer.write_metadata()
writer.write_eager()
writer.write_lazy()
```
8 changes: 7 additions & 1 deletion icechunk-python/python/icechunk/dask.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
overload,
)

import dask
import dask.array
import zarr
from dask import config
Expand All @@ -21,11 +22,14 @@
from dask.delayed import Delayed
from dask.highlevelgraph import HighLevelGraph
from icechunk.distributed import extract_store, merge_stores

from packaging import Version
from icechunk import IcechunkStore

SimpleGraph: TypeAlias = Mapping[tuple[str, int], tuple[Any, ...]]

def _assert_correct_dask_version() -> None:
    """Raise ``ValueError`` if the installed dask is older than 2024.11.0.

    ``store_dask`` relies on dask features introduced in 2024.11.0, so we
    fail fast with an actionable message instead of a confusing error later.

    NOTE(review): the module-level ``from packaging import Version`` (see the
    import block) is broken — ``Version`` is defined in ``packaging.version``,
    not the ``packaging`` top-level namespace. Import it locally here so the
    check works regardless; the import block should be fixed too.
    """
    from packaging.version import Version

    if Version(dask.__version__) < Version("2024.11.0"):
        raise ValueError(
            f"This requires dask>=2024.11.0 but you have {dask.__version__}. "
            "Please upgrade."
        )

def store_dask(
store: IcechunkStore,
Expand Down Expand Up @@ -114,6 +118,8 @@ def stateful_store_reduce(
split_every: int | None = None,
**kwargs: Any,
) -> IcechunkStore | Delayed:
_assert_correct_dask_version()

split_every = split_every or config.get("split_every", 8)

layers: MutableMapping[str, SimpleGraph] = {}
Expand Down
89 changes: 67 additions & 22 deletions icechunk-python/python/icechunk/xarray.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,9 @@ class XarrayDatasetWriter:
dataset: Dataset = field(repr=False)
store: IcechunkStore = field(kw_only=True)

safe_chunks: bool = field(kw_only=True, default=True)
write_empty_chunks: bool = field(kw_only=True, default=True)

_initialized: bool = field(default=False, repr=False)

xarray_store: ZarrStore = field(init=False, repr=False)
Expand All @@ -82,6 +85,33 @@ def __post_init__(self) -> None:
f"Please pass in an IcechunkStore. Received {type(self.store)!r} instead."
)

def _open_group(
    self,
    *,
    group,
    mode: ZarrWriteModes,
    append_dim: Hashable | None = None,
    region=None,
    encoding=None,
    # Accepted for backward compatibility: the ``for_write`` / ``for_region`` /
    # ``for_append`` entrypoints call this with ``write_region=`` and
    # ``zarr_format=`` keywords; the original signature rejected those calls
    # with a TypeError (no such parameters, and ``region`` / ``encoding`` were
    # required).
    write_region=None,
    zarr_format: int = 3,
) -> None:
    """Open the target Zarr group and stash it on ``self.xarray_store``.

    Parameters
    ----------
    group:
        Group path within the store to open.
    mode:
        Zarr write mode (e.g. ``"w"``, ``"w-"``, ``"a"``, ``"r+"``).
    append_dim:
        Dimension to append along, or ``None`` when not appending.
    region / write_region:
        Region selection for region writes; ``write_region`` is an alias
        kept for the ``for_*`` entrypoints and wins when both are given.
    encoding:
        Per-variable encoding dict; validated here, defaults to ``{}``.
    zarr_format:
        Zarr format version; Icechunk only supports 3.

    Side effects: sets ``self.xarray_store``.
    """
    if write_region is not None:
        region = write_region

    self.xarray_store = ZarrStore.open_group(
        store=self.store,
        group=group,
        mode=mode,
        zarr_format=zarr_format,
        append_dim=append_dim,
        write_region=region,
        safe_chunks=self.safe_chunks,
        write_empty=self.write_empty_chunks,
        synchronizer=None,
        consolidated=False,
        consolidate_on_close=False,
        zarr_version=None,
    )

    if encoding is None:
        encoding = {}
    self.xarray_store._validate_encoding(encoding)


def write_metadata(
self,
*,
Expand Down Expand Up @@ -178,41 +208,56 @@ def write_metadata(
in with ``region``, use the `XarrayDatasetWriter` directly.
"""
from xarray.backends.api import _validate_dataset_names, dump_to_store
from xarray.backends.zarr import _choose_default_mode

# validate Dataset keys, DataArray names
_validate_dataset_names(self.dataset)

concrete_mode: ZarrWriteModes = _choose_default_mode(
mode=mode, append_dim=append_dim, region=region
# This writes the metadata (zarr.json) for all arrays
# This also will resize arrays for any appends
self.writer = LazyArrayWriter()
dump_to_store(self.dataset, self.xarray_store, self.writer, encoding=encoding) # type: ignore[no-untyped-call]

self._initialized = True

def for_write(self, *, group, mode="w-"):
    """Configure the writer to create (or overwrite) a new group.

    Parameters
    ----------
    group:
        Group path within the store.
    mode:
        ``"w-"`` (default) fails if the group exists; ``"w"`` overwrites.

    NOTE(review): the original body passed ``zarr_format=3`` and
    ``write_region=None`` — keywords ``_open_group`` does not define — and
    omitted the required ``region`` / ``encoding`` keywords, so every call
    raised ``TypeError``. The call now matches ``_open_group``'s signature.
    """
    self._open_group(
        group=group,
        mode=mode,
        append_dim=None,
        region=None,
        encoding=None,
    )

self.xarray_store = ZarrStore.open_group(
store=self.store,
def for_region(self, *, group, region) -> None:
    """Configure the writer for writing to a region of existing arrays.

    Parameters
    ----------
    group:
        Group path within the store.
    region:
        Region selection (e.g. ``"auto"`` or a dict of slices) identifying
        the portion of the existing arrays to write into.

    NOTE(review): as committed, this call contained duplicated ``mode=`` and
    ``append_dim=`` keywords (stale lines from the old ``write_metadata``
    diff) and so was a SyntaxError; reconstructed here to match
    ``_open_group``'s signature. Opens with ``mode="r+"`` — region writes
    never change existing array metadata.
    """
    self._open_group(
        group=group,
        mode="r+",
        append_dim=None,
        region=region,
        encoding=None,
    )
    # Xarray today raises an error if there are variables whose dimensions
    # do not overlap with the region's dimensions
    self.dataset = self.xarray_store._validate_and_autodetect_region(self.dataset)

if encoding is None:
encoding = {}
self.xarray_store._validate_encoding(encoding)
def for_append(self, *, group, append_dim) -> None:
    """Configure the writer for appending along ``append_dim``.

    Parameters
    ----------
    group:
        Group path within the store.
    append_dim:
        Name of the dimension to append along.

    NOTE(review): the original body passed ``zarr_format=3`` /
    ``write_region=None`` — keywords ``_open_group`` does not define — and
    omitted the required ``region`` / ``encoding`` keywords, so every call
    raised ``TypeError``. Also, the docs sketch says appends set
    ``mode="a"`` while this uses ``"r+"`` — confirm which is intended.
    """
    self._open_group(
        group=group,
        mode="r+",
        append_dim=append_dim,
        region=None,
        encoding=None,
    )
    # TODO: raise if the dataset has variables that do not have the append_dim.

dataset = self.xarray_store._validate_and_autodetect_region(self.dataset)

# This writes the metadata (zarr.json) for all arrays
self.writer = LazyArrayWriter()
dump_to_store(dataset, self.xarray_store, self.writer, encoding=encoding) # type: ignore[no-untyped-call]

self._initialized = True
def describe_change(self):
    """Summarize the changes this writer would make to the store.

    Intended to report:

    1. Arrays that will be appended to.
    2. New arrays to be written.

    Not yet implemented; currently a no-op returning ``None``.
    """
    return None

def write_eager(self) -> None:
"""
Expand Down

0 comments on commit 70fcf94

Please sign in to comment.