diff --git a/doc/whats-new.rst b/doc/whats-new.rst
index ce305a6aa24..31048f333ab 100644
--- a/doc/whats-new.rst
+++ b/doc/whats-new.rst
@@ -25,6 +25,13 @@ Breaking changes
   merges will now succeed in cases that previously raised
   ``xarray.MergeError``. Set ``compat='broadcast_equals'`` to restore the
   previous default.
+- Pickling an xarray object based on the dask backend, or reading its
+  :py:meth:`values` property, won't automatically convert the array from dask
+  to numpy in the original object anymore.
+  If a dask object is used as a coord of a :py:class:`~xarray.DataArray` or
+  :py:class:`~xarray.Dataset`, its values are eagerly computed and cached,
+  but only if it's used to index a dim (e.g. it's used for alignment).
+  By `Guido Imperiale <https://github.com/crusaderky>`_.
 
 Deprecations
 ~~~~~~~~~~~~
@@ -52,32 +59,31 @@ Enhancements
 - Add checking of ``attr`` names and values when saving to netCDF, raising
   useful error messages if they are invalid. (:issue:`911`).
   By `Robin Wilson <https://github.com/robintw>`_.
-
 - Added ability to save ``DataArray`` objects directly to netCDF files using
   :py:meth:`~xarray.DataArray.to_netcdf`, and to load directly from netCDF files
   using :py:func:`~xarray.open_dataarray` (:issue:`915`). These remove the need
   to convert a ``DataArray`` to a ``Dataset`` before saving as a netCDF file,
   and deals with names to ensure a perfect 'roundtrip' capability.
   By `Robin Wilson <https://github.com/robintw>`_.
-
 - Multi-index levels are now accessible as "virtual" coordinate variables,
   e.g., ``ds['time']`` can pull out the ``'time'`` level of a multi-index
   (see :ref:`coordinates`). ``sel`` also accepts providing multi-index levels
   as keyword arguments, e.g., ``ds.sel(time='2000-01')``
   (see :ref:`multi-level indexing`).
   By `Benoit Bovy <https://github.com/benbovy>`_.
-
 - Added the ``compat`` option ``'no_conflicts'`` to ``merge``, allowing the
   combination of xarray objects with disjoint (:issue:`742`) or
   overlapping (:issue:`835`) coordinates as long as all present data agrees.
   By `Johnnie Gray <https://github.com/jcmgray>`_. See
   :ref:`combining.no_conflicts` for more details.
-
 - It is now possible to set ``concat_dim=None`` explicitly in
   :py:func:`~xarray.open_mfdataset` to disable inferring a dimension along
   which to concatenate.
   By `Stephan Hoyer <https://github.com/shoyer>`_.
-
+- Added methods :py:meth:`DataArray.compute`, :py:meth:`Dataset.compute`, and
+  :py:meth:`Variable.compute` as a non-mutating alternative to
+  :py:meth:`~DataArray.load`.
+  By `Guido Imperiale <https://github.com/crusaderky>`_.
 - Adds DataArray and Dataset methods :py:meth:`~xarray.DataArray.cumsum` and
   :py:meth:`~xarray.DataArray.cumprod`.  By `Phillip J. Wolfram <https://github.com/pwolfram>`_.
diff --git a/xarray/core/dataarray.py b/xarray/core/dataarray.py
index fdbb8c773f6..2767c8c4a7a 100644
--- a/xarray/core/dataarray.py
+++ b/xarray/core/dataarray.py
@@ -570,6 +570,19 @@ def load(self):
         self._coords = new._coords
         return self
 
+    def compute(self):
+        """Manually trigger loading of this array's data from disk or a
+        remote source into memory and return a new array. The original is
+        left unaltered.
+
+        Normally, it should not be necessary to call this method in user code,
+        because all xarray functions should either work on deferred data or
+        load data automatically. However, this method can be necessary when
+        working with many file objects on disk.
+        """
+        new = self.copy(deep=False)
+        return new.load()
+
     def copy(self, deep=True):
         """Returns a copy of this array.
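The ``compute()`` added above is the non-mutating counterpart of ``load()``. A minimal usage sketch (not part of the patch; assumes dask is installed, and the values are illustrative):

```python
import xarray as xr

lazy = xr.DataArray([1, 2, 3], dims='x').chunk()

eager = lazy.compute()            # new, fully loaded DataArray
print(type(lazy.variable.data))   # still a dask array; `lazy` is unaltered
print(type(eager.variable.data))  # numpy.ndarray

lazy.load()  # load(), by contrast, caches the values in place and returns self
```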
diff --git a/xarray/core/dataset.py b/xarray/core/dataset.py
index 964bd3dbb7a..282409cf4ca 100644
--- a/xarray/core/dataset.py
+++ b/xarray/core/dataset.py
@@ -260,8 +260,11 @@ def load_store(cls, store, decoder=None):
         return obj
 
     def __getstate__(self):
-        """Always load data in-memory before pickling"""
-        self.load()
+        """Load data in-memory before pickling (except for Dask data)"""
+        for v in self.variables.values():
+            if not isinstance(v.data, dask_array_type):
+                v.load()
+
         # self.__dict__ is the default pickle object, we don't need to
         # implement our own __setstate__ method to make pickle work
         state = self.__dict__.copy()
@@ -342,6 +345,19 @@ def load(self):
 
         return self
 
+    def compute(self):
+        """Manually trigger loading of this dataset's data from disk or a
+        remote source into memory and return a new dataset. The original is
+        left unaltered.
+
+        Normally, it should not be necessary to call this method in user code,
+        because all xarray functions should either work on deferred data or
+        load data automatically. However, this method can be necessary when
+        working with many file objects on disk.
+        """
+        new = self.copy(deep=False)
+        return new.load()
+
     @classmethod
     def _construct_direct(cls, variables, coord_names, dims=None,
                           attrs=None, file_obj=None):
@@ -424,14 +440,12 @@ def copy(self, deep=False):
         """Returns a copy of this dataset.
 
         If `deep=True`, a deep copy is made of each of the component variables.
-        Otherwise, a shallow copy is made, so each variable in the new dataset
-        is also a variable in the original dataset.
+        Otherwise, a shallow copy of each of the component variables is made,
+        so that the underlying memory region of the new dataset is the same
+        as in the original dataset.
         """
-        if deep:
-            variables = OrderedDict((k, v.copy(deep=True))
-                                    for k, v in iteritems(self._variables))
-        else:
-            variables = self._variables.copy()
+        variables = OrderedDict((k, v.copy(deep=deep))
+                                for k, v in iteritems(self._variables))
         # skip __init__ to avoid costly validation
         return self._construct_direct(variables, self._coord_names.copy(),
                                       self._dims.copy(), self._attrs_copy())
@@ -817,11 +831,10 @@ def chunks(self):
         chunks = {}
         for v in self.variables.values():
             if v.chunks is not None:
-                new_chunks = list(zip(v.dims, v.chunks))
-                if any(chunk != chunks[d] for d, chunk in new_chunks
-                       if d in chunks):
-                    raise ValueError('inconsistent chunks')
-                chunks.update(new_chunks)
+                for dim, c in zip(v.dims, v.chunks):
+                    if dim in chunks and c != chunks[dim]:
+                        raise ValueError('inconsistent chunks')
+                    chunks[dim] = c
         return Frozen(SortedKeysDict(chunks))
 
     def chunk(self, chunks=None, name_prefix='xarray-', token=None,
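With the ``__getstate__`` change above, a dask-backed ``Dataset`` round-trips through pickle without computing the graph and without caching values on the original object. A sketch (assumes dask; the shapes are arbitrary):

```python
import pickle
import numpy as np
import xarray as xr

ds = xr.Dataset({'a': ('x', np.arange(1000))}).chunk({'x': 100})
ds2 = pickle.loads(pickle.dumps(ds))

# Both the original and the unpickled copy are still lazy,
# and the chunking survives the round trip.
print(ds.chunks)   # e.g. Frozen(SortedKeysDict({'x': (100, 100, ...)}))
print(ds2.chunks)
```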
+ """ + new_data = self._data_cast() + if not isinstance(self._data, dask_array_type): + self._data = new_data + return new_data @property def _indexable_data(self): @@ -294,12 +305,26 @@ def load(self): because all xarray functions should either work on deferred data or load data automatically. """ - self._data_cached() + self._data = self._data_cast() return self + def compute(self): + """Manually trigger loading of this variable's data from disk or a + remote source into memory and return a new variable. The original is + left unaltered. + + Normally, it should not be necessary to call this method in user code, + because all xarray functions should either work on deferred data or + load data automatically. + """ + new = self.copy(deep=False) + return new.load() + def __getstate__(self): - """Always cache data as an in-memory array before pickling""" - self._data_cached() + """Always cache data as an in-memory array before pickling + (with the exception of dask backend)""" + if not isinstance(self._data, dask_array_type): + self._data_cached() # self.__dict__ is the default pickle object, we don't need to # implement our own __setstate__ method to make pickle work return self.__dict__ @@ -1075,10 +1100,19 @@ def __init__(self, dims, data, attrs=None, encoding=None, fastpath=False): raise ValueError('%s objects must be 1-dimensional' % type(self).__name__) - def _data_cached(self): + # Unlike in Variable, always eagerly load values into memory if not isinstance(self._data, PandasIndexAdapter): self._data = PandasIndexAdapter(self._data) - return self._data + + @Variable.data.setter + def data(self, data): + Variable.data.fset(self, data) + if not isinstance(self._data, PandasIndexAdapter): + self._data = PandasIndexAdapter(self._data) + + def chunk(self, chunks=None, name=None, lock=False): + # Dummy - do not chunk. This method is invoked e.g. 
diff --git a/xarray/test/test_backends.py b/xarray/test/test_backends.py
index 95af76f58a0..65e0f3d51ac 100644
--- a/xarray/test/test_backends.py
+++ b/xarray/test/test_backends.py
@@ -128,8 +128,12 @@ def assert_loads(vars=None):
             if vars is None:
                 vars = expected
             with self.roundtrip(expected) as actual:
-                for v in actual.variables.values():
-                    self.assertFalse(v._in_memory)
+                for k, v in actual.variables.items():
+                    # IndexVariables are eagerly loaded into memory
+                    if k in actual.dims:
+                        self.assertTrue(v._in_memory)
+                    else:
+                        self.assertFalse(v._in_memory)
                 yield actual
                 for k, v in actual.variables.items():
                     if k in vars:
@@ -152,6 +156,31 @@ def assert_loads(vars=None):
             actual = ds.load()
         self.assertDatasetAllClose(expected, actual)
 
+    def test_dataset_compute(self):
+        expected = create_test_data()
+
+        with self.roundtrip(expected) as actual:
+            # Test Dataset.compute()
+            for k, v in actual.variables.items():
+                # IndexVariables are eagerly cached
+                if k in actual.dims:
+                    self.assertTrue(v._in_memory)
+                else:
+                    self.assertFalse(v._in_memory)
+
+            computed = actual.compute()
+
+            for k, v in actual.variables.items():
+                if k in actual.dims:
+                    self.assertTrue(v._in_memory)
+                else:
+                    self.assertFalse(v._in_memory)
+            for v in computed.variables.values():
+                self.assertTrue(v._in_memory)
+
+            self.assertDatasetAllClose(expected, actual)
+            self.assertDatasetAllClose(expected, computed)
+
     def test_roundtrip_None_variable(self):
         expected = Dataset({None: (('x', 'y'), [[0, 1], [2, 3]])})
         with self.roundtrip(expected) as actual:
@@ -233,18 +262,6 @@ def test_roundtrip_coordinates(self):
         with self.roundtrip(expected) as actual:
             self.assertDatasetIdentical(expected, actual)
 
-        expected = original.copy()
-        expected.attrs['coordinates'] = 'something random'
-        with self.assertRaisesRegexp(ValueError, 'cannot serialize'):
-            with self.roundtrip(expected):
-                pass
-
-        expected = original.copy(deep=True)
-        expected['foo'].attrs['coordinates'] = 'something random'
-        with self.assertRaisesRegexp(ValueError, 'cannot serialize'):
-            with self.roundtrip(expected):
-                pass
-
     def test_roundtrip_boolean_dtype(self):
         original = create_boolean_data()
         self.assertEqual(original['x'].dtype, 'bool')
@@ -875,7 +892,26 @@ def test_read_byte_attrs_as_unicode(self):
 
 @requires_dask
 @requires_scipy
 @requires_netCDF4
-class DaskTest(TestCase):
+class DaskTest(TestCase, DatasetIOTestCases):
+    @contextlib.contextmanager
+    def create_store(self):
+        yield Dataset()
+
+    @contextlib.contextmanager
+    def roundtrip(self, data, save_kwargs={}, open_kwargs={}):
+        yield data.chunk()
+
+    def test_roundtrip_datetime_data(self):
+        # Override method in DatasetIOTestCases - remove not applicable save_kwds
+        times = pd.to_datetime(['2000-01-01', '2000-01-02', 'NaT'])
+        expected = Dataset({'t': ('t', times), 't0': times[0]})
+        with self.roundtrip(expected) as actual:
+            self.assertDatasetIdentical(expected, actual)
+
+    def test_write_store(self):
+        # Override method in DatasetIOTestCases - not applicable to dask
+        pass
+
     def test_open_mfdataset(self):
         original = Dataset({'foo': ('x', np.random.randn(10))})
         with create_tmp_file() as tmp1:
@@ -995,6 +1031,15 @@ def test_deterministic_names(self):
             self.assertIn(tmp, dask_name)
         self.assertEqual(original_names, repeat_names)
 
+    def test_dataarray_compute(self):
+        # Test DataArray.compute() on dask backend.
+        # The test for Dataset.compute() is already in DatasetIOTestCases;
+        # however dask is the only tested backend which supports DataArrays.
+        actual = DataArray([1, 2]).chunk()
+        computed = actual.compute()
+        self.assertFalse(actual._in_memory)
+        self.assertTrue(computed._in_memory)
+        self.assertDataArrayAllClose(actual, computed)
 
 @requires_scipy_or_netCDF4
 @requires_pydap
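``DaskTest`` now inherits the whole ``DatasetIOTestCases`` battery, with an in-memory ``chunk()`` standing in for the save/open round trip. The ``Dataset.compute`` behaviour it exercises mirrors the ``DataArray`` sketch earlier (illustrative values only):

```python
import numpy as np
import xarray as xr

ds = xr.Dataset({'a': ('x', np.arange(4))}).chunk(2)
computed = ds.compute()  # new dataset, fully in memory
# `ds` itself still holds dask arrays and can keep deferring work
```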
diff --git a/xarray/test/test_dask.py b/xarray/test/test_dask.py
index 305332f8a68..69a434a4f72 100644
--- a/xarray/test/test_dask.py
+++ b/xarray/test/test_dask.py
@@ -1,6 +1,7 @@
 from __future__ import absolute_import
 from __future__ import division
 from __future__ import print_function
+import pickle
 
 import numpy as np
 import pandas as pd
@@ -17,44 +18,31 @@
 import dask.array as da
 
 
-def _copy_at_variable_level(arg):
-    """We need to copy the argument at the level of xarray.Variable objects, so
-    that viewing its values does not trigger lazy loading.
-    """
-    if isinstance(arg, Variable):
-        return arg.copy(deep=False)
-    elif isinstance(arg, DataArray):
-        ds = arg.to_dataset(name='__copied__')
-        return _copy_at_variable_level(ds)['__copied__']
-    elif isinstance(arg, Dataset):
-        ds = arg.copy()
-        for k in list(ds):
-            ds._variables[k] = ds._variables[k].copy(deep=False)
-        return ds
-    else:
-        assert False
-
-
 class DaskTestCase(TestCase):
     def assertLazyAnd(self, expected, actual, test):
-        expected_copy = _copy_at_variable_level(expected)
-        actual_copy = _copy_at_variable_level(actual)
         with dask.set_options(get=dask.get):
-            test(actual_copy, expected_copy)
-        var = getattr(actual, 'variable', actual)
-        self.assertIsInstance(var.data, da.Array)
+            test(actual, expected)
+        if isinstance(actual, Dataset):
+            for k, v in actual.variables.items():
+                if k in actual.dims:
+                    self.assertIsInstance(v.data, np.ndarray)
+                else:
+                    self.assertIsInstance(v.data, da.Array)
+        elif isinstance(actual, DataArray):
+            self.assertIsInstance(actual.data, da.Array)
+            for k, v in actual.coords.items():
+                if k in actual.dims:
+                    self.assertIsInstance(v.data, np.ndarray)
+                else:
+                    self.assertIsInstance(v.data, da.Array)
+        elif isinstance(actual, Variable):
+            self.assertIsInstance(actual.data, da.Array)
+        else:
+            assert False
 
 
 @requires_dask
 class TestVariable(DaskTestCase):
-    def assertLazyAnd(self, expected, actual, test):
-        expected_copy = expected.copy(deep=False)
-        actual_copy = actual.copy(deep=False)
-        with dask.set_options(get=dask.get):
-            test(actual_copy, expected_copy)
-        var = getattr(actual, 'variable', actual)
-        self.assertIsInstance(var.data, da.Array)
-
     def assertLazyAndIdentical(self, expected, actual):
         self.assertLazyAnd(expected, actual, self.assertVariableIdentical)
@@ -203,6 +191,9 @@ def assertLazyAndIdentical(self, expected, actual):
     def assertLazyAndAllClose(self, expected, actual):
         self.assertLazyAnd(expected, actual, self.assertDataArrayAllClose)
 
+    def assertLazyAndEqual(self, expected, actual):
+        self.assertLazyAnd(expected, actual, self.assertDataArrayEqual)
+
     def setUp(self):
         self.values = np.random.randn(4, 6)
         self.data = da.from_array(self.values, chunks=(2, 2))
@@ -212,6 +203,7 @@ def setUp(self):
     def test_rechunk(self):
         chunked = self.eager_array.chunk({'x': 2}).chunk({'y': 2})
         self.assertEqual(chunked.chunks, ((2,) * 2, (2,) * 3))
+        self.assertLazyAndIdentical(self.lazy_array, chunked)
 
     def test_new_chunk(self):
         chunked = self.eager_array.chunk()
@@ -270,7 +262,7 @@ def test_to_dataset_roundtrip(self):
         v = self.lazy_array
 
         expected = u.assign_coords(x=u['x'])
-        self.assertLazyAndIdentical(expected, v.to_dataset('x').to_array('x'))
+        self.assertLazyAndEqual(expected, v.to_dataset('x').to_array('x'))
 
     def test_merge(self):
 
@@ -279,7 +271,7 @@ def duplicate_and_merge(array):
 
         expected = duplicate_and_merge(self.eager_array)
         actual = duplicate_and_merge(self.lazy_array)
-        self.assertLazyAndIdentical(expected, actual)
+        self.assertLazyAndEqual(expected, actual)
 
     def test_ufuncs(self):
         u = self.eager_array
@@ -292,9 +284,9 @@ def test_where_dispatching(self):
         x = da.from_array(a, 5)
         y = da.from_array(b, 5)
         expected = DataArray(a).where(b)
-        self.assertLazyAndIdentical(expected, DataArray(a).where(y))
-        self.assertLazyAndIdentical(expected, DataArray(x).where(b))
-        self.assertLazyAndIdentical(expected, DataArray(x).where(y))
+        self.assertLazyAndEqual(expected, DataArray(a).where(y))
+        self.assertLazyAndEqual(expected, DataArray(x).where(b))
+        self.assertLazyAndEqual(expected, DataArray(x).where(y))
 
     def test_simultaneous_compute(self):
         ds = Dataset({'foo': ('x', range(5)),
@@ -319,16 +311,77 @@ def test_stack(self):
         expected = DataArray(data.reshape(2, -1),
                              {'w': [0, 1], 'z': z},
                              dims=['w', 'z'])
         assert stacked.data.chunks == expected.data.chunks
-        self.assertLazyAndIdentical(expected, stacked)
+        self.assertLazyAndEqual(expected, stacked)
 
     def test_dot(self):
         eager = self.eager_array.dot(self.eager_array[0])
         lazy = self.lazy_array.dot(self.lazy_array[0])
         self.assertLazyAndAllClose(eager, lazy)
 
+    def test_variable_pickle(self):
+        # Test that pickling/unpickling does not convert the dask
+        # backend to numpy
+        a1 = Variable(['x'], build_dask_array())
+        a1.compute()
+        self.assertFalse(a1._in_memory)
+        self.assertEquals(kernel_call_count, 1)
+        a2 = pickle.loads(pickle.dumps(a1))
+        self.assertEquals(kernel_call_count, 1)
+        self.assertVariableIdentical(a1, a2)
+        self.assertFalse(a1._in_memory)
+        self.assertFalse(a2._in_memory)
+
+    def test_dataarray_pickle(self):
+        # Test that pickling/unpickling does not convert the dask
+        # backend to numpy
+        a1 = DataArray(build_dask_array())
+        a1.compute()
+        self.assertFalse(a1._in_memory)
+        self.assertEquals(kernel_call_count, 1)
+        a2 = pickle.loads(pickle.dumps(a1))
+        self.assertEquals(kernel_call_count, 1)
+        self.assertDataArrayIdentical(a1, a2)
+        self.assertFalse(a1._in_memory)
+        self.assertFalse(a2._in_memory)
+
+    def test_dataset_pickle(self):
+        ds1 = Dataset({'a': DataArray(build_dask_array())})
+        ds1.compute()
+        self.assertFalse(ds1['a']._in_memory)
+        self.assertEquals(kernel_call_count, 1)
+        ds2 = pickle.loads(pickle.dumps(ds1))
+        self.assertEquals(kernel_call_count, 1)
+        self.assertDatasetIdentical(ds1, ds2)
+        self.assertFalse(ds1['a']._in_memory)
+        self.assertFalse(ds2['a']._in_memory)
+
+    def test_values(self):
+        # Test that invoking the values property does not convert the dask
+        # backend to numpy
+        a = DataArray([1, 2]).chunk()
+        self.assertFalse(a._in_memory)
+        self.assertEquals(a.values.tolist(), [1, 2])
+        self.assertFalse(a._in_memory)
+
     def test_from_dask_variable(self):
         # Test array creation from Variable with dask backend.
         # This is used e.g. in broadcast()
         a = DataArray(self.lazy_array.variable)
-        self.assertLazyAndIdentical(self.lazy_array, a)
+        self.assertLazyAndEqual(self.lazy_array, a)
+
+
+kernel_call_count = 0
+def kernel():
+    """Dask kernel to test pickling/unpickling.
+    Must be global to make it pickleable.
+    """
+    global kernel_call_count
+    kernel_call_count += 1
+    return np.ones(1)
+
+
+def build_dask_array():
+    global kernel_call_count
+    kernel_call_count = 0
+    return dask.array.Array(
+        dask={('foo', 0): (kernel, )}, name='foo',
+        chunks=((1,),), dtype=int)
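The kernel-counting tests above pin down the new caching contract: dask-backed objects no longer cache values on access, so each ``.values`` read recomputes the graph. The user-facing consequence, sketched (assumes dask is installed):

```python
import xarray as xr

a = xr.DataArray([1, 2]).chunk()
a.values      # computes the dask graph
a.values      # computes it again; nothing was cached on `a`

a = a.load()  # load (or compute) once if the values will be reused
a.values      # now served from memory
```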
diff --git a/xarray/test/test_dataset.py b/xarray/test/test_dataset.py
index 5b8af6b6437..67af48e897c 100644
--- a/xarray/test/test_dataset.py
+++ b/xarray/test/test_dataset.py
@@ -56,11 +56,24 @@ def create_test_multiindex():
 
 
 class InaccessibleVariableDataStore(backends.InMemoryDataStore):
+    def __init__(self, writer=None):
+        super(InaccessibleVariableDataStore, self).__init__(writer)
+        self._indexvars = set()
+
+    def store(self, variables, attributes, check_encoding_set=frozenset()):
+        super(InaccessibleVariableDataStore, self).store(
+            variables, attributes, check_encoding_set)
+        for k, v in variables.items():
+            if isinstance(v, IndexVariable):
+                self._indexvars.add(k)
+
     def get_variables(self):
-        def lazy_inaccessible(x):
-            data = indexing.LazilyIndexedArray(InaccessibleArray(x.values))
-            return Variable(x.dims, data, x.attrs)
-        return dict((k, lazy_inaccessible(v)) for
+        def lazy_inaccessible(k, v):
+            if k in self._indexvars:
+                return v
+            data = indexing.LazilyIndexedArray(InaccessibleArray(v.values))
+            return Variable(v.dims, data, v.attrs)
+        return dict((k, lazy_inaccessible(k, v)) for
                     k, v in iteritems(self._variables))
@@ -672,14 +685,19 @@ def test_chunk(self):
         self.assertEqual(data.chunks, {})
 
         reblocked = data.chunk()
-        for v in reblocked.variables.values():
-            self.assertIsInstance(v.data, da.Array)
-        expected_chunks = dict((d, (s,)) for d, s in data.dims.items())
+        for k, v in reblocked.variables.items():
+            if k in reblocked.dims:
+                self.assertIsInstance(v.data, np.ndarray)
+            else:
+                self.assertIsInstance(v.data, da.Array)
+
+        expected_chunks = {'dim1': (8,), 'dim2': (9,), 'dim3': (10,)}
         self.assertEqual(reblocked.chunks, expected_chunks)
 
         reblocked = data.chunk({'time': 5, 'dim1': 5, 'dim2': 5, 'dim3': 5})
-        expected_chunks = {'time': (5,) * 4, 'dim1': (5, 3),
-                           'dim2': (5, 4), 'dim3': (5, 5)}
+        # time is not a dim in any of the data_vars, so it
+        # doesn't get chunked
+        expected_chunks = {'dim1': (5, 3), 'dim2': (5, 4), 'dim3': (5, 5)}
         self.assertEqual(reblocked.chunks, expected_chunks)
 
         reblocked = data.chunk(expected_chunks)
@@ -1292,10 +1310,13 @@ def test_copy(self):
 
         for copied in [data.copy(deep=False), copy(data)]:
             self.assertDatasetIdentical(data, copied)
-            for k in data:
+            # Note: IndexVariable objects with string dtype are always
+            # copied because of xarray.core.utils.safe_cast_to_index.
+            # Limiting the test to data variables.
+            for k in data.data_vars:
                 v0 = data.variables[k]
                 v1 = copied.variables[k]
-                self.assertIs(v0, v1)
+                assert source_ndarray(v0.data) is source_ndarray(v1.data)
             copied['foo'] = ('z', np.arange(5))
             self.assertNotIn('foo', data)
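The reworked ``test_copy`` checks buffer identity instead of ``Variable`` identity, matching the new ``Dataset.copy`` semantics: a shallow copy creates fresh ``Variable`` wrappers that share the underlying arrays. For example (illustrative only):

```python
import numpy as np
import xarray as xr

ds = xr.Dataset({'a': ('x', np.arange(3))})
shallow = ds.copy(deep=False)

print(ds.variables['a'] is shallow.variables['a'])            # False
print(np.shares_memory(ds['a'].values, shallow['a'].values))  # True
```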
+ """ + global kernel_call_count + kernel_call_count += 1 + return np.ones(1) + +def build_dask_array(): + global kernel_call_count + kernel_call_count = 0 + return dask.array.Array( + dask={('foo', 0): (kernel, )}, name='foo', + chunks=((1,),), dtype=int) diff --git a/xarray/test/test_dataset.py b/xarray/test/test_dataset.py index 5b8af6b6437..67af48e897c 100644 --- a/xarray/test/test_dataset.py +++ b/xarray/test/test_dataset.py @@ -56,11 +56,24 @@ def create_test_multiindex(): class InaccessibleVariableDataStore(backends.InMemoryDataStore): + def __init__(self, writer=None): + super(InaccessibleVariableDataStore, self).__init__(writer) + self._indexvars = set() + + def store(self, variables, attributes, check_encoding_set=frozenset()): + super(InaccessibleVariableDataStore, self).store( + variables, attributes, check_encoding_set) + for k, v in variables.items(): + if isinstance(v, IndexVariable): + self._indexvars.add(k) + def get_variables(self): - def lazy_inaccessible(x): - data = indexing.LazilyIndexedArray(InaccessibleArray(x.values)) - return Variable(x.dims, data, x.attrs) - return dict((k, lazy_inaccessible(v)) for + def lazy_inaccessible(k, v): + if k in self._indexvars: + return v + data = indexing.LazilyIndexedArray(InaccessibleArray(v.values)) + return Variable(v.dims, data, v.attrs) + return dict((k, lazy_inaccessible(k, v)) for k, v in iteritems(self._variables)) @@ -672,14 +685,19 @@ def test_chunk(self): self.assertEqual(data.chunks, {}) reblocked = data.chunk() - for v in reblocked.variables.values(): - self.assertIsInstance(v.data, da.Array) - expected_chunks = dict((d, (s,)) for d, s in data.dims.items()) + for k, v in reblocked.variables.items(): + if k in reblocked.dims: + self.assertIsInstance(v.data, np.ndarray) + else: + self.assertIsInstance(v.data, da.Array) + + expected_chunks = {'dim1': (8,), 'dim2': (9,), 'dim3': (10,)} self.assertEqual(reblocked.chunks, expected_chunks) reblocked = data.chunk({'time': 5, 'dim1': 5, 'dim2': 5, 'dim3': 5}) - expected_chunks = {'time': (5,) * 4, 'dim1': (5, 3), - 'dim2': (5, 4), 'dim3': (5, 5)} + # time is not a dim in any of the data_vars, so it + # doesn't get chunked + expected_chunks = {'dim1': (5, 3), 'dim2': (5, 4), 'dim3': (5, 5)} self.assertEqual(reblocked.chunks, expected_chunks) reblocked = data.chunk(expected_chunks) @@ -1292,10 +1310,13 @@ def test_copy(self): for copied in [data.copy(deep=False), copy(data)]: self.assertDatasetIdentical(data, copied) - for k in data: + # Note: IndexVariable objects with string dtype are always + # copied because of xarray.core.util.safe_cast_to_index. + # Limiting the test to data variables. 
         self.assertEqual(float, x.dtype)
         self.assertArrayEqual(np.arange(3), x)
         self.assertEqual(float, x.values.dtype)
-        # after inspecting x.values, the IndexVariable value will be saved as an Index
-        self.assertIsInstance(x._data, PandasIndexAdapter)
         with self.assertRaisesRegexp(TypeError, 'cannot be modified'):
             x[:] = 0