diff --git a/xarray/core/coordinates.py b/xarray/core/coordinates.py index cdf1d354be6..c59c5deba16 100644 --- a/xarray/core/coordinates.py +++ b/xarray/core/coordinates.py @@ -213,7 +213,7 @@ class Coordinates(AbstractCoordinates): :py:class:`~xarray.Coordinates` object is passed, its indexes will be added to the new created object. indexes: dict-like, optional - Mapping of where keys are coordinate names and values are + Mapping where keys are coordinate names and values are :py:class:`~xarray.indexes.Index` objects. If None (default), pandas indexes will be created for each dimension coordinate. Passing an empty dictionary will skip this default behavior. diff --git a/xarray/core/dataarray.py b/xarray/core/dataarray.py index 0335ad3bdda..0f245ff464b 100644 --- a/xarray/core/dataarray.py +++ b/xarray/core/dataarray.py @@ -80,7 +80,7 @@ try: from dask.dataframe import DataFrame as DaskDataFrame except ImportError: - DaskDataFrame = None # type: ignore + DaskDataFrame = None try: from dask.delayed import Delayed except ImportError: diff --git a/xarray/core/dataset.py b/xarray/core/dataset.py index 9ec39e74ad1..a6fc0e2ca18 100644 --- a/xarray/core/dataset.py +++ b/xarray/core/dataset.py @@ -171,7 +171,7 @@ try: from dask.dataframe import DataFrame as DaskDataFrame except ImportError: - DaskDataFrame = None # type: ignore + DaskDataFrame = None # list of attributes of pd.DatetimeIndex that are ndarrays of time info diff --git a/xarray/core/parallel.py b/xarray/core/parallel.py index f971556b3f7..ef505b55345 100644 --- a/xarray/core/parallel.py +++ b/xarray/core/parallel.py @@ -4,19 +4,29 @@ import itertools import operator from collections.abc import Hashable, Iterable, Mapping, Sequence -from typing import TYPE_CHECKING, Any, Callable +from typing import TYPE_CHECKING, Any, Callable, Literal, TypedDict import numpy as np from xarray.core.alignment import align +from xarray.core.coordinates import Coordinates from xarray.core.dataarray import DataArray from xarray.core.dataset import Dataset +from xarray.core.indexes import Index +from xarray.core.merge import merge from xarray.core.pycompat import is_dask_collection if TYPE_CHECKING: from xarray.core.types import T_Xarray +class ExpectedDict(TypedDict): + shapes: dict[Hashable, int] + coords: set[Hashable] + data_vars: set[Hashable] + indexes: dict[Hashable, Index] + + def unzip(iterable): return zip(*iterable) @@ -31,7 +41,9 @@ def assert_chunks_compatible(a: Dataset, b: Dataset): def check_result_variables( - result: DataArray | Dataset, expected: Mapping[str, Any], kind: str + result: DataArray | Dataset, + expected: ExpectedDict, + kind: Literal["coords", "data_vars"], ): if kind == "coords": nice_str = "coordinate" @@ -254,7 +266,7 @@ def _wrapper( args: list, kwargs: dict, arg_is_array: Iterable[bool], - expected: dict, + expected: ExpectedDict, ): """ Wrapper function that receives datasets in args; converts to dataarrays when necessary; @@ -345,33 +357,45 @@ def _wrapper( for arg in aligned ) + merged_coordinates = merge([arg.coords for arg in aligned]).coords + _, npargs = unzip( sorted(list(zip(xarray_indices, xarray_objs)) + others, key=lambda x: x[0]) ) # check that chunk sizes are compatible input_chunks = dict(npargs[0].chunks) - input_indexes = dict(npargs[0]._indexes) for arg in xarray_objs[1:]: assert_chunks_compatible(npargs[0], arg) input_chunks.update(arg.chunks) - input_indexes.update(arg._indexes) + coordinates: Coordinates if template is None: # infer template by providing zero-shaped arrays template = infer_template(func, aligned[0], *args, **kwargs) - template_indexes = set(template._indexes) - preserved_indexes = template_indexes & set(input_indexes) - new_indexes = template_indexes - set(input_indexes) - indexes = {dim: input_indexes[dim] for dim in preserved_indexes} - indexes.update({k: template._indexes[k] for k in new_indexes}) + template_coords = set(template.coords) + preserved_coord_vars = template_coords & set(merged_coordinates) + new_coord_vars = template_coords - set(merged_coordinates) + + preserved_coords = merged_coordinates.to_dataset()[preserved_coord_vars] + # preserved_coords contains all coordinates bariables that share a dimension + # with any index variable in preserved_indexes + # Drop any unneeded vars in a second pass, this is required for e.g. + # if the mapped function were to drop a non-dimension coordinate variable. + preserved_coords = preserved_coords.drop_vars( + tuple(k for k in preserved_coords.variables if k not in template_coords) + ) + + coordinates = merge( + (preserved_coords, template.coords.to_dataset()[new_coord_vars]) + ).coords output_chunks: Mapping[Hashable, tuple[int, ...]] = { dim: input_chunks[dim] for dim in template.dims if dim in input_chunks } else: # template xarray object has been provided with proper sizes and chunk shapes - indexes = dict(template._indexes) + coordinates = template.coords output_chunks = template.chunksizes if not output_chunks: raise ValueError( @@ -473,6 +497,9 @@ def subset_dataset_to_block( return (Dataset, (dict, data_vars), (dict, coords), dataset.attrs) + # variable names that depend on the computation. Currently, indexes + # cannot be modified in the mapped function, so we exclude thos + computed_variables = set(template.variables) - set(coordinates.xindexes) # iterate over all possible chunk combinations for chunk_tuple in itertools.product(*ichunk.values()): # mapping from dimension name to chunk index @@ -485,19 +512,23 @@ def subset_dataset_to_block( for isxr, arg in zip(is_xarray, npargs) ] - # expected["shapes", "coords", "data_vars", "indexes"] are used to # raise nice error messages in _wrapper - expected = {} - # input chunk 0 along a dimension maps to output chunk 0 along the same dimension - # even if length of dimension is changed by the applied function - expected["shapes"] = { - k: output_chunks[k][v] for k, v in chunk_index.items() if k in output_chunks - } - expected["data_vars"] = set(template.data_vars.keys()) # type: ignore[assignment] - expected["coords"] = set(template.coords.keys()) # type: ignore[assignment] - expected["indexes"] = { - dim: indexes[dim][_get_chunk_slicer(dim, chunk_index, output_chunk_bounds)] - for dim in indexes + expected: ExpectedDict = { + # input chunk 0 along a dimension maps to output chunk 0 along the same dimension + # even if length of dimension is changed by the applied function + "shapes": { + k: output_chunks[k][v] + for k, v in chunk_index.items() + if k in output_chunks + }, + "data_vars": set(template.data_vars.keys()), + "coords": set(template.coords.keys()), + "indexes": { + dim: coordinates.xindexes[dim][ + _get_chunk_slicer(dim, chunk_index, output_chunk_bounds) + ] + for dim in coordinates.xindexes + }, } from_wrapper = (gname,) + chunk_tuple @@ -505,9 +536,8 @@ def subset_dataset_to_block( # mapping from variable name to dask graph key var_key_map: dict[Hashable, str] = {} - for name, variable in template.variables.items(): - if name in indexes: - continue + for name in computed_variables: + variable = template.variables[name] gname_l = f"{name}-{gname}" var_key_map[name] = gname_l @@ -543,12 +573,7 @@ def subset_dataset_to_block( }, ) - # TODO: benbovy - flexible indexes: make it work with custom indexes - # this will need to pass both indexes and coords to the Dataset constructor - result = Dataset( - coords={k: idx.to_pandas_index() for k, idx in indexes.items()}, - attrs=template.attrs, - ) + result = Dataset(coords=coordinates, attrs=template.attrs) for index in result._indexes: result[index].attrs = template[index].attrs diff --git a/xarray/tests/test_dask.py b/xarray/tests/test_dask.py index c2a77c97d85..137d6020829 100644 --- a/xarray/tests/test_dask.py +++ b/xarray/tests/test_dask.py @@ -1367,6 +1367,25 @@ def test_map_blocks_da_ds_with_template(obj): assert_identical(actual, template) +def test_map_blocks_roundtrip_string_index(): + ds = xr.Dataset( + {"data": (["label"], [1, 2, 3])}, coords={"label": ["foo", "bar", "baz"]} + ).chunk(label=1) + assert ds.label.dtype == np.dtype("