diff --git a/doc/whats-new.rst b/doc/whats-new.rst
index 7a5a6224430..b1849dc3d79 100644
--- a/doc/whats-new.rst
+++ b/doc/whats-new.rst
@@ -9,6 +9,27 @@ What's New
     import xray
     np.random.seed(123456)
 
+v0.5.2 (unreleased)
+-------------------
+
+Enhancements
+~~~~~~~~~~~~
+
+- :py:func:`~xray.open_mfdataset` now supports a ``preprocess`` argument for
+  preprocessing datasets prior to concatenation. This is useful if datasets
+  cannot be otherwise merged automatically, e.g., if the original datasets
+  have conflicting index coordinates (:issue:`443`).
+- :py:func:`~xray.open_dataset` and :py:func:`~xray.open_mfdataset` now use a
+  thread lock by default for reading from netCDF files. This avoids possible
+  segmentation faults for reading from netCDF4 files when HDF5 is not
+  configured properly for concurrent access (:issue:`444`).
+
+Bug fixes
+~~~~~~~~~
+
+- :py:func:`~xray.open_dataset` and :py:func:`~xray.open_mfdataset` support
+  supplying ``chunks`` as a single integer.
+
 v0.5.1 (15 June 2015)
 ---------------------
 
diff --git a/xray/backends/api.py b/xray/backends/api.py
index af8406d4881..02f0344f7f4 100644
--- a/xray/backends/api.py
+++ b/xray/backends/api.py
@@ -38,7 +38,7 @@ def _get_default_engine(path, allow_remote=False):
 def open_dataset(filename_or_obj, group=None, decode_cf=True,
                  mask_and_scale=True, decode_times=True,
                  concat_characters=True, decode_coords=True, engine=None,
-                 chunks=None):
+                 chunks=None, lock=True):
     """Load and decode a dataset from a file or file-like object.
 
     Parameters
@@ -79,6 +79,10 @@ def open_dataset(filename_or_obj, group=None, decode_cf=True,
         If chunks is provided, it used to load the new dataset into dask
         arrays. This is an experimental feature; see the documentation for
         more details.
+    lock : optional
+        If chunks is provided, this argument is passed on to
+        :py:func:`dask.array.from_array`. By default, a lock is used to avoid
+        issues with concurrent access with dask's multithreaded backend.
 
     Returns
     -------
@@ -100,7 +104,7 @@ def maybe_decode_store(store):
             store, mask_and_scale=mask_and_scale, decode_times=decode_times,
             concat_characters=concat_characters, decode_coords=decode_coords)
         if chunks is not None:
-            ds = ds.chunk(chunks)
+            ds = ds.chunk(chunks, lock=lock)
         return ds
 
     if isinstance(filename_or_obj, backends.AbstractDataStore):
@@ -161,7 +165,8 @@ def close(self):
             f.close()
 
 
-def open_mfdataset(paths, chunks=None, concat_dim=None, **kwargs):
+def open_mfdataset(paths, chunks=None, concat_dim=None, preprocess=None,
+                   lock=True, **kwargs):
     """Open multiple files as a single dataset.
 
     Experimental. Requires dask to be installed.
@@ -183,6 +188,12 @@ def open_mfdataset(paths, chunks=None, concat_dim=None, **kwargs):
         need to provide this argument if the dimension along which you want to
         concatenate is not a dimension in the original datasets, e.g., if you
         want to stack a collection of 2D arrays along a third dimension.
+    preprocess : callable, optional
+        If provided, call this function on each dataset prior to concatenation.
+    lock : optional
+        This argument is passed on to :py:func:`dask.array.from_array`. By
+        default, a lock is used to avoid issues with concurrent access with
+        dask's multithreaded backend.
     **kwargs : optional
         Additional arguments passed on to :py:func:`xray.open_dataset`.
 
@@ -201,7 +212,9 @@ def open_mfdataset(paths, chunks=None, concat_dim=None, **kwargs):
         raise IOError('no files to open')
     datasets = [open_dataset(p, **kwargs) for p in paths]
     file_objs = [ds._file_obj for ds in datasets]
-    datasets = [ds.chunk(chunks) for ds in datasets]
+    datasets = [ds.chunk(chunks, lock=lock) for ds in datasets]
+    if preprocess is not None:
+        datasets = [preprocess(ds) for ds in datasets]
     combined = auto_combine(datasets, concat_dim=concat_dim)
     combined._file_obj = _MultiFileCloser(file_objs)
     return combined
diff --git a/xray/core/dataset.py b/xray/core/dataset.py
index 3f9d02de0cb..0018174d78d 100644
--- a/xray/core/dataset.py
+++ b/xray/core/dataset.py
@@ -883,7 +883,7 @@ def chunks(self):
                 chunks.update(new_chunks)
         return Frozen(SortedKeysDict(chunks))
 
-    def chunk(self, chunks=None):
+    def chunk(self, chunks=None, lock=False):
         """Coerce all arrays in this dataset into dask arrays with the given
         chunks.
 
@@ -899,13 +899,16 @@ def chunk(self, chunks=None):
         chunks : int or dict, optional
             Chunk sizes along each dimension, e.g., ``5`` or
             ``{'x': 5, 'y': 5}``.
+        lock : optional
+            Passed on to :py:func:`dask.array.from_array`, if the array is not
+            already a dask array.
 
         Returns
         -------
         chunked : xray.Dataset
         """
         if isinstance(chunks, Number):
-            chunks = dict.fromkeys(chunks, chunks)
+            chunks = dict.fromkeys(self.dims, chunks)
 
         if chunks is not None:
             bad_dims = [d for d in chunks if d not in self.dims]
@@ -923,7 +926,7 @@ def maybe_chunk(name, var, chunks):
             if not chunks:
                 chunks = None
             if var.ndim > 0:
-                return var.chunk(chunks, name=name)
+                return var.chunk(chunks, name=name, lock=lock)
             else:
                 return var
 
diff --git a/xray/core/variable.py b/xray/core/variable.py
index a5d655337a7..597e3b10aec 100644
--- a/xray/core/variable.py
+++ b/xray/core/variable.py
@@ -413,7 +413,7 @@ def chunks(self):
 
     _array_counter = itertools.count()
 
-    def chunk(self, chunks=None, name=''):
+    def chunk(self, chunks=None, name='', lock=False):
         """Coerce this array's data into a dask arrays with the given chunks.
 
         If this variable is a non-dask array, it will be converted to dask
@@ -432,6 +432,9 @@ def chunk(self, chunks=None, name=''):
         name : str, optional
             Used to generate the name for this array in the internal dask
             graph. Does not need not be unique.
+        lock : optional
+            Passed on to :py:func:`dask.array.from_array`, if the array is not
+            already a dask array.
 
         Returns
         -------
@@ -458,7 +461,7 @@ def chunk(self, chunks=None, name=''):
                 chunks = tuple(chunks.get(n, s)
                                for n, s in enumerate(self.shape))
 
-            data = da.from_array(data, chunks, name=name)
+            data = da.from_array(data, chunks, name=name, lock=lock)
 
         return type(self)(self.dims, data, self._attrs, self._encoding,
                           fastpath=True)
diff --git a/xray/test/test_backends.py b/xray/test/test_backends.py
index e650903b11e..394055a4327 100644
--- a/xray/test/test_backends.py
+++ b/xray/test/test_backends.py
@@ -1,4 +1,5 @@
 from io import BytesIO
+from threading import Lock
 import contextlib
 import os.path
 import pickle
@@ -715,6 +716,26 @@ def test_open_mfdataset(self):
         with self.assertRaisesRegexp(IOError, 'no files to open'):
             open_mfdataset('foo-bar-baz-*.nc')
 
+    def test_preprocess_mfdataset(self):
+        original = Dataset({'foo': ('x', np.random.randn(10))})
+        with create_tmp_file() as tmp:
+            original.to_netcdf(tmp)
+            preprocess = lambda ds: ds.assign_coords(z=0)
+            expected = preprocess(original)
+            with open_mfdataset(tmp, preprocess=preprocess) as actual:
+                self.assertDatasetIdentical(expected, actual)
+
+    def test_lock(self):
+        original = Dataset({'foo': ('x', np.random.randn(10))})
+        with create_tmp_file() as tmp:
+            original.to_netcdf(tmp)
+            with open_dataset(tmp, chunks=10) as ds:
+                task = ds.foo.data.dask[ds.foo.data.name, 0]
+                self.assertIsInstance(task[-1], type(Lock()))
+            with open_mfdataset(tmp) as ds:
+                task = ds.foo.data.dask[ds.foo.data.name, 0]
+                self.assertIsInstance(task[-1], type(Lock()))
+
     def test_open_and_do_math(self):
         original = Dataset({'foo': ('x', np.random.randn(10))})
         with create_tmp_file() as tmp:
@@ -730,10 +751,12 @@ def test_open_dataset(self):
             with open_dataset(tmp, chunks={'x': 5}) as actual:
                 self.assertIsInstance(actual.foo.variable.data, da.Array)
                 self.assertEqual(actual.foo.variable.data.chunks, ((5, 5),))
-                self.assertDatasetAllClose(original, actual)
+                self.assertDatasetIdentical(original, actual)
+            with open_dataset(tmp, chunks=5) as actual:
+                self.assertDatasetIdentical(original, actual)
             with open_dataset(tmp) as actual:
                 self.assertIsInstance(actual.foo.variable.data, np.ndarray)
-                self.assertDatasetAllClose(original, actual)
+                self.assertDatasetIdentical(original, actual)
 
     def test_dask_roundtrip(self):
         with create_tmp_file() as tmp:
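
Usage note (not part of the patch): a minimal sketch of how the new arguments
could be combined, assuming a hypothetical ``'files/*.nc'`` glob and an
illustrative ``add_run_coord`` helper modeled on the ``assign_coords`` call in
``test_preprocess_mfdataset`` above::

    import xray

    def add_run_coord(ds):
        # hypothetical per-file cleanup, applied to each dataset before
        # auto_combine concatenates them
        return ds.assign_coords(z=0)

    # lock=True (the new default) is passed through to dask.array.from_array,
    # so netCDF/HDF5 reads are serialized under dask's multithreaded scheduler
    combined = xray.open_mfdataset('files/*.nc', chunks={'x': 100},
                                   preprocess=add_run_coord, lock=True)

    # chunks may now also be a single integer, applied to every dimension
    single = xray.open_dataset('files/one.nc', chunks=100)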