Preprocess argument for open_mfdataset and threading lock #446

Merged 2 commits on Jun 29, 2015
21 changes: 21 additions & 0 deletions doc/whats-new.rst
@@ -9,6 +9,27 @@ What's New
import xray
np.random.seed(123456)

v0.5.2 (unreleased)
-------------------

Enhancements
~~~~~~~~~~~~

- :py:func:`~xray.open_mfdataset` now supports a ``preprocess`` argument for
  preprocessing datasets prior to concatenation. This is useful if datasets
  cannot otherwise be merged automatically, e.g., if the original datasets
  have conflicting index coordinates (:issue:`443`).
- :py:func:`~xray.open_dataset` and :py:func:`~xray.open_mfdataset` now use a
thread lock by default for reading from netCDF files. This avoids possible
segmentation faults for reading from netCDF4 files when HDF5 is not
configured properly for concurrent access (:issue:`444`).

Bug fixes
~~~~~~~~~

- :py:func:`~xray.open_dataset` and :py:func:`~xray.open_mfdataset` now
  support supplying ``chunks`` as a single integer.

v0.5.1 (15 June 2015)
---------------------

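To make the whats-new entry above concrete, here is a minimal usage sketch of the new ``preprocess`` argument. The file glob is a placeholder, and the ``assign_coords`` call simply mirrors the new test added further down; neither is prescribed by the PR.

```python
import xray  # the library this PR targets (later renamed xarray)

# preprocess is applied to every dataset before auto_combine runs, which helps
# when the raw files cannot be merged automatically as-is.
# 'files-*.nc' is a placeholder glob, not something defined in this PR.
combined = xray.open_mfdataset('files-*.nc',
                               preprocess=lambda ds: ds.assign_coords(z=0))
```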
21 changes: 17 additions & 4 deletions xray/backends/api.py
Expand Up @@ -38,7 +38,7 @@ def _get_default_engine(path, allow_remote=False):
def open_dataset(filename_or_obj, group=None, decode_cf=True,
mask_and_scale=True, decode_times=True,
concat_characters=True, decode_coords=True, engine=None,
chunks=None):
chunks=None, lock=True):
"""Load and decode a dataset from a file or file-like object.

Parameters
@@ -79,6 +79,10 @@ def open_dataset(filename_or_obj, group=None, decode_cf=True,
If chunks is provided, it is used to load the new dataset into dask
arrays. This is an experimental feature; see the documentation for more
details.
lock : optional
If chunks is provided, this argument is passed on to
:py:func:`dask.array.from_array`. By default, a lock is used to avoid
issues with concurrent access under dask's multithreaded backend.

Returns
-------
@@ -100,7 +104,7 @@ def maybe_decode_store(store):
store, mask_and_scale=mask_and_scale, decode_times=decode_times,
concat_characters=concat_characters, decode_coords=decode_coords)
if chunks is not None:
ds = ds.chunk(chunks)
ds = ds.chunk(chunks, lock=lock)
return ds

if isinstance(filename_or_obj, backends.AbstractDataStore):
@@ -161,7 +165,8 @@ def close(self):
f.close()


def open_mfdataset(paths, chunks=None, concat_dim=None, **kwargs):
def open_mfdataset(paths, chunks=None, concat_dim=None, preprocess=None,
lock=True, **kwargs):
"""Open multiple files as a single dataset.

Experimental. Requires dask to be installed.
@@ -183,6 +188,12 @@ def open_mfdataset(paths, chunks=None, concat_dim=None, **kwargs):
need to provide this argument if the dimension along which you want to
concatenate is not a dimension in the original datasets, e.g., if you
want to stack a collection of 2D arrays along a third dimension.
preprocess : callable, optional
If provided, call this function on each dataset prior to concatenation.
lock : optional
This argument is passed on to :py:func:`dask.array.from_array`. By
default, a lock is used to avoid issues with concurrent access under
dask's multithreaded backend.
**kwargs : optional
Additional arguments passed on to :py:func:`xray.open_dataset`.

@@ -201,7 +212,9 @@ def open_mfdataset(paths, chunks=None, concat_dim=None, **kwargs):
raise IOError('no files to open')
datasets = [open_dataset(p, **kwargs) for p in paths]
file_objs = [ds._file_obj for ds in datasets]
datasets = [ds.chunk(chunks) for ds in datasets]
datasets = [ds.chunk(chunks, lock=lock) for ds in datasets]
if preprocess is not None:
datasets = [preprocess(ds) for ds in datasets]
combined = auto_combine(datasets, concat_dim=concat_dim)
combined._file_obj = _MultiFileCloser(file_objs)
return combined
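The api.py changes above also thread a ``lock`` keyword through ``open_dataset`` and ``open_mfdataset``. A hedged sketch of how a caller would use it; ``example.nc`` is a placeholder file name:

```python
import xray

# lock=True is now the default: each dask task that reads a chunk first
# acquires a shared threading.Lock, avoiding segfaults when HDF5 is not
# configured for concurrent access.
ds = xray.open_dataset('example.nc', chunks={'x': 5})

# Opting out restores the previous, unguarded behaviour; the value is passed
# straight through to dask.array.from_array.
ds_unlocked = xray.open_dataset('example.nc', chunks={'x': 5}, lock=False)
```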
9 changes: 6 additions & 3 deletions xray/core/dataset.py
@@ -883,7 +883,7 @@ def chunks(self):
chunks.update(new_chunks)
return Frozen(SortedKeysDict(chunks))

def chunk(self, chunks=None):
def chunk(self, chunks=None, lock=False):
"""Coerce all arrays in this dataset into dask arrays with the given
chunks.

@@ -899,13 +899,16 @@ def chunk(self, chunks=None):
chunks : int or dict, optional
Chunk sizes along each dimension, e.g., ``5`` or
``{'x': 5, 'y': 5}``.
lock : optional
Passed on to :py:func:`dask.array.from_array`, if the array is not
already a dask array.

Returns
-------
chunked : xray.Dataset
"""
if isinstance(chunks, Number):
chunks = dict.fromkeys(chunks, chunks)
chunks = dict.fromkeys(self.dims, chunks)

if chunks is not None:
bad_dims = [d for d in chunks if d not in self.dims]
@@ -923,7 +926,7 @@ def maybe_chunk(name, var, chunks):
if not chunks:
chunks = None
if var.ndim > 0:
return var.chunk(chunks, name=name)
return var.chunk(chunks, name=name, lock=lock)
else:
return var

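The one-line fix in ``Dataset.chunk`` above expands an integer ``chunks`` value across every dimension instead of trying to iterate over the integer itself. A standalone sketch of the before/after behaviour, with a made-up ``dims`` tuple standing in for ``self.dims``:

```python
dims = ('x', 'y')   # stand-in for self.dims

# New behaviour: one entry per dimension, all with the same chunk size.
chunks = dict.fromkeys(dims, 5)
assert chunks == {'x': 5, 'y': 5}

# Old behaviour: dict.fromkeys(5, 5) raises
# TypeError: 'int' object is not iterable,
# which is why passing chunks as a single integer used to fail.
```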
7 changes: 5 additions & 2 deletions xray/core/variable.py
Expand Up @@ -413,7 +413,7 @@ def chunks(self):

_array_counter = itertools.count()

def chunk(self, chunks=None, name=''):
def chunk(self, chunks=None, name='', lock=False):
"""Coerce this array's data into a dask arrays with the given chunks.

If this variable is a non-dask array, it will be converted to dask
@@ -432,6 +432,9 @@ def chunk(self, chunks=None, name=''):
name : str, optional
Used to generate the name for this array in the internal dask
graph. Does not need to be unique.
lock : optional
Passed on to :py:func:`dask.array.from_array`, if the array is not
already a dask array.

Returns
-------
@@ -458,7 +461,7 @@ def chunk(self, chunks=None, name=''):
chunks = tuple(chunks.get(n, s)
for n, s in enumerate(self.shape))

data = da.from_array(data, chunks, name=name)
data = da.from_array(data, chunks, name=name, lock=lock)

return type(self)(self.dims, data, self._attrs, self._encoding,
fastpath=True)
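``Variable.chunk`` now forwards ``name`` and ``lock`` to ``dask.array.from_array``. A minimal sketch of that underlying call, assuming a dask version that accepts an explicit lock object:

```python
import numpy as np
import dask.array as da
from threading import Lock

data = np.arange(20)

# Every task that loads a block of 'guarded' acquires the lock before reading,
# serializing access to a data source that is not thread-safe.
guarded = da.from_array(data, chunks=5, name='example-var', lock=Lock())
print(guarded.sum().compute())  # -> 190
```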
27 changes: 25 additions & 2 deletions xray/test/test_backends.py
@@ -1,4 +1,5 @@
from io import BytesIO
from threading import Lock
import contextlib
import os.path
import pickle
@@ -715,6 +716,26 @@ def test_open_mfdataset(self):
with self.assertRaisesRegexp(IOError, 'no files to open'):
open_mfdataset('foo-bar-baz-*.nc')

def test_preprocess_mfdataset(self):
    original = Dataset({'foo': ('x', np.random.randn(10))})
    with create_tmp_file() as tmp:
        original.to_netcdf(tmp)
        preprocess = lambda ds: ds.assign_coords(z=0)
        expected = preprocess(original)
        with open_mfdataset(tmp, preprocess=preprocess) as actual:
            self.assertDatasetIdentical(expected, actual)

def test_lock(self):
    original = Dataset({'foo': ('x', np.random.randn(10))})
    with create_tmp_file() as tmp:
        original.to_netcdf(tmp)
        with open_dataset(tmp, chunks=10) as ds:
            task = ds.foo.data.dask[ds.foo.data.name, 0]
            self.assertIsInstance(task[-1], type(Lock()))
        with open_mfdataset(tmp) as ds:
            task = ds.foo.data.dask[ds.foo.data.name, 0]
            self.assertIsInstance(task[-1], type(Lock()))

def test_open_and_do_math(self):
original = Dataset({'foo': ('x', np.random.randn(10))})
with create_tmp_file() as tmp:
@@ -730,10 +751,12 @@ def test_open_dataset(self):
with open_dataset(tmp, chunks={'x': 5}) as actual:
self.assertIsInstance(actual.foo.variable.data, da.Array)
self.assertEqual(actual.foo.variable.data.chunks, ((5, 5),))
self.assertDatasetAllClose(original, actual)
self.assertDatasetIdentical(original, actual)
with open_dataset(tmp, chunks=5) as actual:
self.assertDatasetIdentical(original, actual)
with open_dataset(tmp) as actual:
self.assertIsInstance(actual.foo.variable.data, np.ndarray)
self.assertDatasetAllClose(original, actual)
self.assertDatasetIdentical(original, actual)

def test_dask_roundtrip(self):
with create_tmp_file() as tmp:
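Outside the test suite, the same check as ``test_lock`` can be run by hand to confirm that the lock ends up in the dask graph; ``example.nc`` and the variable name ``foo`` are placeholders:

```python
from threading import Lock
import xray

ds = xray.open_dataset('example.nc', chunks=10)
# With lock=True (the default), the last element of a chunk-loading task is
# the shared lock instance.
task = ds.foo.data.dask[(ds.foo.data.name, 0)]
assert isinstance(task[-1], type(Lock()))
```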