Skip to content

Commit

Permalink
Use numbagg for rolling methods (#8493)
Browse files Browse the repository at this point in the history
* Use numbagg for `rolling` methods

A couple of tests are failing for the multi-dimensional case, which I'll fix before merge.

* wip

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* whatsnew

---------

Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
  • Loading branch information
max-sixty and pre-commit-ci[bot] authored Dec 5, 2023
1 parent 704de55 commit 1f94829
Show file tree
Hide file tree
Showing 3 changed files with 144 additions and 27 deletions.
9 changes: 8 additions & 1 deletion doc/whats-new.rst
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,13 @@ v2023.11.1 (unreleased)
New Features
~~~~~~~~~~~~

- :py:meth:`rolling` uses numbagg <https://github.com/numbagg/numbagg>`_ for
most of its computations by default. Numbagg is up to 5x faster than bottleneck
where parallelization is possible. Where parallelization isn't possible — for
example a 1D array — it's about the same speed as bottleneck, and 2-5x faster
than pandas' default functions. (:pull:`8493`). numbagg is an optional
dependency, so requires installing separately.
By `Maximilian Roos <https://github.com/max-sixty>`_.
- Use a concise format when plotting datetime arrays. (:pull:`8449`).
By `Jimmy Westling <https://github.com/illviljan>`_.
- Avoid overwriting unchanged existing coordinate variables when appending by setting ``mode='a-'``.
Expand Down Expand Up @@ -90,7 +97,7 @@ Documentation
Internal Changes
~~~~~~~~~~~~~~~~

- :py:meth:`DataArray.bfill` & :py:meth:`DataArray.ffill` now use numbagg by
- :py:meth:`DataArray.bfill` & :py:meth:`DataArray.ffill` now use numbagg <https://github.com/numbagg/numbagg>`_ by
default, which is up to 5x faster where parallelization is possible. (:pull:`8339`)
By `Maximilian Roos <https://github.com/max-sixty>`_.
- Update mypy version to 1.7 (:issue:`8448`, :pull:`8501`).
Expand Down
112 changes: 99 additions & 13 deletions xarray/core/rolling.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,14 @@
from typing import TYPE_CHECKING, Any, Callable, Generic, TypeVar

import numpy as np
from packaging.version import Version

from xarray.core import dtypes, duck_array_ops, utils
from xarray.core import dtypes, duck_array_ops, pycompat, utils
from xarray.core.arithmetic import CoarsenArithmetic
from xarray.core.options import OPTIONS, _get_keep_attrs
from xarray.core.pycompat import is_duck_dask_array
from xarray.core.types import CoarsenBoundaryOptions, SideOptions, T_Xarray
from xarray.core.utils import either_dict_or_kwargs
from xarray.core.utils import either_dict_or_kwargs, module_available

try:
import bottleneck
Expand Down Expand Up @@ -145,22 +146,35 @@ def _reduce_method( # type: ignore[misc]
name: str, fillna: Any, rolling_agg_func: Callable | None = None
) -> Callable[..., T_Xarray]:
"""Constructs reduction methods built on a numpy reduction function (e.g. sum),
a bottleneck reduction function (e.g. move_sum), or a Rolling reduction (_mean).
a numbagg reduction function (e.g. move_sum), a bottleneck reduction function
(e.g. move_sum), or a Rolling reduction (_mean).
The logic here for which function to run is quite diffuse, across this method &
_array_reduce. Arguably we could refactor this. But one constraint is that we
need context of xarray options, of the functions each library offers, of
the array (e.g. dtype).
"""
if rolling_agg_func:
array_agg_func = None
else:
array_agg_func = getattr(duck_array_ops, name)

bottleneck_move_func = getattr(bottleneck, "move_" + name, None)
if module_available("numbagg"):
import numbagg

numbagg_move_func = getattr(numbagg, "move_" + name, None)
else:
numbagg_move_func = None

def method(self, keep_attrs=None, **kwargs):
keep_attrs = self._get_keep_attrs(keep_attrs)

return self._numpy_or_bottleneck_reduce(
array_agg_func,
bottleneck_move_func,
rolling_agg_func,
return self._array_reduce(
array_agg_func=array_agg_func,
bottleneck_move_func=bottleneck_move_func,
numbagg_move_func=numbagg_move_func,
rolling_agg_func=rolling_agg_func,
keep_attrs=keep_attrs,
fillna=fillna,
**kwargs,
Expand Down Expand Up @@ -510,9 +524,47 @@ def _counts(self, keep_attrs: bool | None) -> DataArray:
)
return counts

def _bottleneck_reduce(self, func, keep_attrs, **kwargs):
from xarray.core.dataarray import DataArray
def _numbagg_reduce(self, func, keep_attrs, **kwargs):
# Some of this is copied from `_bottleneck_reduce`, we could reduce this as part
# of a wider refactor.

axis = self.obj.get_axis_num(self.dim[0])

padded = self.obj.variable
if self.center[0]:
if is_duck_dask_array(padded.data):
# workaround to make the padded chunk size larger than
# self.window - 1
shift = -(self.window[0] + 1) // 2
offset = (self.window[0] - 1) // 2
valid = (slice(None),) * axis + (
slice(offset, offset + self.obj.shape[axis]),
)
else:
shift = (-self.window[0] // 2) + 1
valid = (slice(None),) * axis + (slice(-shift, None),)
padded = padded.pad({self.dim[0]: (0, -shift)}, mode="constant")

if is_duck_dask_array(padded.data) and False:
raise AssertionError("should not be reachable")
else:
values = func(
padded.data,
window=self.window[0],
min_count=self.min_periods,
axis=axis,
)

if self.center[0]:
values = values[valid]

attrs = self.obj.attrs if keep_attrs else {}

return self.obj.__class__(
values, self.obj.coords, attrs=attrs, name=self.obj.name
)

def _bottleneck_reduce(self, func, keep_attrs, **kwargs):
# bottleneck doesn't allow min_count to be 0, although it should
# work the same as if min_count = 1
# Note bottleneck only works with 1d-rolling.
Expand Down Expand Up @@ -550,12 +602,15 @@ def _bottleneck_reduce(self, func, keep_attrs, **kwargs):

attrs = self.obj.attrs if keep_attrs else {}

return DataArray(values, self.obj.coords, attrs=attrs, name=self.obj.name)
return self.obj.__class__(
values, self.obj.coords, attrs=attrs, name=self.obj.name
)

def _numpy_or_bottleneck_reduce(
def _array_reduce(
self,
array_agg_func,
bottleneck_move_func,
numbagg_move_func,
rolling_agg_func,
keep_attrs,
fillna,
Expand All @@ -571,6 +626,35 @@ def _numpy_or_bottleneck_reduce(
)
del kwargs["dim"]

if (
OPTIONS["use_numbagg"]
and module_available("numbagg")
and pycompat.mod_version("numbagg") >= Version("0.6.3")
and numbagg_move_func is not None
# TODO: we could at least allow this for the equivalent of `apply_ufunc`'s
# "parallelized". `rolling_exp` does this, as an example (but rolling_exp is
# much simpler)
and not is_duck_dask_array(self.obj.data)
# Numbagg doesn't handle object arrays and generally has dtype consistency,
# so doesn't deal well with bool arrays which are expected to change type.
and self.obj.data.dtype.kind not in "ObMm"
# TODO: we could also allow this, probably as part of a refactoring of this
# module, so we can use the machinery in `self.reduce`.
and self.ndim == 1
):
import numbagg

# Numbagg has a default ddof of 1. I (@max-sixty) think we should make
# this the default in xarray too, but until we do, don't use numbagg for
# std and var unless ddof is set to 1.
if (
numbagg_move_func not in [numbagg.move_std, numbagg.move_var]
or kwargs.get("ddof") == 1
):
return self._numbagg_reduce(
numbagg_move_func, keep_attrs=keep_attrs, **kwargs
)

if (
OPTIONS["use_bottleneck"]
and bottleneck_move_func is not None
Expand All @@ -583,8 +667,10 @@ def _numpy_or_bottleneck_reduce(
return self._bottleneck_reduce(
bottleneck_move_func, keep_attrs=keep_attrs, **kwargs
)

if rolling_agg_func:
return rolling_agg_func(self, keep_attrs=self._get_keep_attrs(keep_attrs))

if fillna is not None:
if fillna is dtypes.INF:
fillna = dtypes.get_pos_infinity(self.obj.dtype, max_for_int=True)
Expand Down Expand Up @@ -705,7 +791,7 @@ def _counts(self, keep_attrs: bool | None) -> Dataset:
DataArrayRolling._counts, keep_attrs=keep_attrs
)

def _numpy_or_bottleneck_reduce(
def _array_reduce(
self,
array_agg_func,
bottleneck_move_func,
Expand All @@ -715,7 +801,7 @@ def _numpy_or_bottleneck_reduce(
):
return self._dataset_implementation(
functools.partial(
DataArrayRolling._numpy_or_bottleneck_reduce,
DataArrayRolling._array_reduce,
array_agg_func=array_agg_func,
bottleneck_move_func=bottleneck_move_func,
rolling_agg_func=rolling_agg_func,
Expand Down
50 changes: 37 additions & 13 deletions xarray/tests/test_rolling.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
from xarray import DataArray, Dataset, set_options
from xarray.tests import (
assert_allclose,
assert_array_equal,
assert_equal,
assert_identical,
has_dask,
Expand All @@ -24,6 +23,19 @@
]


@pytest.fixture(params=["numbagg", "bottleneck"])
def compute_backend(request):
if request.param == "bottleneck":
options = dict(use_bottleneck=True, use_numbagg=False)
elif request.param == "numbagg":
options = dict(use_bottleneck=False, use_numbagg=True)
else:
raise ValueError

with xr.set_options(**options):
yield request.param


class TestDataArrayRolling:
@pytest.mark.parametrize("da", (1, 2), indirect=True)
@pytest.mark.parametrize("center", [True, False])
Expand Down Expand Up @@ -87,9 +99,10 @@ def test_rolling_properties(self, da) -> None:
@pytest.mark.parametrize("center", (True, False, None))
@pytest.mark.parametrize("min_periods", (1, None))
@pytest.mark.parametrize("backend", ["numpy"], indirect=True)
def test_rolling_wrapped_bottleneck(self, da, name, center, min_periods) -> None:
def test_rolling_wrapped_bottleneck(
self, da, name, center, min_periods, compute_backend
) -> None:
bn = pytest.importorskip("bottleneck", minversion="1.1")

# Test all bottleneck functions
rolling_obj = da.rolling(time=7, min_periods=min_periods)

Expand All @@ -98,15 +111,18 @@ def test_rolling_wrapped_bottleneck(self, da, name, center, min_periods) -> None
expected = getattr(bn, func_name)(
da.values, window=7, axis=1, min_count=min_periods
)
assert_array_equal(actual.values, expected)

# Using assert_allclose because we get tiny (1e-17) differences in numbagg.
np.testing.assert_allclose(actual.values, expected)

with pytest.warns(DeprecationWarning, match="Reductions are applied"):
getattr(rolling_obj, name)(dim="time")

# Test center
rolling_obj = da.rolling(time=7, center=center)
actual = getattr(rolling_obj, name)()["time"]
assert_equal(actual, da["time"])
# Using assert_allclose because we get tiny (1e-17) differences in numbagg.
assert_allclose(actual, da["time"])

@requires_dask
@pytest.mark.parametrize("name", ("mean", "count"))
Expand Down Expand Up @@ -153,7 +169,9 @@ def test_rolling_wrapped_dask_nochunk(self, center) -> None:
@pytest.mark.parametrize("center", (True, False))
@pytest.mark.parametrize("min_periods", (None, 1, 2, 3))
@pytest.mark.parametrize("window", (1, 2, 3, 4))
def test_rolling_pandas_compat(self, center, window, min_periods) -> None:
def test_rolling_pandas_compat(
self, center, window, min_periods, compute_backend
) -> None:
s = pd.Series(np.arange(10))
da = DataArray.from_series(s)

Expand Down Expand Up @@ -203,7 +221,9 @@ def test_rolling_construct(self, center: bool, window: int) -> None:
@pytest.mark.parametrize("min_periods", (None, 1, 2, 3))
@pytest.mark.parametrize("window", (1, 2, 3, 4))
@pytest.mark.parametrize("name", ("sum", "mean", "std", "max"))
def test_rolling_reduce(self, da, center, min_periods, window, name) -> None:
def test_rolling_reduce(
self, da, center, min_periods, window, name, compute_backend
) -> None:
if min_periods is not None and window < min_periods:
min_periods = window

Expand All @@ -223,7 +243,9 @@ def test_rolling_reduce(self, da, center, min_periods, window, name) -> None:
@pytest.mark.parametrize("min_periods", (None, 1, 2, 3))
@pytest.mark.parametrize("window", (1, 2, 3, 4))
@pytest.mark.parametrize("name", ("sum", "max"))
def test_rolling_reduce_nonnumeric(self, center, min_periods, window, name) -> None:
def test_rolling_reduce_nonnumeric(
self, center, min_periods, window, name, compute_backend
) -> None:
da = DataArray(
[0, np.nan, 1, 2, np.nan, 3, 4, 5, np.nan, 6, 7], dims="time"
).isnull()
Expand All @@ -239,7 +261,7 @@ def test_rolling_reduce_nonnumeric(self, center, min_periods, window, name) -> N
assert_allclose(actual, expected)
assert actual.dims == expected.dims

def test_rolling_count_correct(self) -> None:
def test_rolling_count_correct(self, compute_backend) -> None:
da = DataArray([0, np.nan, 1, 2, np.nan, 3, 4, 5, np.nan, 6, 7], dims="time")

kwargs: list[dict[str, Any]] = [
Expand Down Expand Up @@ -279,7 +301,9 @@ def test_rolling_count_correct(self) -> None:
@pytest.mark.parametrize("center", (True, False))
@pytest.mark.parametrize("min_periods", (None, 1))
@pytest.mark.parametrize("name", ("sum", "mean", "max"))
def test_ndrolling_reduce(self, da, center, min_periods, name) -> None:
def test_ndrolling_reduce(
self, da, center, min_periods, name, compute_backend
) -> None:
rolling_obj = da.rolling(time=3, x=2, center=center, min_periods=min_periods)

actual = getattr(rolling_obj, name)()
Expand Down Expand Up @@ -560,7 +584,7 @@ def test_rolling_properties(self, ds) -> None:
@pytest.mark.parametrize("key", ("z1", "z2"))
@pytest.mark.parametrize("backend", ["numpy"], indirect=True)
def test_rolling_wrapped_bottleneck(
self, ds, name, center, min_periods, key
self, ds, name, center, min_periods, key, compute_backend
) -> None:
bn = pytest.importorskip("bottleneck", minversion="1.1")

Expand All @@ -577,12 +601,12 @@ def test_rolling_wrapped_bottleneck(
)
else:
raise ValueError
assert_array_equal(actual[key].values, expected)
np.testing.assert_allclose(actual[key].values, expected)

# Test center
rolling_obj = ds.rolling(time=7, center=center)
actual = getattr(rolling_obj, name)()["time"]
assert_equal(actual, ds["time"])
assert_allclose(actual, ds["time"])

@pytest.mark.parametrize("center", (True, False))
@pytest.mark.parametrize("min_periods", (None, 1, 2, 3))
Expand Down

0 comments on commit 1f94829

Please sign in to comment.