Expose apply_ufunc as public API and add documentation #1619
Changes from 3 commits
@@ -14,6 +14,7 @@ Top-level functions
 .. autosummary::
    :toctree: generated/

+   apply_ufunc
    align
    broadcast
    concat
@@ -1,10 +1,10 @@
 .. _dask:

-Out of core computation with dask
-=================================
+Parallel computing with dask
+============================

-xarray integrates with `dask <http://dask.pydata.org/>`__ to support streaming
-computation on datasets that don't fit into memory.
+xarray integrates with `dask <http://dask.pydata.org/>`__ to support parallel
+computations and streaming computation on datasets that don't fit into memory.

 Currently, dask is an entirely optional feature for xarray. However, the
 benefits of using dask are sufficiently strong that dask may become a required
@@ -33,7 +33,7 @@ to your screen or write to disk). At that point, data is loaded into memory
 and computation proceeds in a streaming fashion, block-by-block.

 The actual computation is controlled by a multi-processing or thread pool,
-which allows dask to take full advantage of multiple processers available on
+which allows dask to take full advantage of multiple processors available on
 most modern computers.

 For more details on dask, read `its documentation <http://dask.pydata.org/>`__.
@@ -213,6 +213,93 @@ loaded into dask or not:
 In the future, we may extend ``.data`` to support other "computable" array
 backends beyond dask and numpy (e.g., to support sparse arrays).

+.. _dask.automatic-parallelization:
+
+Automatic parallelization
+-------------------------
+
+Almost all of xarray's built-in operations work on dask arrays. If you want to
+use a function that isn't wrapped by xarray, one option is to extract dask
+arrays from xarray objects (``.data``) and use dask directly.
+
+Another option is to use xarray's :py:func:`~xarray.apply_ufunc`, which can
+automatically parallelize functions written for processing NumPy arrays when
+applied to dask arrays. It works similarly to :py:func:`dask.array.map_blocks`
+and :py:func:`dask.array.atop`, but without requiring an intermediate layer of
+abstraction.

[Review comment] did you mean intermediate?
[Reply] yes, indeed

+For the best performance with dask's multi-threaded scheduler, wrap a function
+that already releases the global interpreter lock; fortunately, most NumPy and
+SciPy functions already do. Here we show an example using NumPy operations and
+a fast function from
+`bottleneck <https://github.com/kwgoodman/bottleneck>`__, which
+we use to calculate `Spearman's rank-correlation coefficient <https://en.wikipedia.org/wiki/Spearman%27s_rank_correlation_coefficient>`__:
+
+.. code-block:: python
+
+    import numpy as np
+    import xarray as xr
+    import bottleneck
+
+    def covariance_gufunc(x, y):
+        return ((x - x.mean(axis=-1, keepdims=True))
+                * (y - y.mean(axis=-1, keepdims=True))).mean(axis=-1)
+
+    def pearson_correlation_gufunc(x, y):
+        return covariance_gufunc(x, y) / (x.std(axis=-1) * y.std(axis=-1))
+
+    def spearman_correlation_gufunc(x, y):
+        x_ranks = bottleneck.rankdata(x, axis=-1)
+        y_ranks = bottleneck.rankdata(y, axis=-1)
+        return pearson_correlation_gufunc(x_ranks, y_ranks)
+
+    def spearman_correlation(x, y, dim):
+        return xr.apply_ufunc(
+            spearman_correlation_gufunc, x, y,
+            input_core_dims=[[dim], [dim]],
+            dask='parallelized',
+            output_dtypes=[float])
+
+The only aspect of this example that is different from standard usage of
+``apply_ufunc()`` is that we needed to supply the ``output_dtypes`` argument.
+(Read up on :ref:`comput.wrapping-custom` for an explanation of the
+"core dimensions" listed in ``input_core_dims``.)
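The "core dimensions" passed via ``input_core_dims`` play the same role as the dimensions in a NumPy generalized-ufunc signature. As a rough analogy (a sketch using only NumPy, independent of dask and xarray), ``np.vectorize`` with a ``signature`` likewise consumes the last axis of each input:

```python
import numpy as np

# Analogy only: the signature '(n),(n)->()' consumes one core dimension
# from each input, much like input_core_dims=[[dim], [dim]] in apply_ufunc.
pearson = np.vectorize(
    lambda x, y: np.corrcoef(x, y)[0, 1],
    signature='(n),(n)->()')

a = np.array([[1.0, 2.0, 3.0], [1.0, 2.0, 3.0]])
b = np.array([[2.0, 4.0, 6.0], [3.0, 2.0, 1.0]])
r = pearson(a, b)
print(r)  # one perfectly correlated pair, one anti-correlated: [ 1. -1.]
```

As with ``apply_ufunc``, the core dimension is moved to the last axis before the wrapped function sees the data, and it disappears from the output.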
+
+Our new ``spearman_correlation()`` function achieves near linear speedup
+when run on large arrays across the four cores on my laptop. It would also
+work as a streaming operation, when run on arrays loaded from disk:
+
+.. ipython::
+    :verbatim:
+
+    In [56]: rs = np.random.RandomState(0)
+
+    In [57]: array1 = xr.DataArray(rs.randn(1000, 100000), dims=['place', 'time'])  # 800MB
+
+    In [58]: array2 = array1 + 0.5 * rs.randn(1000, 100000)
+
+    # using one core, on numpy arrays
+    In [61]: %time _ = spearman_correlation(array1, array2, 'time')
+    CPU times: user 21.6 s, sys: 2.84 s, total: 24.5 s
+    Wall time: 24.9 s
+
+    # using all my laptop's cores, with dask
+    In [63]: r = spearman_correlation(array1.chunk({'place': 10}), array2.chunk({'place': 10}), 'time')
+
+    In [64]: %time _ = r.compute()
+    CPU times: user 30.9 s, sys: 1.74 s, total: 32.6 s
+    Wall time: 4.59 s
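``bottleneck`` is an optional dependency here. If it is unavailable, a NumPy-only stand-in for ``bottleneck.rankdata`` can be sketched with a double ``argsort`` (a caveat: unlike ``bottleneck.rankdata``, this breaks ties arbitrarily instead of averaging them, so Spearman's coefficient will differ on data with ties):

```python
import numpy as np

def rankdata_np(a, axis=-1):
    # Double argsort yields 0-based ranks along `axis`.  bottleneck.rankdata
    # instead returns 1-based ranks and averages ties; this sketch does not.
    return np.argsort(np.argsort(a, axis=axis), axis=axis)

x = np.array([[0.5, 0.1, 0.9]])
print(rankdata_np(x))  # [[1 0 2]]
```

Because Pearson correlation is invariant to adding a constant to its inputs, the 0-based (rather than 1-based) ranks do not change the resulting coefficient on tie-free data.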
+
+.. tip::
+
+    For the majority of NumPy functions that are already wrapped by dask, it's
+    usually a better idea to use the pre-existing ``dask.array`` function, by
+    using either pre-existing xarray methods or
+    :py:func:`~xarray.apply_ufunc()` with ``dask='allowed'``. Dask can often
+    have a more efficient implementation that makes use of the specialized
+    structure of a problem, unlike the generic speedups offered by
+    ``dask='parallelized'``.
+
 Chunking and performance
 ------------------------

@@ -237,7 +324,6 @@ larger chunksizes.
     import os
     os.remove('example-data.nc')
-

 Optimization Tips
 -----------------
@@ -75,6 +75,13 @@ Backward Incompatible Changes
 Enhancements
 ~~~~~~~~~~~~

+- New helper function :py:func:`~xarray.apply_ufunc` for wrapping functions
+  written to work on NumPy arrays to support labels on xarray objects
+  (:issue:`770`). ``apply_ufunc`` also supports automatic parallelization for
+  many functions with dask. See :ref:`comput.wrapping-custom` and
+  :ref:`dask.automatic-parallelization` for details.
+  By `Stephan Hoyer <https://github.com/shoyer>`_.
+

[Review comment] Don't forget:

 - Support for `pathlib.Path` objects added to
   :py:func:`~xarray.open_dataset`, :py:func:`~xarray.open_mfdataset`,
   :py:func:`~xarray.to_netcdf`, and :py:func:`~xarray.save_mfdataset`

@@ -232,7 +239,7 @@ Bug fixes
 The previous behavior unintentionally causing additional tests to be skipped
 (:issue:`1531`). By `Joe Hamman <https://github.com/jhamman>`_.

 - Fix pynio backend for upcoming release of pynio with python3 support
   (:issue:`1611`). By `Ben Hillman <https://github/brhillman>`_.

 .. _whats-new.0.9.6:
@@ -676,6 +676,7 @@ def apply_ufunc(func, *args, **kwargs):
         Method for joining the indexes of the passed objects along each
         dimension, and the variables of Dataset objects with mismatched
         data variables:
+
         - 'outer': use the union of object indexes
         - 'inner': use the intersection of object indexes
         - 'left': use indexes from the first object with each dimension

@@ -685,6 +686,7 @@ def apply_ufunc(func, *args, **kwargs):
     dataset_join : {'outer', 'inner', 'left', 'right', 'exact'}, optional
         Method for joining variables of Dataset objects with mismatched
         data variables.
+
         - 'outer': take variables from both Dataset objects
         - 'inner': take only overlapped variables
         - 'left': take only variables from the first object
@@ -701,6 +703,7 @@ def apply_ufunc(func, *args, **kwargs):
     dask: 'forbidden', 'allowed' or 'parallelized', optional
         How to handle applying to objects containing lazy data in the form of
         dask arrays:
+
         - 'forbidden' (default): raise an error if a dask array is encountered.
         - 'allowed': pass dask arrays directly on to ``func``.
         - 'parallelized': automatically parallelize ``func`` if any of the
@@ -724,7 +727,7 @@ def apply_ufunc(func, *args, **kwargs):
     ``apply_ufunc`` to write functions to (very nearly) replicate existing
     xarray functionality:

-    Calculate the vector magnitude of two arguments:
+    Calculate the vector magnitude of two arguments::

[Review comment] ?
[Reply] When you write …

        def magnitude(a, b):
            func = lambda x, y: np.sqrt(x ** 2 + y ** 2)
            return xr.apply_ufunc(func, a, b)
[Review comment] Is the point on speed a distraction? If an array is that small, the absolute difference in speed is still very small, so it really only makes a difference if you're doing those operations in a loop.

[Reply] Agreed. I had "in the inner loop" in mind when I wrote this, but I see now that that never made it into the text. Let me know if this latest update seems more reasonable to you, or if I'm still pushing too hard on performance considerations.