xarray.backends refactor #2261

Merged: 45 commits, Oct 9, 2018 (changes shown from 16 commits)

Commits
4faaf3a
WIP: xarray.backends.file_manager for managing file objects.
shoyer Jul 1, 2018
c82a38c
Switch rasterio to use FileManager
shoyer Jul 1, 2018
7a55a30
lint fixes
shoyer Jul 4, 2018
51463dd
WIP: rewrite FileManager to always use an LRUCache
shoyer Jul 9, 2018
23e132f
Test coverage
shoyer Jul 10, 2018
8fc8183
Don't use move_to_end
shoyer Jul 10, 2018
422944b
minor clarification
shoyer Jul 10, 2018
aea0a1a
Switch FileManager.acquire() to a method
shoyer Jul 11, 2018
4366c0b
Python 2 compat
shoyer Jul 11, 2018
f35b7e7
Update xarray.set_options() to add file_cache_maxsize and validation
shoyer Jul 11, 2018
057cad2
Add assert for FILE_CACHE.maxsize
shoyer Jul 11, 2018
0f3e656
More docstring for FileManager
shoyer Jul 11, 2018
1a0cc10
Add accidentally omitted tests for LRUCache
shoyer Jul 11, 2018
8784e6b
Merge branch 'master' into file-manager
shoyer Jul 28, 2018
83d9b10
Adapt scipy backend to use FileManager
shoyer Jul 28, 2018
a0074ff
Stickler fix
shoyer Jul 28, 2018
062ba96
Fix failure on Python 2.7
shoyer Jul 29, 2018
2d41b29
Finish adjusting backends to use FileManager
shoyer Jul 29, 2018
2adf486
Fix bad import
shoyer Jul 30, 2018
76f151c
WIP on distributed
shoyer Aug 1, 2018
769f079
More WIP
shoyer Aug 6, 2018
3e97264
Merge branch 'master' into file-manager
shoyer Aug 17, 2018
5e67efe
Fix distributed write tests
shoyer Aug 19, 2018
8dc77c4
Merge branch 'master' into file-manager
shoyer Aug 19, 2018
1d38335
Fixes
shoyer Aug 19, 2018
6350ca6
Minor fixup
shoyer Aug 20, 2018
4aa0df7
whats new
shoyer Aug 30, 2018
67377c7
More refactoring: remove state from backends entirely
shoyer Aug 31, 2018
8c00f44
Merge branch 'master' into file-manager
shoyer Sep 6, 2018
2a5d1f0
Cleanup
shoyer Sep 6, 2018
a6c170b
Fix failing in-memory datastore tests
shoyer Sep 6, 2018
009e30d
Fix inaccessible datastore
shoyer Sep 6, 2018
14118ea
fix autoclose warnings
shoyer Sep 6, 2018
c778488
Fix PyNIO failures
shoyer Sep 6, 2018
fe14ebf
No longer disable HDF5 file locking
shoyer Sep 7, 2018
f1026ce
whats new and default file cache size
shoyer Sep 7, 2018
e13406b
Whats new tweak
shoyer Sep 7, 2018
465dfae
Refactor default lock logic to backend classes
shoyer Sep 10, 2018
55d35c8
Rename get_resource_lock -> get_write_lock
shoyer Sep 10, 2018
c8fbadc
Don't acquire unnecessary locks in __getitem__
shoyer Sep 10, 2018
ede8ef0
Merge branch 'master' into file-manager
shoyer Sep 26, 2018
220c302
Merge branch 'master' into file-manager
shoyer Oct 8, 2018
36f1156
Fix bad merge
shoyer Oct 9, 2018
c6f43dd
Fix import
shoyer Oct 9, 2018
8916bc7
Remove unreachable code
shoyer Oct 9, 2018
31 changes: 2 additions & 29 deletions xarray/backends/common.py
@@ -176,7 +176,7 @@ def __array__(self, dtype=None):
class AbstractDataStore(Mapping):
_autoclose = None
_ds = None
_isopen = False
_isopen = None

def __iter__(self):
return iter(self.variables)
@@ -330,7 +330,7 @@ def set_variable(self, k, v):  # pragma: no cover
raise NotImplementedError

def sync(self, compute=True):
if self._isopen and self._autoclose:
if self._isopen is not None and self._isopen and self._autoclose:
# datastore will be reopened during write
self.close()
self.delayed_store = self.writer.sync(compute=compute)
@@ -514,30 +514,3 @@ def assert_open(self):
if not self._isopen:
raise AssertionError('internal failure: file must be open '
'if `autoclose=True` is used.')


class PickleByReconstructionWrapper(object):

def __init__(self, opener, file, mode='r', **kwargs):
self.opener = partial(opener, file, mode=mode, **kwargs)
self.mode = mode
self._ds = None

@property
def value(self):
self._ds = self.opener()
return self._ds

def __getstate__(self):
state = self.__dict__.copy()
del state['_ds']
if self.mode == 'w':
# file has already been created, don't override when restoring
state['mode'] = 'a'
return state

def __setstate__(self, state):
self.__dict__.update(state)

def close(self):
self._ds.close()
162 changes: 162 additions & 0 deletions xarray/backends/file_manager.py
@@ -0,0 +1,162 @@
import threading

from ..core import utils
from ..core.options import OPTIONS
from .lru_cache import LRUCache


# Global cache for storing open files.
FILE_CACHE = LRUCache(
OPTIONS['file_cache_maxsize'], on_evict=lambda k, v: v.close())
assert FILE_CACHE.maxsize, 'file cache must be at least size one'


_DEFAULT_MODE = utils.ReprObject('<unused>')


class FileManager(object):
"""Wrapper for automatically opening and closing file objects.

Unlike files, FileManager objects can be safely pickled and passed between
processes. They should be explicitly closed to release resources, but
a per-process least-recently-used cache for open files ensures that you can
safely create arbitrarily large numbers of FileManager objects.

Don't directly close files acquired from a FileManager. Instead, call
FileManager.close(), which ensures that closed files are removed from the
cache as well.

Example usage:

manager = FileManager(open, 'example.txt', mode='w')
f = manager.acquire()
f.write(...)
manager.close() # ensures file is closed

Note that as long as previous files are still cached, acquiring a file
multiple times from the same FileManager is essentially free:

f1 = manager.acquire()
f2 = manager.acquire()
assert f1 is f2

"""

def __init__(self, opener, *args, **keywords):
"""Initialize a FileManager.

Parameters
----------
opener : callable
Function that when called like ``opener(*args, **kwargs)`` returns
an open file object. The file object must implement a ``close()``
method.
*args
Positional arguments for opener. A ``mode`` argument should be
provided as a keyword argument (see below). All arguments must be
hashable.
mode : optional
If provided, passed as a keyword argument to ``opener`` along with
``**kwargs``. ``mode='w'`` has special treatment: after the first
call it is replaced by ``mode='a'`` in all subsequent calls to
avoid overwriting the newly created file.
kwargs : dict, optional
Keyword arguments for opener, excluding ``mode``. All values must
be hashable.
lock : duck-compatible threading.Lock, optional
Lock to use when modifying the cache inside acquire() and close().
By default, uses a new threading.Lock() object. If set, this object
should be pickleable.
cache : MutableMapping, optional
Mapping to use as a cache for open files. By default, uses xarray's
global LRU file cache. Because ``cache`` typically points to a
global variable and contains non-picklable file objects, an
unpickled FileManager object will be restored with the default
cache.
"""
# TODO: replace with real keyword arguments when we drop Python 2
# support
mode = keywords.pop('mode', _DEFAULT_MODE)
kwargs = keywords.pop('kwargs', None)
lock = keywords.pop('lock', None)
cache = keywords.pop('cache', FILE_CACHE)
self._opener = opener
self._args = args
self._mode = mode
self._kwargs = {} if kwargs is None else dict(kwargs)
self._default_lock = lock is None
self._lock = threading.Lock() if self._default_lock else lock
self._cache = cache
self._key = self._make_key()

def _make_key(self):
"""Make a key for caching files in the LRU cache."""
value = (self._opener,
self._args,
self._mode,
tuple(sorted(self._kwargs.items())))
return _HashedSequence(value)

def acquire(self):
"""Acquire a file object from the manager.

A new file is only opened if it has expired from the
least-recently-used cache.

This method uses a lock, which ensures that it is thread-safe. You
can safely acquire a file in multiple threads at the same time, as
long as the underlying file object is thread-safe.

Returns
-------
An open file object, as returned by ``opener(*args, **kwargs)``.
"""
with self._lock:
try:
file = self._cache[self._key]
except KeyError:
kwargs = self._kwargs
if self._mode is not _DEFAULT_MODE:
kwargs = kwargs.copy()
kwargs['mode'] = self._mode
file = self._opener(*self._args, **kwargs)
if self._mode == 'w':
# ensure file doesn't get overwritten when opened again
self._mode = 'a'
self._key = self._make_key()
self._cache[self._key] = file
return file
[Review — Member] I would prefer not to override the builtin file function here. Perhaps we can use fh or something.

[Review — Member Author] file is only a builtin on Python 2... are we still concerned about overriding it?

[Review — Member] I didn't know this and I'm happy to hear it. (I can't wait to be done with Python 2)


def close(self):
"""Explicitly close any associated file object (if necessary)."""
with self._lock:
file = self._cache.pop(self._key, default=None)
if file is not None:
file.close()

def __getstate__(self):
"""State for pickling."""
lock = None if self._default_lock else self._lock
return (self._opener, self._args, self._mode, self._kwargs, lock)

def __setstate__(self, state):
"""Restore from a pickle."""
opener, args, mode, kwargs, lock = state
self.__init__(opener, *args, mode=mode, kwargs=kwargs, lock=lock)


class _HashedSequence(list):
"""Speedup repeated look-ups by caching hash values.

Based on what Python uses internally in functools.lru_cache.

Python doesn't perform this optimization automatically:
https://bugs.python.org/issue1462796
"""

def __init__(self, tuple_value):
self[:] = tuple_value
self.hashvalue = hash(tuple_value)

def __hash__(self):
return self.hashvalue
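The acquire/close behavior described in the docstring above can be sketched in a standalone form. This is a hypothetical illustration, not part of the diff: ``MiniFileManager`` is an invented name, the global LRU cache is replaced by a plain dict, and pickling support is omitted. It shows the two key semantics: repeated ``acquire()`` is a cache hit, and ``mode='w'`` is downgraded to ``'a'`` after the first open so a reopen never truncates the file.

```python
import os
import tempfile
import threading


class MiniFileManager:
    """Minimal sketch of the FileManager pattern (hypothetical)."""

    _CACHE = {}  # stands in for the global LRU file cache

    def __init__(self, opener, *args, mode=None, kwargs=None):
        self._opener = opener
        self._args = args
        self._mode = mode
        self._kwargs = dict(kwargs or {})
        self._lock = threading.Lock()
        self._key = self._make_key()

    def _make_key(self):
        return (self._opener, self._args, self._mode,
                tuple(sorted(self._kwargs.items())))

    def acquire(self):
        with self._lock:
            try:
                return self._CACHE[self._key]
            except KeyError:
                kwargs = dict(self._kwargs)
                if self._mode is not None:
                    kwargs["mode"] = self._mode
                f = self._opener(*self._args, **kwargs)
                if self._mode == "w":
                    # downgrade so a later reopen appends instead of truncating
                    self._mode = "a"
                    self._key = self._make_key()
                self._CACHE[self._key] = f
                return f

    def close(self):
        with self._lock:
            f = self._CACHE.pop(self._key, None)
            if f is not None:
                f.close()


path = os.path.join(tempfile.mkdtemp(), "example.txt")
m = MiniFileManager(open, path, mode="w")
assert m.acquire() is m.acquire()   # second acquire is a cache hit
m.acquire().write("hello")
m.close()                           # closes and evicts the cached file
m.acquire().write(" world")         # reopened with mode='a', not truncated
m.close()
assert open(path).read() == "hello world"
```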
89 changes: 89 additions & 0 deletions xarray/backends/lru_cache.py
@@ -0,0 +1,89 @@
import collections
import threading


class LRUCache(collections.MutableMapping):
"""Thread-safe LRUCache based on an OrderedDict.

All dict operations (__getitem__, __setitem__, __contains__) update the
priority of the relevant key and take O(1) time. The dict is iterated over
in order from the oldest to newest key, which means that a complete pass
over the dict should not affect the order of any entries.

When a new item is set and the maximum size of the cache is exceeded, the
oldest item is dropped and passed to ``on_evict(key, value)``.

The ``maxsize`` property can be used to view or adjust the capacity of
the cache, e.g., ``cache.maxsize = new_size``.
"""
def __init__(self, maxsize, on_evict=None):
"""
Parameters
----------
maxsize : int
Integer maximum number of items to hold in the cache.
on_evict : callable, optional
Function to call like ``on_evict(key, value)`` when items are
evicted.
"""
if maxsize < 0:
raise ValueError('maxsize must be non-negative')
self._maxsize = maxsize
[Review — Member] Should we enforce maxsize is an integer? I'm thinking that it may be easy to see None/False as valid values. I think that case is going to break things downstream.

[Review — Member Author] done

self._on_evict = on_evict
self._cache = collections.OrderedDict()
self._lock = threading.RLock()

def __getitem__(self, key):
# record recent use of the key by moving it to the front of the list
with self._lock:
value = self._cache[key]
# On Python 3, could just use: self._cache.move_to_end(key)
del self._cache[key]
self._cache[key] = value
[Review — Member] Thoughts on using the move_to_end here and catching the exception for python2 only?

[Review — Member] or maybe a little helper function in pycompat that we can cleanup when python 2 is dropped:

def move_to_end(cache, key):
    try:
        cache.move_to_end(key)
    except AttributeError:  # Python 2: OrderedDict has no move_to_end
        value = cache[key]
        del cache[key]
        cache[key] = value

[Review — Member Author] Sure, easy enough. Done.

return value

def _enforce_size_limit(self, capacity):
"""Shrink the cache if necessary, evicting the oldest items."""
while len(self._cache) > capacity:
key, value = self._cache.popitem(last=False)
if self._on_evict is not None:
self._on_evict(key, value)

def __setitem__(self, key, value):
with self._lock:
if key in self._cache:
# insert the new value at the end
del self._cache[key]
self._cache[key] = value
elif self._maxsize:
# make room if necessary
self._enforce_size_limit(self._maxsize - 1)
self._cache[key] = value
elif self._on_evict is not None:
# not saving, immediately evict
self._on_evict(key, value)

def __delitem__(self, key):
del self._cache[key]

def __iter__(self):
# create a list, so accessing the cache during iteration cannot change
# the iteration order
return iter(list(self._cache))

def __len__(self):
return len(self._cache)

@property
def maxsize(self):
"""Maximum number of items that can be held in the cache."""
return self._maxsize

@maxsize.setter
def maxsize(self, size):
"""Resize the cache, evicting the oldest items if necessary."""
if size < 0:
raise ValueError('maxsize must be non-negative')
with self._lock:
self._enforce_size_limit(size)
self._maxsize = size
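The eviction and recency semantics of the LRUCache above can be demonstrated with a small standalone sketch. ``MiniLRU`` is a hypothetical name for illustration only; locking and the full MutableMapping interface are omitted for brevity. It shows that a ``__getitem__`` refreshes a key's recency, so the least recently *used* (not least recently *inserted*) entry is the one handed to ``on_evict``.

```python
from collections import OrderedDict


class MiniLRU:
    """Standalone sketch of the LRUCache semantics (hypothetical)."""

    def __init__(self, maxsize, on_evict=None):
        if maxsize < 0:
            raise ValueError("maxsize must be non-negative")
        self._maxsize = maxsize
        self._on_evict = on_evict
        self._d = OrderedDict()

    def __getitem__(self, key):
        # mark the key as most recently used (Python 3 only)
        self._d.move_to_end(key)
        return self._d[key]

    def __setitem__(self, key, value):
        if key in self._d:
            # re-insert at the end to refresh recency
            del self._d[key]
            self._d[key] = value
        elif self._maxsize:
            # make room, evicting oldest entries first
            while len(self._d) > self._maxsize - 1:
                k, v = self._d.popitem(last=False)
                if self._on_evict is not None:
                    self._on_evict(k, v)
            self._d[key] = value
        elif self._on_evict is not None:
            # zero-capacity cache: never store, evict immediately
            self._on_evict(key, value)


evicted = []
cache = MiniLRU(2, on_evict=lambda k, v: evicted.append(k))
cache["a"] = 1
cache["b"] = 2
cache["a"]          # touch "a" so "b" becomes the oldest entry
cache["c"] = 3      # exceeds maxsize=2: "b" is evicted, not "a"
assert evicted == ["b"]
```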