Skip to content

Commit

Permalink
New inline_array kwarg for open_dataset (#6566)
Browse files Browse the repository at this point in the history
* added inline_array kwarg

* remove cheeky print statements

* Remove another rogue print statement

* bump dask dependency

* update multiple dependencies based on min-deps-check.py

* update environment to match #6559

* Update h5py in ci/requirements/min-all-deps.yml

* Update ci/requirements/min-all-deps.yml

* remove pynio from test env

* Update ci/requirements/min-all-deps.yml

* promote inline_array kwarg to be top-level kwarg

* whatsnew

* add test

* Remove repeated docstring entry

Co-authored-by: Deepak Cherian <[email protected]>

* Remove repeated docstring entry

Co-authored-by: Deepak Cherian <[email protected]>

* hyperlink to dask functions

Co-authored-by: Deepak Cherian <[email protected]>
  • Loading branch information
TomNicholas and dcherian authored May 11, 2022
1 parent cad4474 commit 0512da1
Show file tree
Hide file tree
Showing 6 changed files with 82 additions and 4 deletions.
3 changes: 3 additions & 0 deletions doc/whats-new.rst
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ New Features
- Allow passing chunks in ``**kwargs`` form to :py:meth:`Dataset.chunk`, :py:meth:`DataArray.chunk`, and
:py:meth:`Variable.chunk`. (:pull:`6471`)
By `Tom Nicholas <https://github.com/TomNicholas>`_.
- Expose `inline_array` kwarg from `dask.array.from_array` in :py:func:`open_dataset`, :py:meth:`Dataset.chunk`,
:py:meth:`DataArray.chunk`, and :py:meth:`Variable.chunk`. (:pull:`6471`)
By `Tom Nicholas <https://github.com/TomNicholas>`_.
- :py:meth:`xr.polyval` now supports :py:class:`Dataset` and :py:class:`DataArray` args of any shape,
is faster and requires less memory. (:pull:`6548`)
By `Michael Niklas <https://github.com/headtr1ck>`_.
Expand Down
20 changes: 20 additions & 0 deletions xarray/backends/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,7 @@ def _chunk_ds(
engine,
chunks,
overwrite_encoded_chunks,
inline_array,
**extra_tokens,
):
from dask.base import tokenize
Expand All @@ -292,6 +293,7 @@ def _chunk_ds(
overwrite_encoded_chunks=overwrite_encoded_chunks,
name_prefix=name_prefix,
token=token,
inline_array=inline_array,
)
return backend_ds._replace(variables)

Expand All @@ -303,6 +305,7 @@ def _dataset_from_backend_dataset(
chunks,
cache,
overwrite_encoded_chunks,
inline_array,
**extra_tokens,
):
if not isinstance(chunks, (int, dict)) and chunks not in {None, "auto"}:
Expand All @@ -320,6 +323,7 @@ def _dataset_from_backend_dataset(
engine,
chunks,
overwrite_encoded_chunks,
inline_array,
**extra_tokens,
)

Expand All @@ -346,6 +350,7 @@ def open_dataset(
concat_characters=None,
decode_coords=None,
drop_variables=None,
inline_array=False,
backend_kwargs=None,
**kwargs,
):
Expand Down Expand Up @@ -430,6 +435,12 @@ def open_dataset(
A variable or list of variables to exclude from being parsed from the
dataset. This may be useful to drop variables with problems or
inconsistent values.
inline_array: bool, optional
How to include the array in the dask task graph.
By default(``inline_array=False``) the array is included in a task by
itself, and each chunk refers to that task by its key. With
``inline_array=True``, Dask will instead inline the array directly
in the values of the task graph. See :py:func:`dask.array.from_array`.
backend_kwargs: dict
Additional keyword arguments passed on to the engine open function,
equivalent to `**kwargs`.
Expand Down Expand Up @@ -505,6 +516,7 @@ def open_dataset(
chunks,
cache,
overwrite_encoded_chunks,
inline_array,
drop_variables=drop_variables,
**decoders,
**kwargs,
Expand All @@ -526,6 +538,7 @@ def open_dataarray(
concat_characters=None,
decode_coords=None,
drop_variables=None,
inline_array=False,
backend_kwargs=None,
**kwargs,
):
Expand Down Expand Up @@ -613,6 +626,12 @@ def open_dataarray(
A variable or list of variables to exclude from being parsed from the
dataset. This may be useful to drop variables with problems or
inconsistent values.
inline_array: bool, optional
How to include the array in the dask task graph.
By default(``inline_array=False``) the array is included in a task by
itself, and each chunk refers to that task by its key. With
``inline_array=True``, Dask will instead inline the array directly
in the values of the task graph. See :py:func:`dask.array.from_array`.
backend_kwargs: dict
Additional keyword arguments passed on to the engine open function,
equivalent to `**kwargs`.
Expand Down Expand Up @@ -660,6 +679,7 @@ def open_dataarray(
chunks=chunks,
cache=cache,
drop_variables=drop_variables,
inline_array=inline_array,
backend_kwargs=backend_kwargs,
use_cftime=use_cftime,
decode_timedelta=decode_timedelta,
Expand Down
17 changes: 16 additions & 1 deletion xarray/core/dataarray.py
Original file line number Diff line number Diff line change
Expand Up @@ -1114,6 +1114,7 @@ def chunk(
name_prefix: str = "xarray-",
token: str = None,
lock: bool = False,
inline_array: bool = False,
**chunks_kwargs: Any,
) -> DataArray:
"""Coerce this array's data into a dask arrays with the given chunks.
Expand All @@ -1138,13 +1139,23 @@ def chunk(
lock : optional
Passed on to :py:func:`dask.array.from_array`, if the array is not
already as dask array.
inline_array: optional
Passed on to :py:func:`dask.array.from_array`, if the array is not
already as dask array.
**chunks_kwargs : {dim: chunks, ...}, optional
The keyword arguments form of ``chunks``.
One of chunks or chunks_kwargs must be provided.
Returns
-------
chunked : xarray.DataArray
See Also
--------
DataArray.chunks
DataArray.chunksizes
xarray.unify_chunks
dask.array.from_array
"""
if chunks is None:
warnings.warn(
Expand All @@ -1163,7 +1174,11 @@ def chunk(
chunks = either_dict_or_kwargs(chunks, chunks_kwargs, "chunk")

ds = self._to_temp_dataset().chunk(
chunks, name_prefix=name_prefix, token=token, lock=lock
chunks,
name_prefix=name_prefix,
token=token,
lock=lock,
inline_array=inline_array,
)
return self._from_temp_dataset(ds)

Expand Down
8 changes: 7 additions & 1 deletion xarray/core/dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,7 @@ def _maybe_chunk(
lock=None,
name_prefix="xarray-",
overwrite_encoded_chunks=False,
inline_array=False,
):
from dask.base import tokenize

Expand All @@ -251,7 +252,7 @@ def _maybe_chunk(
# subtle bugs result otherwise. see GH3350
token2 = tokenize(name, token if token else var._data, chunks)
name2 = f"{name_prefix}{name}-{token2}"
var = var.chunk(chunks, name=name2, lock=lock)
var = var.chunk(chunks, name=name2, lock=lock, inline_array=inline_array)

if overwrite_encoded_chunks and var.chunks is not None:
var.encoding["chunks"] = tuple(x[0] for x in var.chunks)
Expand Down Expand Up @@ -1995,6 +1996,7 @@ def chunk(
name_prefix: str = "xarray-",
token: str = None,
lock: bool = False,
inline_array: bool = False,
**chunks_kwargs: Any,
) -> Dataset:
"""Coerce all arrays in this dataset into dask arrays with the given
Expand All @@ -2019,6 +2021,9 @@ def chunk(
lock : optional
Passed on to :py:func:`dask.array.from_array`, if the array is not
already as dask array.
inline_array: optional
Passed on to :py:func:`dask.array.from_array`, if the array is not
already as dask array.
**chunks_kwargs : {dim: chunks, ...}, optional
The keyword arguments form of ``chunks``.
One of chunks or chunks_kwargs must be provided
Expand All @@ -2032,6 +2037,7 @@ def chunk(
Dataset.chunks
Dataset.chunksizes
xarray.unify_chunks
dask.array.from_array
"""
if chunks is None and chunks_kwargs is None:
warnings.warn(
Expand Down
17 changes: 15 additions & 2 deletions xarray/core/variable.py
Original file line number Diff line number Diff line change
Expand Up @@ -1023,6 +1023,7 @@ def chunk(
) = {},
name: str = None,
lock: bool = False,
inline_array: bool = False,
**chunks_kwargs: Any,
) -> Variable:
"""Coerce this array's data into a dask array with the given chunks.
Expand All @@ -1046,13 +1047,23 @@ def chunk(
lock : optional
Passed on to :py:func:`dask.array.from_array`, if the array is not
already as dask array.
inline_array: optional
Passed on to :py:func:`dask.array.from_array`, if the array is not
already as dask array.
**chunks_kwargs : {dim: chunks, ...}, optional
The keyword arguments form of ``chunks``.
One of chunks or chunks_kwargs must be provided.
Returns
-------
chunked : xarray.Variable
See Also
--------
Variable.chunks
Variable.chunksizes
xarray.unify_chunks
dask.array.from_array
"""
import dask.array as da

Expand Down Expand Up @@ -1098,7 +1109,9 @@ def chunk(
if utils.is_dict_like(chunks):
chunks = tuple(chunks.get(n, s) for n, s in enumerate(self.shape))

data = da.from_array(data, chunks, name=name, lock=lock, **kwargs)
data = da.from_array(
data, chunks, name=name, lock=lock, inline_array=inline_array, **kwargs
)

return self._replace(data=data)

Expand Down Expand Up @@ -2710,7 +2723,7 @@ def values(self, values):
f"Please use DataArray.assign_coords, Dataset.assign_coords or Dataset.assign as appropriate."
)

def chunk(self, chunks={}, name=None, lock=False):
def chunk(self, chunks={}, name=None, lock=False, inline_array=False):
# Dummy - do not chunk. This method is invoked e.g. by Dataset.chunk()
return self.copy(deep=False)

Expand Down
21 changes: 21 additions & 0 deletions xarray/tests/test_backends.py
Original file line number Diff line number Diff line change
Expand Up @@ -3840,6 +3840,27 @@ def test_load_dataarray(self):
# load_dataarray
ds.to_netcdf(tmp)

@pytest.mark.skipif(
ON_WINDOWS,
reason="counting number of tasks in graph fails on windows for some reason",
)
def test_inline_array(self):
with create_tmp_file() as tmp:
original = Dataset({"foo": ("x", np.random.randn(10))})
original.to_netcdf(tmp)
chunks = {"time": 10}

def num_graph_nodes(obj):
return len(obj.__dask_graph__())

not_inlined = open_dataset(tmp, inline_array=False, chunks=chunks)
inlined = open_dataset(tmp, inline_array=True, chunks=chunks)
assert num_graph_nodes(inlined) < num_graph_nodes(not_inlined)

not_inlined = open_dataarray(tmp, inline_array=False, chunks=chunks)
inlined = open_dataarray(tmp, inline_array=True, chunks=chunks)
assert num_graph_nodes(inlined) < num_graph_nodes(not_inlined)


@requires_scipy_or_netCDF4
@requires_pydap
Expand Down

0 comments on commit 0512da1

Please sign in to comment.