xarray.backends refactor #2261

Merged: 45 commits, Oct 9, 2018
Changes from 26 commits

Commits (45)
4faaf3a
WIP: xarray.backends.file_manager for managing file objects.
shoyer Jul 1, 2018
c82a38c
Switch rasterio to use FileManager
shoyer Jul 1, 2018
7a55a30
lint fixes
shoyer Jul 4, 2018
51463dd
WIP: rewrite FileManager to always use an LRUCache
shoyer Jul 9, 2018
23e132f
Test coverage
shoyer Jul 10, 2018
8fc8183
Don't use move_to_end
shoyer Jul 10, 2018
422944b
minor clarification
shoyer Jul 10, 2018
aea0a1a
Switch FileManager.acquire() to a method
shoyer Jul 11, 2018
4366c0b
Python 2 compat
shoyer Jul 11, 2018
f35b7e7
Update xarray.set_options() to add file_cache_maxsize and validation
shoyer Jul 11, 2018
057cad2
Add assert for FILE_CACHE.maxsize
shoyer Jul 11, 2018
0f3e656
More docstring for FileManager
shoyer Jul 11, 2018
1a0cc10
Add accidentally omitted tests for LRUCache
shoyer Jul 11, 2018
8784e6b
Merge branch 'master' into file-manager
shoyer Jul 28, 2018
83d9b10
Adapt scipy backend to use FileManager
shoyer Jul 28, 2018
a0074ff
Stickler fix
shoyer Jul 28, 2018
062ba96
Fix failure on Python 2.7
shoyer Jul 29, 2018
2d41b29
Finish adjusting backends to use FileManager
shoyer Jul 29, 2018
2adf486
Fix bad import
shoyer Jul 30, 2018
76f151c
WIP on distributed
shoyer Aug 1, 2018
769f079
More WIP
shoyer Aug 6, 2018
3e97264
Merge branch 'master' into file-manager
shoyer Aug 17, 2018
5e67efe
Fix distributed write tests
shoyer Aug 19, 2018
8dc77c4
Merge branch 'master' into file-manager
shoyer Aug 19, 2018
1d38335
Fixes
shoyer Aug 19, 2018
6350ca6
Minor fixup
shoyer Aug 20, 2018
4aa0df7
whats new
shoyer Aug 30, 2018
67377c7
More refactoring: remove state from backends entirely
shoyer Aug 31, 2018
8c00f44
Merge branch 'master' into file-manager
shoyer Sep 6, 2018
2a5d1f0
Cleanup
shoyer Sep 6, 2018
a6c170b
Fix failing in-memory datastore tests
shoyer Sep 6, 2018
009e30d
Fix inaccessible datastore
shoyer Sep 6, 2018
14118ea
fix autoclose warnings
shoyer Sep 6, 2018
c778488
Fix PyNIO failures
shoyer Sep 6, 2018
fe14ebf
No longer disable HDF5 file locking
shoyer Sep 7, 2018
f1026ce
whats new and default file cache size
shoyer Sep 7, 2018
e13406b
Whats new tweak
shoyer Sep 7, 2018
465dfae
Refactor default lock logic to backend classes
shoyer Sep 10, 2018
55d35c8
Rename get_resource_lock -> get_write_lock
shoyer Sep 10, 2018
c8fbadc
Don't acquire unnecessary locks in __getitem__
shoyer Sep 10, 2018
ede8ef0
Merge branch 'master' into file-manager
shoyer Sep 26, 2018
220c302
Merge branch 'master' into file-manager
shoyer Oct 8, 2018
36f1156
Fix bad merge
shoyer Oct 9, 2018
c6f43dd
Fix import
shoyer Oct 9, 2018
8916bc7
Remove unreachable code
shoyer Oct 9, 2018
16 changes: 14 additions & 2 deletions doc/whats-new.rst
@@ -25,11 +25,23 @@ What's New
- `Python 3 Statement <http://www.python3statement.org/>`__
- `Tips on porting to Python 3 <https://docs.python.org/3/howto/pyporting.html>`__

.. _whats-new.0.10.9:
.. _whats-new.0.11.0:

v0.10.9 (unreleased)
v0.11.0 (unreleased)
--------------------

Breaking changes
~~~~~~~~~~~~~~~~

- Xarray's storage backends now automatically open and close files when
necessary, rather than requiring opening a file with ``autoclose=True``. A
global least-recently-used cache is used to store open files; the default
limit of 512 open files should suffice in most cases, but can be adjusted if
necessary with
``xarray.set_options(file_cache_maxsize=...)``.

TODO: Add some note about performance benefits.
Review comment (Member): just a reminder to synthesize your most recent post on this PR.


Documentation
~~~~~~~~~~~~~

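A brief illustration of the option named in the breaking-changes entry above; the file name is a placeholder and the value 1024 is arbitrary:

    import xarray as xr

    # Raise the limit on simultaneously open files held in the global
    # least-recently-used cache (the default noted above is 512).
    xr.set_options(file_cache_maxsize=1024)

    # Files are opened and closed automatically as needed; autoclose=True is
    # no longer required.
    ds = xr.open_dataset('example.nc')
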
4 changes: 4 additions & 0 deletions xarray/backends/__init__.py
@@ -4,6 +4,7 @@
formats. They should not be used directly, but rather through Dataset objects.
"""
from .common import AbstractDataStore
from .file_manager import FileManager, CachingFileManager, DummyFileManager
from .memory import InMemoryDataStore
from .netCDF4_ import NetCDF4DataStore
from .pydap_ import PydapDataStore
@@ -15,6 +16,9 @@

__all__ = [
'AbstractDataStore',
'FileManager',
'CachingFileManager',
'DummyFileManager',
'InMemoryDataStore',
'NetCDF4DataStore',
'PydapDataStore',
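As context for the new exports, a minimal sketch of how they might be used; acquire() and close() appear in this PR, but the constructor signatures shown here are assumptions rather than a documented public API:

    import netCDF4
    from xarray.backends import CachingFileManager, DummyFileManager

    # CachingFileManager defers opening the file until acquire() and keeps the
    # handle in the global LRU cache, reopening it if it has been evicted.
    manager = CachingFileManager(netCDF4.Dataset, 'example.nc', mode='r')
    nc = manager.acquire()      # open the file, or reuse the cached handle
    print(list(nc.variables))
    manager.close()             # explicitly release the underlying handle

    # DummyFileManager wraps an already-open object and performs no caching.
    f = netCDF4.Dataset('example.nc', mode='r')
    wrapped = DummyFileManager(f)
    assert wrapped.acquire() is f
    wrapped.close()
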
120 changes: 55 additions & 65 deletions xarray/backends/api.py
@@ -4,6 +4,7 @@
from glob import glob
from io import BytesIO
from numbers import Number
import warnings

import numpy as np

@@ -13,7 +14,7 @@
from ..core.pycompat import basestring, path_type
from ..core.utils import close_on_error, is_remote_uri
from .common import (
HDF5_LOCK, ArrayWriter, CombinedLock, _get_scheduler, _get_scheduler_lock)
HDF5_LOCK, ArrayWriter, combine_locks, _get_scheduler, _get_scheduler_lock)

DATAARRAY_NAME = '__xarray_dataarray_name__'
DATAARRAY_VARIABLE = '__xarray_dataarray_variable__'
@@ -52,27 +53,42 @@ def _normalize_path(path):
return os.path.abspath(os.path.expanduser(path))


def _default_lock(filename, engine):
def _default_read_lock(filename, engine):
if filename.endswith('.gz'):
lock = False
lock = None
else:
if engine is None:
engine = _get_default_engine(filename, allow_remote=True)

if engine == 'netcdf4':
if is_remote_uri(filename):
lock = False
lock = None
else:
# TODO: identify netcdf3 files and don't use the global lock
# for them
lock = HDF5_LOCK
elif engine in {'h5netcdf', 'pynio'}:
lock = HDF5_LOCK
else:
lock = False
lock = None
return lock


def _get_write_lock(engine, scheduler, format, path_or_file):
""" Get the lock(s) that apply to a particular scheduler/engine/format"""

locks = []

if (engine == 'h5netcdf' or engine == 'netcdf4' and
(format is None or format.startswith('NETCDF4'))):
locks.append(HDF5_LOCK)

locks.append(_get_scheduler_lock(scheduler, path_or_file))

return combine_locks(locks)



def _validate_dataset_names(dataset):
"""DataArray.name and Dataset keys must be a string or None"""
def check_name(name):
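
As a rough illustration of the helper above (the engine, scheduler, and path values are hypothetical):

    # An HDF5-backed write under dask's distributed scheduler combines
    # HDF5_LOCK with the scheduler-specific lock via combine_locks():
    lock = _get_write_lock(engine='netcdf4', scheduler='distributed',
                           format='NETCDF4', path_or_file='out.nc')

    # A scipy/netCDF3 write without dask gets only the lock returned by
    # _get_scheduler_lock():
    lock = _get_write_lock(engine='scipy', scheduler=None,
                           format='NETCDF3_64BIT', path_or_file='out.nc')
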
@@ -130,20 +146,6 @@ def _protect_dataset_variables_inplace(dataset, cache):
variable.data = data


def _get_lock(engine, scheduler, format, path_or_file):
""" Get the lock(s) that apply to a particular scheduler/engine/format"""

locks = []
if format in ['NETCDF4', None] and engine in ['h5netcdf', 'netcdf4']:
locks.append(HDF5_LOCK)
locks.append(_get_scheduler_lock(scheduler, path_or_file))

# When we have more than one lock, use the CombinedLock wrapper class
lock = CombinedLock(locks) if len(locks) > 1 else locks[0]

return lock


def _finalize_store(write, store):
""" Finalize this store by explicitly syncing and closing"""
del write # ensure writing is done first
@@ -152,7 +154,7 @@ def _finalize_store(write, store):


def open_dataset(filename_or_obj, group=None, decode_cf=True,
mask_and_scale=None, decode_times=True, autoclose=False,
mask_and_scale=None, decode_times=True, autoclose=None,
concat_characters=True, decode_coords=True, engine=None,
chunks=None, lock=None, cache=None, drop_variables=None,
backend_kwargs=None):
@@ -235,6 +237,14 @@ def open_dataset(filename_or_obj, group=None, decode_cf=True,
--------
open_mfdataset
"""
if autoclose is not None:
warnings.warn(
'The autoclose argument is no longer used by '
'xarray.open_dataset() and is now ignored; it will be removed in '
'xarray v0.12. If necessary, you can control the maximum number '
'of simultaneous open files with '
'xarray.set_options(file_cache_maxsize=...).',
FutureWarning, stacklevel=2)

if mask_and_scale is None:
mask_and_scale = not engine == 'pseudonetcdf'
@@ -272,18 +282,11 @@ def maybe_decode_store(store, lock=False):
mask_and_scale, decode_times, concat_characters,
decode_coords, engine, chunks, drop_variables)
name_prefix = 'open_dataset-%s' % token
ds2 = ds.chunk(chunks, name_prefix=name_prefix, token=token,
lock=lock)
ds2 = ds.chunk(chunks, name_prefix=name_prefix, token=token)
ds2._file_obj = ds._file_obj
else:
ds2 = ds

# protect so that dataset store isn't necessarily closed, e.g.,
# streams like BytesIO can't be reopened
# datastore backend is responsible for determining this capability
if store._autoclose:
store.close()

return ds2

if isinstance(filename_or_obj, path_type):
@@ -310,40 +313,34 @@ def maybe_decode_store(store, lock=False):
else:
engine = 'scipy'

if lock is None:
lock = _default_read_lock(filename_or_obj, engine)

if engine is None:
engine = _get_default_engine(filename_or_obj,
allow_remote=True)
if engine == 'netcdf4':
store = backends.NetCDF4DataStore.open(filename_or_obj,
group=group,
autoclose=autoclose,
**backend_kwargs)
store = backends.NetCDF4DataStore.open(
filename_or_obj, group=group, lock=lock, **backend_kwargs)
elif engine == 'scipy':
store = backends.ScipyDataStore(filename_or_obj,
autoclose=autoclose,
**backend_kwargs)
store = backends.ScipyDataStore(filename_or_obj, **backend_kwargs)
elif engine == 'pydap':
store = backends.PydapDataStore.open(filename_or_obj,
**backend_kwargs)
store = backends.PydapDataStore.open(
filename_or_obj, **backend_kwargs)
elif engine == 'h5netcdf':
store = backends.H5NetCDFStore(filename_or_obj, group=group,
autoclose=autoclose,
**backend_kwargs)
store = backends.H5NetCDFStore(
filename_or_obj, group=group, lock=lock, **backend_kwargs)
elif engine == 'pynio':
store = backends.NioDataStore(filename_or_obj,
autoclose=autoclose,
**backend_kwargs)
store = backends.NioDataStore(filename_or_obj, **backend_kwargs)
elif engine == 'pseudonetcdf':
store = backends.PseudoNetCDFDataStore.open(
filename_or_obj, autoclose=autoclose, **backend_kwargs)
filename_or_obj, **backend_kwargs)
else:
raise ValueError('unrecognized engine for open_dataset: %r'
% engine)

if lock is None:
lock = _default_lock(filename_or_obj, engine)
with close_on_error(store):
return maybe_decode_store(store, lock)
return maybe_decode_store(store)
else:
if engine is not None and engine != 'scipy':
raise ValueError('can only read file-like objects with '
@@ -355,7 +352,7 @@ def maybe_decode_store(store, lock=False):


def open_dataarray(filename_or_obj, group=None, decode_cf=True,
mask_and_scale=None, decode_times=True, autoclose=False,
mask_and_scale=None, decode_times=True, autoclose=None,
concat_characters=True, decode_coords=True, engine=None,
chunks=None, lock=None, cache=None, drop_variables=None,
backend_kwargs=None):
@@ -390,10 +387,6 @@ def open_dataarray(filename_or_obj, group=None, decode_cf=True,
decode_times : bool, optional
If True, decode times encoded in the standard NetCDF datetime format
into datetime objects. Otherwise, leave them encoded as numbers.
autoclose : bool, optional
If True, automatically close files to avoid OS Error of too many files
being open. However, this option doesn't work with streams, e.g.,
BytesIO.
concat_characters : bool, optional
If True, concatenate along the last dimension of character arrays to
form string arrays. Dimensions will only be concatenated over (and
@@ -490,7 +483,7 @@ def close(self):
def open_mfdataset(paths, chunks=None, concat_dim=_CONCAT_DIM_DEFAULT,
compat='no_conflicts', preprocess=None, engine=None,
lock=None, data_vars='all', coords='different',
autoclose=False, parallel=False, **kwargs):
autoclose=None, parallel=False, **kwargs):
"""Open multiple files as a single dataset.

Requires dask to be installed. See documentation for details on dask [1].
@@ -537,10 +530,6 @@ def open_mfdataset(paths, chunks=None, concat_dim=_CONCAT_DIM_DEFAULT,
Engine to use when reading files. If not provided, the default engine
is chosen based on available dependencies, with a preference for
'netcdf4'.
autoclose : bool, optional
If True, automatically close files to avoid OS Error of too many files
being open. However, this option doesn't work with streams, e.g.,
BytesIO.
lock : False, True or threading.Lock, optional
This argument is passed on to :py:func:`dask.array.from_array`. By
default, a per-variable lock is used when reading data from netCDF
@@ -605,7 +594,7 @@ def open_mfdataset(paths, chunks=None, concat_dim=_CONCAT_DIM_DEFAULT,
raise IOError('no files to open')

if lock is None:
lock = _default_lock(paths[0], engine)
lock = _default_read_lock(paths[0], engine)

open_kwargs = dict(engine=engine, chunks=chunks or {}, lock=lock,
autoclose=autoclose, **kwargs)
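
For reference, a typical call that exercises this path; the file names and chunk sizes are placeholders, and dask must be installed:

    import xarray as xr

    # The default per-file read lock is picked from the first path and the
    # chosen engine; autoclose is still accepted here but open_dataset() now
    # ignores it with a FutureWarning.
    ds = xr.open_mfdataset(['a.nc', 'b.nc'], engine='netcdf4',
                           chunks={'time': 100})
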
@@ -701,18 +690,18 @@ def to_netcdf(dataset, path_or_file=None, mode='w', format=None, group=None,
# handle scheduler specific logic
scheduler = _get_scheduler()
have_chunks = any(v.chunks for v in dataset.variables.values())
if (have_chunks and scheduler in ['distributed', 'multiprocessing'] and
engine != 'netcdf4'):

autoclose = have_chunks and scheduler in ['distributed', 'multiprocessing']
if autoclose and engine == 'scipy':
raise NotImplementedError("Writing netCDF files with the %s backend "
"is not currently supported with dask's %s "
"scheduler" % (engine, scheduler))
lock = _get_lock(engine, scheduler, format, path_or_file)
autoclose = (have_chunks and
scheduler in ['distributed', 'multiprocessing'])
lock = _get_write_lock(engine, scheduler, format, path_or_file)

target = path_or_file if path_or_file is not None else BytesIO()
store = store_open(target, mode, format, group, writer,
autoclose=autoclose, lock=lock)
kwargs = dict(autoclose=True) if autoclose else {}
store = store_open(
target, mode, format, group, writer, lock=lock, **kwargs)

if unlimited_dims is None:
unlimited_dims = dataset.encoding.get('unlimited_dims', None)
@@ -735,6 +724,7 @@ def to_netcdf(dataset, path_or_file=None, mode='w', format=None, group=None,
if not sync:
return store


def save_mfdataset(datasets, paths, mode='w', format=None, groups=None,
engine=None, compute=True):
"""Write multiple datasets to disk as netCDF files simultaneously.