Commit b21f7f4: Zarr: optimize appending

dcherian committed May 7, 2024
1 parent dcf2ac4

Showing 2 changed files with 73 additions and 65 deletions.
56 changes: 2 additions & 54 deletions xarray/backends/api.py
@@ -1521,42 +1521,6 @@ def save_mfdataset(
     )
 
 
-def _validate_datatypes_for_zarr_append(zstore, dataset):
-    """If variable exists in the store, confirm dtype of the data to append is compatible with
-    existing dtype.
-    """
-
-    existing_vars = zstore.get_variables()
-
-    def check_dtype(vname, var):
-        if (
-            vname not in existing_vars
-            or np.issubdtype(var.dtype, np.number)
-            or np.issubdtype(var.dtype, np.datetime64)
-            or np.issubdtype(var.dtype, np.bool_)
-            or var.dtype == object
-        ):
-            # We can skip dtype equality checks under two conditions: (1) if the var to append is
-            # new to the dataset, because in this case there is no existing var to compare it to;
-            # or (2) if var to append's dtype is known to be easy-to-append, because in this case
-            # we can be confident appending won't cause problems. Examples of dtypes which are not
-            # easy-to-append include length-specified strings of type `|S*` or `<U*` (where * is a
-            # positive integer character length). For these dtypes, appending dissimilar lengths
-            # can result in truncation of appended data. Therefore, variables which already exist
-            # in the dataset, and with dtypes which are not known to be easy-to-append, necessitate
-            # exact dtype equality, as checked below.
-            pass
-        elif not var.dtype == existing_vars[vname].dtype:
-            raise ValueError(
-                f"Mismatched dtypes for variable {vname} between Zarr store on disk "
-                f"and dataset to append. Store has dtype {existing_vars[vname].dtype} but "
-                f"dataset to append has dtype {var.dtype}."
-            )
-
-    for vname, var in dataset.data_vars.items():
-        check_dtype(vname, var)
-
-
 # compute=True returns ZarrStore
 @overload
 def to_zarr(
@@ -1712,7 +1676,7 @@ def to_zarr(
 
     if region is not None:
         zstore._validate_and_autodetect_region(dataset)
-        # can't modify indexed with region writes
+        # can't modify indexes with region writes
         dataset = dataset.drop_vars(dataset.indexes)
         if append_dim is not None and append_dim in region:
             raise ValueError(
@@ -1721,28 +1685,12 @@ def to_zarr(
             )
 
     if mode in ["a", "a-", "r+"]:
-        _validate_datatypes_for_zarr_append(zstore, dataset)
-        if append_dim is not None:
-            existing_dims = zstore.get_dimensions()
-            if append_dim not in existing_dims:
-                raise ValueError(
-                    f"append_dim={append_dim!r} does not match any existing "
-                    f"dataset dimensions {existing_dims}"
-                )
         existing_var_names = set(zstore.zarr_group.array_keys())
         for var_name in existing_var_names:
-            if var_name in encoding.keys():
+            if var_name in encoding:
                 raise ValueError(
                     f"variable {var_name!r} already exists, but encoding was provided"
                 )
-        if mode == "r+":
-            new_names = [k for k in dataset.variables if k not in existing_var_names]
-            if new_names:
-                raise ValueError(
-                    f"dataset contains non-pre-existing variables {new_names}, "
-                    "which is not allowed in ``xarray.Dataset.to_zarr()`` with "
-                    "mode='r+'. To allow writing new variables, set mode='a'."
-                )
 
     writer = ArrayWriter()
     # TODO: figure out how to properly handle unlimited_dims
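For context, the checks above sit on the ``to_zarr`` append path. A minimal sketch of that path (store path and variable names are illustrative, and the ``zarr`` package is assumed installed):

```python
import numpy as np
import xarray as xr

# Create the initial store, then append along "time".
ds = xr.Dataset({"a": ("time", np.arange(3))})
ds.to_zarr("example.zarr", mode="w")

ds2 = xr.Dataset({"a": ("time", np.arange(3, 6))})
# Appends reuse the on-disk encoding; passing `encoding` for the existing
# variable "a" would raise the "already exists" ValueError shown above.
ds2.to_zarr("example.zarr", mode="a", append_dim="time")
```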
82 changes: 71 additions & 11 deletions xarray/backends/zarr.py
@@ -324,6 +324,34 @@ def encode_zarr_variable(var, needs_copy=True, name=None):
     return var
 
 
+def _validate_datatypes_for_zarr_append(vname, existing_var, new_var):
+    """If variable exists in the store, confirm dtype of the data to append is compatible with
+    existing dtype.
+    """
+    if (
+        np.issubdtype(new_var.dtype, np.number)
+        or np.issubdtype(new_var.dtype, np.datetime64)
+        or np.issubdtype(new_var.dtype, np.bool_)
+        or new_var.dtype == object
+    ):
+        # We can skip dtype equality checks under two conditions: (1) if the var to append is
+        # new to the dataset, because in this case there is no existing var to compare it to;
+        # or (2) if var to append's dtype is known to be easy-to-append, because in this case
+        # we can be confident appending won't cause problems. Examples of dtypes which are not
+        # easy-to-append include length-specified strings of type `|S*` or `<U*` (where * is a
+        # positive integer character length). For these dtypes, appending dissimilar lengths
+        # can result in truncation of appended data. Therefore, variables which already exist
+        # in the dataset, and with dtypes which are not known to be easy-to-append, necessitate
+        # exact dtype equality, as checked below.
+        pass
+    elif not new_var.dtype == existing_var.dtype:
+        raise ValueError(
+            f"Mismatched dtypes for variable {vname} between Zarr store on disk "
+            f"and dataset to append. Store has dtype {existing_var.dtype} but "
+            f"dataset to append has dtype {new_var.dtype}."
+        )
+
+
 def _validate_and_transpose_existing_dims(
     var_name, new_var, existing_var, region, append_dim
 ):
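To see why length-specified string dtypes are not "easy to append", here is a small NumPy illustration (values are made up):

```python
import numpy as np

existing = np.array(["a", "b"])    # dtype '<U1', fixed by the on-disk store
new = np.array(["hello"])          # dtype '<U5'

# Casting to the narrower on-disk dtype silently truncates the data,
# which is exactly what the dtype-equality check above guards against.
print(new.astype(existing.dtype))  # -> ['h']
```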
@@ -612,26 +640,58 @@ def store(
         import zarr
 
         existing_keys = tuple(self.zarr_group.array_keys())
+
+        if self._mode == "r+":
+            new_names = [k for k in variables if k not in existing_keys]
+            if new_names:
+                raise ValueError(
+                    f"dataset contains non-pre-existing variables {new_names}, "
+                    "which is not allowed in ``xarray.Dataset.to_zarr()`` with "
+                    "``mode='r+'``. To allow writing new variables, set ``mode='a'``."
+                )
+
+        if self._append_dim is not None and self._append_dim not in existing_keys:
+            # For dimensions without coordinate values, we must parse
+            # the _ARRAY_DIMENSIONS attribute on *all* arrays to check if it
+            # is a valid existing dimension name.
+            # TODO: This `get_dimensions` method also does shape checking
+            # which isn't strictly necessary for our check.
+            existing_dims = self.get_dimensions()
+            if self._append_dim not in existing_dims:
+                raise ValueError(
+                    f"append_dim={self._append_dim!r} does not match any existing "
+                    f"dataset dimensions {existing_dims}"
+                )
+
         existing_variable_names = {
             vn for vn in variables if _encode_variable_name(vn) in existing_keys
         }
-        new_variables = set(variables) - existing_variable_names
-        variables_without_encoding = {vn: variables[vn] for vn in new_variables}
+        new_variable_names = set(variables) - existing_variable_names
         variables_encoded, attributes = self.encode(
-            variables_without_encoding, attributes
+            {vn: variables[vn] for vn in new_variable_names}, attributes
         )
 
         if existing_variable_names:
-            # Decode variables directly, without going via xarray.Dataset to
-            # avoid needing to load index variables into memory.
-            # TODO: consider making loading indexes lazy again?
+            # We make sure that values to be appended are encoded *exactly*
+            # as the current values in the store.
+            # To do so, we decode variables directly to access the proper encoding,
+            # without going via xarray.Dataset to avoid needing to load
+            # index variables into memory.
             existing_vars, _, _ = conventions.decode_cf_variables(
-                {k: self.open_store_variable(name=k) for k in existing_variable_names},
-                self.get_attrs(),
+                variables={
+                    k: self.open_store_variable(name=k) for k in existing_variable_names
+                },
+                # attributes = {} since we don't care about parsing the global
+                # "coordinates" attribute
+                attributes={},
             )
             # Modified variables must use the same encoding as the store.
             vars_with_encoding = {}
             for vn in existing_variable_names:
+                if self._mode in ["a", "a-", "r+"]:
+                    _validate_datatypes_for_zarr_append(
+                        vn, existing_vars[vn], variables[vn]
+                    )
                 vars_with_encoding[vn] = variables[vn].copy(deep=False)
                 vars_with_encoding[vn].encoding = existing_vars[vn].encoding
             vars_with_encoding, _ = self.encode(vars_with_encoding, {})
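A hypothetical failure case for the relocated ``mode='r+'`` check, continuing the illustrative ``example.zarr`` store from the sketch above:

```python
import numpy as np
import xarray as xr

ds3 = xr.Dataset({"b": ("time", np.arange(3))})
# "b" does not already exist in the store, so store() raises the
# "non-pre-existing variables" ValueError; mode="a" would allow it.
ds3.to_zarr("example.zarr", mode="r+")
```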
@@ -696,7 +756,7 @@ def set_variables(self, variables, check_encoding_set, writer, unlimited_dims=None):
 
         for vn, v in variables.items():
             name = _encode_variable_name(vn)
-            check = vn in check_encoding_set
+
             attrs = v.attrs.copy()
             dims = v.dims
             dtype = v.dtype
@@ -712,7 +772,7 @@ def set_variables(self, variables, check_encoding_set, writer, unlimited_dims=None):
             # https://github.com/pydata/xarray/issues/8371 for details.
             encoding = extract_zarr_variable_encoding(
                 v,
-                raise_on_invalid=check,
+                raise_on_invalid=vn in check_encoding_set,
                 name=vn,
                 safe_chunks=self._safe_chunks,
             )
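For reference, a sketch of what ``raise_on_invalid`` controls, using xarray internals whose exact behavior may shift between versions:

```python
import xarray as xr
from xarray.backends.zarr import extract_zarr_variable_encoding

v = xr.Variable(("x",), [1, 2, 3])
v.encoding = {"chunks": (2,), "not_a_zarr_key": True}

# With raise_on_invalid=False the unknown key is dropped; with True
# (i.e. vn was in check_encoding_set) a ValueError is raised instead.
enc = extract_zarr_variable_encoding(v, raise_on_invalid=False, name="v")
print(enc)
```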
Expand Down Expand Up @@ -815,7 +875,7 @@ def _auto_detect_regions(self, ds, region):
assert variable.dims == (dim,)
index = pd.Index(variable.data)
idxs = index.get_indexer(ds[dim].data)
if any(idxs == -1):
if (idxs == -1).any():
raise KeyError(
f"Not all values of coordinate '{dim}' in the new array were"
" found in the original store. Writing to a zarr region slice"