Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make dask names change when chunking Variables by different amounts. #3584

Merged
merged 9 commits into from
Jan 10, 2020
4 changes: 4 additions & 0 deletions doc/whats-new.rst
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ New Features

Bug fixes
~~~~~~~~~

- Fix :py:meth:`xarray.combine_by_coords` to allow for combining incomplete
hypercubes of Datasets (:issue:`3648`). By `Ian Bolliger
<https://github.com/bolliger32>`_.
Expand Down Expand Up @@ -83,6 +84,9 @@ Documentation

Internal Changes
~~~~~~~~~~~~~~~~
- Make sure dask names change when rechunking by different chunk sizes. Conversely, make sure they
stay the same when rechunking by the same chunk size. (:issue:`3350`)
By `Deepak Cherian <https://github.com/dcherian>`_.
- 2x to 5x speed boost (on small arrays) for :py:meth:`Dataset.isel`,
:py:meth:`DataArray.isel`, and :py:meth:`DataArray.__getitem__` when indexing by int,
slice, list of int, scalar ndarray, or 1-dimensional ndarray.
Expand Down
5 changes: 4 additions & 1 deletion xarray/core/dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -1748,7 +1748,10 @@ def maybe_chunk(name, var, chunks):
if not chunks:
chunks = None
if var.ndim > 0:
token2 = tokenize(name, token if token else var._data)
# when rechunking by different amounts, make sure dask names change
# by providing chunks as an input to tokenize.
# subtle bugs result otherwise. see GH3350
token2 = tokenize(name, token if token else var._data, chunks)
name2 = f"{name_prefix}{name}-{token2}"
return var.chunk(chunks, name=name2, lock=lock)
else:
Expand Down
5 changes: 5 additions & 0 deletions xarray/core/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -549,6 +549,11 @@ def __eq__(self, other) -> bool:
def __hash__(self) -> int:
return hash((ReprObject, self._value))
dcherian marked this conversation as resolved.
Show resolved Hide resolved

def __dask_tokenize__(self):
from dask.base import normalize_token

return normalize_token((type(self), self._value))


@contextlib.contextmanager
def close_on_error(f):
Expand Down
18 changes: 9 additions & 9 deletions xarray/tests/test_dask.py
Original file line number Diff line number Diff line change
Expand Up @@ -1083,7 +1083,7 @@ def func(obj):
actual = xr.map_blocks(func, obj)
expected = func(obj)
assert_chunks_equal(expected.chunk(), actual)
xr.testing.assert_identical(actual.compute(), expected.compute())
assert_identical(actual, expected)


@pytest.mark.parametrize("obj", [make_da(), make_ds()])
Expand All @@ -1092,7 +1092,7 @@ def test_map_blocks_convert_args_to_list(obj):
with raise_if_dask_computes():
actual = xr.map_blocks(operator.add, obj, [10])
assert_chunks_equal(expected.chunk(), actual)
xr.testing.assert_identical(actual.compute(), expected.compute())
assert_identical(actual, expected)


@pytest.mark.parametrize("obj", [make_da(), make_ds()])
Expand All @@ -1107,7 +1107,7 @@ def add_attrs(obj):
with raise_if_dask_computes():
actual = xr.map_blocks(add_attrs, obj)

xr.testing.assert_identical(actual.compute(), expected.compute())
assert_identical(actual, expected)


def test_map_blocks_change_name(map_da):
Expand All @@ -1120,7 +1120,7 @@ def change_name(obj):
with raise_if_dask_computes():
actual = xr.map_blocks(change_name, map_da)

xr.testing.assert_identical(actual.compute(), expected.compute())
assert_identical(actual, expected)


@pytest.mark.parametrize("obj", [make_da(), make_ds()])
Expand All @@ -1129,15 +1129,15 @@ def test_map_blocks_kwargs(obj):
with raise_if_dask_computes():
actual = xr.map_blocks(xr.full_like, obj, kwargs=dict(fill_value=np.nan))
assert_chunks_equal(expected.chunk(), actual)
xr.testing.assert_identical(actual.compute(), expected.compute())
assert_identical(actual, expected)


