From 5c07ed3ba320bb8bf3d803e5d585afe0fa84021c Mon Sep 17 00:00:00 2001
From: Stephan Hoyer
Date: Sat, 27 Jun 2015 20:02:45 -0700
Subject: [PATCH 1/2] Add a preprocess argument to open_mfdataset

Fixes #443
---
 doc/whats-new.rst          | 11 +++++++++++
 xray/backends/api.py       |  7 ++++++-
 xray/test/test_backends.py |  9 +++++++++
 3 files changed, 26 insertions(+), 1 deletion(-)

diff --git a/doc/whats-new.rst b/doc/whats-new.rst
index 7a5a6224430..378c571d8e5 100644
--- a/doc/whats-new.rst
+++ b/doc/whats-new.rst
@@ -9,6 +9,17 @@ What's New
     import xray
     np.random.seed(123456)
 
+v0.5.2 (unreleased)
+-------------------
+
+Enhancements
+~~~~~~~~~~~~
+
+- :py:func:`~xray.open_mfdataset` now supports a ``preprocess`` argument for
+  preprocessing datasets prior to concatenation. This is useful if datasets
+  cannot be otherwise merged automatically, e.g., if the original datasets
+  have conflicting index coordinates.
+
 v0.5.1 (15 June 2015)
 ---------------------
 
diff --git a/xray/backends/api.py b/xray/backends/api.py
index af8406d4881..0579f5555b6 100644
--- a/xray/backends/api.py
+++ b/xray/backends/api.py
@@ -161,7 +161,8 @@ def close(self):
             f.close()
 
 
-def open_mfdataset(paths, chunks=None, concat_dim=None, **kwargs):
+def open_mfdataset(paths, chunks=None, concat_dim=None, preprocess=None,
+                   **kwargs):
     """Open multiple files as a single dataset.
 
     Experimental. Requires dask to be installed.
@@ -183,6 +184,8 @@ def open_mfdataset(paths, chunks=None, concat_dim=None, **kwargs):
         need to provide this argument if the dimension along which you want to
         concatenate is not a dimension in the original datasets, e.g., if you
         want to stack a collection of 2D arrays along a third dimension.
+    preprocess : callable, optional
+        If provided, call this function on each dataset prior to concatenation.
     **kwargs : optional
         Additional arguments passed on to :py:func:`xray.open_dataset`.
 
@@ -202,6 +205,8 @@ def open_mfdataset(paths, chunks=None, concat_dim=None, **kwargs):
     datasets = [open_dataset(p, **kwargs) for p in paths]
     file_objs = [ds._file_obj for ds in datasets]
     datasets = [ds.chunk(chunks) for ds in datasets]
+    if preprocess is not None:
+        datasets = [preprocess(ds) for ds in datasets]
     combined = auto_combine(datasets, concat_dim=concat_dim)
     combined._file_obj = _MultiFileCloser(file_objs)
     return combined
diff --git a/xray/test/test_backends.py b/xray/test/test_backends.py
index e650903b11e..8be9a5fe044 100644
--- a/xray/test/test_backends.py
+++ b/xray/test/test_backends.py
@@ -715,6 +715,15 @@ def test_open_mfdataset(self):
         with self.assertRaisesRegexp(IOError, 'no files to open'):
             open_mfdataset('foo-bar-baz-*.nc')
 
+    def test_preprocess_mfdataset(self):
+        original = Dataset({'foo': ('x', np.random.randn(10))})
+        with create_tmp_file() as tmp:
+            original.to_netcdf(tmp)
+            preprocess = lambda ds: ds.assign_coords(z=0)
+            expected = preprocess(original)
+            with open_mfdataset(tmp, preprocess=preprocess) as actual:
+                self.assertDatasetIdentical(expected, actual)
+
     def test_open_and_do_math(self):
         original = Dataset({'foo': ('x', np.random.randn(10))})
         with create_tmp_file() as tmp:
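The ``preprocess`` hook added in this patch runs on each per-file dataset before ``auto_combine`` merges them. A minimal sketch of how it might be used, assuming a hypothetical set of files whose ``x`` index coordinate conflicts across files::

    import numpy as np

    import xray

    def reset_x_index(ds):
        # Hypothetical clean-up: replace a conflicting 'x' index coordinate
        # with a plain range so auto_combine can concatenate along 'time'.
        return ds.assign_coords(x=np.arange(ds.dims['x']))

    # 'observations-*.nc' is an illustrative glob; preprocess is applied to
    # every opened dataset before concatenation.
    combined = xray.open_mfdataset('observations-*.nc', concat_dim='time',
                                   preprocess=reset_x_index)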
From 0816424dcc65c4aa191478c22b9ac41a6eb2a1f3 Mon Sep 17 00:00:00 2001
From: Stephan Hoyer
Date: Sat, 27 Jun 2015 20:31:17 -0700
Subject: [PATCH 2/2] Add a lock when opening datasets with dask

Fixes #444
---
 doc/whats-new.rst          | 12 +++++++++++-
 xray/backends/api.py       | 16 ++++++++++++----
 xray/core/dataset.py       |  9 ++++++---
 xray/core/variable.py      |  7 +++++--
 xray/test/test_backends.py | 18 ++++++++++++++++--
 5 files changed, 50 insertions(+), 12 deletions(-)

diff --git a/doc/whats-new.rst b/doc/whats-new.rst
index 378c571d8e5..b1849dc3d79 100644
--- a/doc/whats-new.rst
+++ b/doc/whats-new.rst
@@ -18,7 +18,17 @@ Enhancements
 - :py:func:`~xray.open_mfdataset` now supports a ``preprocess`` argument for
   preprocessing datasets prior to concatenation. This is useful if datasets
   cannot be otherwise merged automatically, e.g., if the original datasets
-  have conflicting index coordinates.
+  have conflicting index coordinates (:issue:`443`).
+- :py:func:`~xray.open_dataset` and :py:func:`~xray.open_mfdataset` now use a
+  thread lock by default for reading from netCDF files. This avoids possible
+  segmentation faults for reading from netCDF4 files when HDF5 is not
+  configured properly for concurrent access (:issue:`444`).
+
+Bug fixes
+~~~~~~~~~
+
+- :py:func:`~xray.open_dataset` and :py:func:`~xray.open_mfdataset` support
+  supplying chunks as a single integer.
 
 v0.5.1 (15 June 2015)
 ---------------------
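The new default lock and the chunks-as-integer fix described above are both visible from the user-facing API. A small sketch of how they might be exercised, with a hypothetical file name::

    import xray

    # 'data.nc' is a hypothetical file. A single integer chunk size is now
    # broadcast to every dimension of the dataset.
    ds = xray.open_dataset('data.nc', chunks=5)

    # Reads go through a thread lock by default (lock=True); passing
    # lock=False opts back into unlocked, fully concurrent reads.
    ds_unlocked = xray.open_dataset('data.nc', chunks=5, lock=False)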
diff --git a/xray/backends/api.py b/xray/backends/api.py
index 0579f5555b6..02f0344f7f4 100644
--- a/xray/backends/api.py
+++ b/xray/backends/api.py
@@ -38,7 +38,7 @@ def _get_default_engine(path, allow_remote=False):
 def open_dataset(filename_or_obj, group=None, decode_cf=True,
                  mask_and_scale=True, decode_times=True,
                  concat_characters=True, decode_coords=True, engine=None,
-                 chunks=None):
+                 chunks=None, lock=True):
     """Load and decode a dataset from a file or file-like object.
 
     Parameters
     ----------
@@ -79,6 +79,10 @@ def open_dataset(filename_or_obj, group=None, decode_cf=True,
         If chunks is provided, it used to load the new dataset into dask
         arrays. This is an experimental feature; see the documentation for
         more details.
+    lock : optional
+        If chunks is provided, this argument is passed on to
+        :py:func:`dask.array.from_array`. By default, a lock is used to avoid
+        issues with concurrent access with dask's multithreaded backend.
 
     Returns
     -------
@@ -100,7 +104,7 @@ def maybe_decode_store(store):
             store, mask_and_scale=mask_and_scale, decode_times=decode_times,
             concat_characters=concat_characters, decode_coords=decode_coords)
         if chunks is not None:
-            ds = ds.chunk(chunks)
+            ds = ds.chunk(chunks, lock=lock)
         return ds
 
     if isinstance(filename_or_obj, backends.AbstractDataStore):
@@ -162,7 +166,7 @@ def close(self):
             f.close()
 
 
 def open_mfdataset(paths, chunks=None, concat_dim=None, preprocess=None,
-                   **kwargs):
+                   lock=True, **kwargs):
     """Open multiple files as a single dataset.
 
     Experimental. Requires dask to be installed.
@@ -186,6 +190,10 @@ def open_mfdataset(paths, chunks=None, concat_dim=None, preprocess=None,
         want to stack a collection of 2D arrays along a third dimension.
     preprocess : callable, optional
         If provided, call this function on each dataset prior to concatenation.
+    lock : optional
+        This argument is passed on to :py:func:`dask.array.from_array`. By
+        default, a lock is used to avoid issues with concurrent access with
+        dask's multithreaded backend.
     **kwargs : optional
         Additional arguments passed on to :py:func:`xray.open_dataset`.
 
@@ -204,7 +212,7 @@ def open_mfdataset(paths, chunks=None, concat_dim=None, preprocess=None,
         raise IOError('no files to open')
     datasets = [open_dataset(p, **kwargs) for p in paths]
     file_objs = [ds._file_obj for ds in datasets]
-    datasets = [ds.chunk(chunks) for ds in datasets]
+    datasets = [ds.chunk(chunks, lock=lock) for ds in datasets]
     if preprocess is not None:
         datasets = [preprocess(ds) for ds in datasets]
     combined = auto_combine(datasets, concat_dim=concat_dim)
     combined._file_obj = _MultiFileCloser(file_objs)
     return combined
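The api.py changes above only forward ``lock`` from the ``open_*`` functions into ``Dataset.chunk``; the remaining files below carry it down to ``dask.array.from_array``. Roughly, the end effect is equivalent to this standalone sketch, in which an in-memory array stands in for a lazily read netCDF variable::

    import threading

    import dask.array as da
    import numpy as np

    source = np.random.randn(10)   # stands in for a lazily read variable
    lock = threading.Lock()        # lock=True lets dask create one like this

    # from_array attaches the lock to every chunk-reading task, so only one
    # thread touches the underlying store at a time.
    chunked = da.from_array(source, chunks=5, lock=lock)
    print(chunked.sum().compute())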
diff --git a/xray/core/dataset.py b/xray/core/dataset.py
index 3f9d02de0cb..0018174d78d 100644
--- a/xray/core/dataset.py
+++ b/xray/core/dataset.py
@@ -883,7 +883,7 @@ def chunks(self):
         chunks.update(new_chunks)
         return Frozen(SortedKeysDict(chunks))
 
-    def chunk(self, chunks=None):
+    def chunk(self, chunks=None, lock=False):
         """Coerce all arrays in this dataset into dask arrays with the given
         chunks.
 
@@ -899,13 +899,16 @@ def chunk(self, chunks=None):
         chunks : int or dict, optional
             Chunk sizes along each dimension, e.g., ``5`` or
             ``{'x': 5, 'y': 5}``.
+        lock : optional
+            Passed on to :py:func:`dask.array.from_array`, if the array is not
+            already a dask array.
 
         Returns
         -------
         chunked : xray.Dataset
         """
         if isinstance(chunks, Number):
-            chunks = dict.fromkeys(chunks, chunks)
+            chunks = dict.fromkeys(self.dims, chunks)
 
         if chunks is not None:
             bad_dims = [d for d in chunks if d not in self.dims]
@@ -923,7 +926,7 @@ def maybe_chunk(name, var, chunks):
             if not chunks:
                 chunks = None
             if var.ndim > 0:
-                return var.chunk(chunks, name=name)
+                return var.chunk(chunks, name=name, lock=lock)
             else:
                 return var
 
diff --git a/xray/core/variable.py b/xray/core/variable.py
index a5d655337a7..597e3b10aec 100644
--- a/xray/core/variable.py
+++ b/xray/core/variable.py
@@ -413,7 +413,7 @@ def chunks(self):
 
     _array_counter = itertools.count()
 
-    def chunk(self, chunks=None, name=''):
+    def chunk(self, chunks=None, name='', lock=False):
         """Coerce this array's data into a dask arrays with the given chunks.
 
         If this variable is a non-dask array, it will be converted to dask
@@ -432,6 +432,9 @@ def chunk(self, chunks=None, name=''):
         name : str, optional
             Used to generate the name for this array in the internal dask
             graph. Does not need not be unique.
+        lock : optional
+            Passed on to :py:func:`dask.array.from_array`, if the array is not
+            already a dask array.
 
         Returns
         -------
@@ -458,7 +461,7 @@ def chunk(self, chunks=None, name=''):
                 chunks = tuple(chunks.get(n, s)
                                for n, s in enumerate(self.shape))
 
-            data = da.from_array(data, chunks, name=name)
+            data = da.from_array(data, chunks, name=name, lock=lock)
 
         return type(self)(self.dims, data, self._attrs, self._encoding,
                           fastpath=True)
diff --git a/xray/test/test_backends.py b/xray/test/test_backends.py
index 8be9a5fe044..394055a4327 100644
--- a/xray/test/test_backends.py
+++ b/xray/test/test_backends.py
@@ -1,4 +1,5 @@
 from io import BytesIO
+from threading import Lock
 import contextlib
 import os.path
 import pickle
@@ -724,6 +725,17 @@ def test_preprocess_mfdataset(self):
             with open_mfdataset(tmp, preprocess=preprocess) as actual:
                 self.assertDatasetIdentical(expected, actual)
 
+    def test_lock(self):
+        original = Dataset({'foo': ('x', np.random.randn(10))})
+        with create_tmp_file() as tmp:
+            original.to_netcdf(tmp)
+            with open_dataset(tmp, chunks=10) as ds:
+                task = ds.foo.data.dask[ds.foo.data.name, 0]
+                self.assertIsInstance(task[-1], type(Lock()))
+            with open_mfdataset(tmp) as ds:
+                task = ds.foo.data.dask[ds.foo.data.name, 0]
+                self.assertIsInstance(task[-1], type(Lock()))
+
     def test_open_and_do_math(self):
         original = Dataset({'foo': ('x', np.random.randn(10))})
         with create_tmp_file() as tmp:
@@ -739,10 +751,12 @@ def test_open_dataset(self):
             with open_dataset(tmp, chunks={'x': 5}) as actual:
                 self.assertIsInstance(actual.foo.variable.data, da.Array)
                 self.assertEqual(actual.foo.variable.data.chunks, ((5, 5),))
-                self.assertDatasetAllClose(original, actual)
+                self.assertDatasetIdentical(original, actual)
+            with open_dataset(tmp, chunks=5) as actual:
+                self.assertDatasetIdentical(original, actual)
             with open_dataset(tmp) as actual:
                 self.assertIsInstance(actual.foo.variable.data, np.ndarray)
-                self.assertDatasetAllClose(original, actual)
+                self.assertDatasetIdentical(original, actual)
 
     def test_dask_roundtrip(self):
         with create_tmp_file() as tmp:
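For context on the graph inspection in ``test_lock``: dask stores each chunk as a task tuple keyed by ``(array-name, chunk-index)``, and ``from_array`` appends the lock as the last element of that tuple. A rough equivalent of the same check outside the test suite, assuming a hypothetical file containing a variable named ``foo``::

    from threading import Lock

    import xray

    with xray.open_dataset('data.nc', chunks=10) as ds:
        # With the default lock=True, the lock object rides along as the
        # final element of every chunk-reading task in the dask graph.
        task = ds.foo.data.dask[ds.foo.data.name, 0]
        assert isinstance(task[-1], type(Lock()))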