def test_map_blocks_to_array(map_ds):
with raise_if_dask_computes():
actual = xr.map_blocks(lambda x: x.to_array(), map_ds)

# to_array does not preserve name, so cannot use assert_identical
assert_equal(actual.compute(), map_ds.to_array().compute())
assert_equal(actual, map_ds.to_array())


@pytest.mark.parametrize(
Expand All @@ -1156,7 +1156,7 @@ def test_map_blocks_da_transformations(func, map_da):
with raise_if_dask_computes():
actual = xr.map_blocks(func, map_da)

assert_identical(actual.compute(), func(map_da).compute())
assert_identical(actual, func(map_da))


@pytest.mark.parametrize(
Expand All @@ -1175,7 +1175,7 @@ def test_map_blocks_ds_transformations(func, map_ds):
with raise_if_dask_computes():
actual = xr.map_blocks(func, map_ds)

assert_identical(actual.compute(), func(map_ds).compute())
assert_identical(actual, func(map_ds))


@pytest.mark.parametrize("obj", [make_da(), make_ds()])
Expand All @@ -1188,7 +1188,7 @@ def func(obj):
expected = xr.map_blocks(func, obj)
actual = obj.map_blocks(func)

assert_identical(expected.compute(), actual.compute())
assert_identical(expected, actual)


def test_map_blocks_hlg_layers():
Expand Down
7 changes: 7 additions & 0 deletions xarray/tests/test_dataarray.py
Original file line number Diff line number Diff line change
Expand Up @@ -752,12 +752,19 @@ def test_chunk(self):

blocked = unblocked.chunk()
assert blocked.chunks == ((3,), (4,))
first_dask_name = blocked.data.name

blocked = unblocked.chunk(chunks=((2, 1), (2, 2)))
assert blocked.chunks == ((2, 1), (2, 2))
assert blocked.data.name != first_dask_name

blocked = unblocked.chunk(chunks=(3, 3))
assert blocked.chunks == ((3,), (3, 1))
assert blocked.data.name != first_dask_name

# name doesn't change when rechunking by same amount
# this fails if ReprObject doesn't have __dask_tokenize__ defined
assert unblocked.chunk(2).data.name == unblocked.chunk(2).data.name

assert blocked.load().chunks is None

Expand Down
16 changes: 16 additions & 0 deletions xarray/tests/test_dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -936,19 +936,35 @@ def test_chunk(self):
expected_chunks = {"dim1": (8,), "dim2": (9,), "dim3": (10,)}
assert reblocked.chunks == expected_chunks

def get_dask_names(ds):
return {k: v.data.name for k, v in ds.items()}

orig_dask_names = get_dask_names(reblocked)

reblocked = data.chunk({"time": 5, "dim1": 5, "dim2": 5, "dim3": 5})
# time is not a dim in any of the data_vars, so it
# doesn't get chunked
expected_chunks = {"dim1": (5, 3), "dim2": (5, 4), "dim3": (5, 5)}
assert reblocked.chunks == expected_chunks

# make sure dask names change when rechunking by different amounts
# regression test for GH3350
new_dask_names = get_dask_names(reblocked)
for k, v in new_dask_names.items():
assert v != orig_dask_names[k]

reblocked = data.chunk(expected_chunks)
assert reblocked.chunks == expected_chunks

# reblock on already blocked data
orig_dask_names = get_dask_names(reblocked)
reblocked = reblocked.chunk(expected_chunks)
new_dask_names = get_dask_names(reblocked)
assert reblocked.chunks == expected_chunks
assert_identical(reblocked, data)
# rechunking with same chunk sizes should not change names
for k, v in new_dask_names.items():
assert v == orig_dask_names[k]

with raises_regex(ValueError, "some chunks"):
data.chunk({"foo": 10})
Expand Down