
xarray.backends refactor #2261

Merged · 45 commits · Oct 9, 2018

Commits
- 4faaf3a  WIP: xarray.backends.file_manager for managing file objects. (shoyer, Jul 1, 2018)
- c82a38c  Switch rasterio to use FileManager (shoyer, Jul 1, 2018)
- 7a55a30  lint fixes (shoyer, Jul 4, 2018)
- 51463dd  WIP: rewrite FileManager to always use an LRUCache (shoyer, Jul 9, 2018)
- 23e132f  Test coverage (shoyer, Jul 10, 2018)
- 8fc8183  Don't use move_to_end (shoyer, Jul 10, 2018)
- 422944b  minor clarification (shoyer, Jul 10, 2018)
- aea0a1a  Switch FileManager.acquire() to a method (shoyer, Jul 11, 2018)
- 4366c0b  Python 2 compat (shoyer, Jul 11, 2018)
- f35b7e7  Update xarray.set_options() to add file_cache_maxsize and validation (shoyer, Jul 11, 2018)
- 057cad2  Add assert for FILE_CACHE.maxsize (shoyer, Jul 11, 2018)
- 0f3e656  More docstring for FileManager (shoyer, Jul 11, 2018)
- 1a0cc10  Add accidentally omited tests for LRUCache (shoyer, Jul 11, 2018)
- 8784e6b  Merge branch 'master' into file-manager (shoyer, Jul 28, 2018)
- 83d9b10  Adapt scipy backend to use FileManager (shoyer, Jul 28, 2018)
- a0074ff  Stickler fix (shoyer, Jul 28, 2018)
- 062ba96  Fix failure on Python 2.7 (shoyer, Jul 29, 2018)
- 2d41b29  Finish adjusting backends to use FileManager (shoyer, Jul 29, 2018)
- 2adf486  Fix bad import (shoyer, Jul 30, 2018)
- 76f151c  WIP on distributed (shoyer, Aug 1, 2018)
- 769f079  More WIP (shoyer, Aug 6, 2018)
- 3e97264  Merge branch 'master' into file-manager (shoyer, Aug 17, 2018)
- 5e67efe  Fix distributed write tests (shoyer, Aug 19, 2018)
- 8dc77c4  Merge branch 'master' into file-manager (shoyer, Aug 19, 2018)
- 1d38335  Fixes (shoyer, Aug 19, 2018)
- 6350ca6  Minor fixup (shoyer, Aug 20, 2018)
- 4aa0df7  whats new (shoyer, Aug 30, 2018)
- 67377c7  More refactoring: remove state from backends entirely (shoyer, Aug 31, 2018)
- 8c00f44  Merge branch 'master' into file-manager (shoyer, Sep 6, 2018)
- 2a5d1f0  Cleanup (shoyer, Sep 6, 2018)
- a6c170b  Fix failing in-memory datastore tests (shoyer, Sep 6, 2018)
- 009e30d  Fix inaccessible datastore (shoyer, Sep 6, 2018)
- 14118ea  fix autoclose warnings (shoyer, Sep 6, 2018)
- c778488  Fix PyNIO failures (shoyer, Sep 6, 2018)
- fe14ebf  No longer disable HDF5 file locking (shoyer, Sep 7, 2018)
- f1026ce  whats new and default file cache size (shoyer, Sep 7, 2018)
- e13406b  Whats new tweak (shoyer, Sep 7, 2018)
- 465dfae  Refactor default lock logic to backend classes (shoyer, Sep 10, 2018)
- 55d35c8  Rename get_resource_lock -> get_write_lock (shoyer, Sep 10, 2018)
- c8fbadc  Don't acquire unnecessary locks in __getitem__ (shoyer, Sep 10, 2018)
- ede8ef0  Merge branch 'master' into file-manager (shoyer, Sep 26, 2018)
- 220c302  Merge branch 'master' into file-manager (shoyer, Oct 8, 2018)
- 36f1156  Fix bad merge (shoyer, Oct 9, 2018)
- c6f43dd  Fix import (shoyer, Oct 9, 2018)
- 8916bc7  Remove unreachable code (shoyer, Oct 9, 2018)
27 changes: 0 additions & 27 deletions xarray/backends/common.py
@@ -508,30 +508,3 @@ def assert_open(self):
        if not self._isopen:
            raise AssertionError('internal failure: file must be open '
                                 'if `autoclose=True` is used.')


class PickleByReconstructionWrapper(object):

    def __init__(self, opener, file, mode='r', **kwargs):
        self.opener = partial(opener, file, mode=mode, **kwargs)
        self.mode = mode
        self._ds = None

    @property
    def value(self):
        self._ds = self.opener()
        return self._ds

    def __getstate__(self):
        state = self.__dict__.copy()
        del state['_ds']
        if self.mode == 'w':
            # file has already been created, don't override when restoring
            state['mode'] = 'a'
        return state

    def __setstate__(self, state):
        self.__dict__.update(state)

    def close(self):
        self._ds.close()
118 changes: 118 additions & 0 deletions xarray/backends/file_manager.py
@@ -0,0 +1,118 @@
import contextlib
import threading

from .lru_cache import LRUCache


# Global cache for storing open files.
FILE_CACHE = LRUCache(512, on_evict=lambda k, v: v.close())

# TODO(shoyer): add an option (xarray.set_options) for resizing the cache.


class FileManager(object):
    """Wrapper for automatically opening and closing file objects.

    Unlike files, FileManager objects can be safely pickled and passed
    between processes. They should be explicitly closed to release
    resources, but a per-process least-recently-used cache for open files
    ensures that you can safely create arbitrarily large numbers of
    FileManager objects.

    Example usage:

        manager = FileManager(open, 'example.txt', mode='w')
        with manager.acquire() as f:
            f.write(...)
        manager.close()
    """

    def __init__(self, opener, *args, **kwargs):
Review comment from @shoyer (Member Author, Jul 10, 2018):

    I think I want to use dependency injection for the cache (e.g.,
    cache=FILE_CACHE), which unfortunately means that we'll need to change
    the signature here from using **kwargs.

    Any opinions on what this should look like? I'm thinking maybe:

        _DEFAULT = object()
        def __init__(self, opener, *args, mode=_DEFAULT, kwargs=None, cache=FILE_CACHE)
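The keyword-only syntax sketched in that comment would not parse on Python 2, which this PR still supports (see the "Python 2 compat" commit). A hedged sketch of how the proposed cache-injection signature could be emulated under that constraint — ``FileManagerSketch`` and ``_DEFAULT`` are illustrative names, not the merged API:

```python
_DEFAULT = object()  # sentinel: distinguishes "not passed" from None


class FileManagerSketch(object):
    """Illustrative sketch only, not the class merged in this PR."""

    def __init__(self, opener, *args, **keywords):
        # Python 2 has no keyword-only arguments, so pop them by hand.
        self._opener = opener
        self._args = args
        self._mode = keywords.pop('mode', _DEFAULT)
        self._kwargs = keywords.pop('kwargs', None) or {}
        self._cache = keywords.pop('cache', None)  # e.g. a shared LRU cache
        if keywords:
            raise TypeError('unexpected keyword arguments: %r' % keywords)


manager = FileManagerSketch(open, 'example.txt', mode='w', cache=None)
```

Popping recognized names out of ``**keywords`` and raising on leftovers gives the same error behavior as true keyword-only arguments, at the cost of losing them from the signature's introspection.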

"""Initialize a FileManager.

Parameters
----------
opener : callable
Function that when called like ``opener(*args, **kwargs)`` returns
an open file object. The file object must implement a ``close()``
method.
*args
Positional arguments for opener. A ``mode`` argument should be
provided as a keyword argument (see below).
**kwargs
Keyword arguments for opener. The keyword argument ``mode`` has
special handling if it is provided with a value of 'w': on all
calls after the first, it is changed to 'a' instead to avoid
overriding the newly created file. All argument values must be
hashable.
"""
self._opener = opener
self._args = args
self._kwargs = kwargs
self._key = _make_key(opener, args, kwargs)
self._lock = threading.RLock()

@contextlib.contextmanager
def acquire(self):
"""Context manager for acquiring a file object.

A new file is only opened if it has expired from the
least-recently-used cache.

This method uses a reentrant lock, which ensures that it is
thread-safe. You can safely acquire a file in multiple threads at the
same time, as long as the underlying file object is thread-safe.

Yields
------
Open file object, as returned by ``opener(*args, **kwargs)``.
"""
with self._lock:
try:
file = FILE_CACHE[self._key]
except KeyError:
file = self._opener(*self._args, **self._kwargs)
if self._kwargs.get('mode') == 'w':
# ensure file doesn't get overriden when opened again
self._kwargs['mode'] = 'a'
self._key = _make_key(
self._opener, self._args, self._kwargs)
FILE_CACHE[self._key] = file
yield file
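The ``'w'`` to ``'a'`` rewrite above is the subtle part of ``acquire()``: the file is created exactly once, and any later re-open (say, after eviction from the cache) must append rather than truncate. A standalone illustration of that bookkeeping (``acquire_mode`` is a made-up helper, not part of this diff):

```python
def acquire_mode(kwargs):
    """Return the mode for this open, flipping 'w' to 'a' for next time."""
    mode = kwargs.get('mode')
    if mode == 'w':
        kwargs['mode'] = 'a'  # later opens must not clobber the file
    return mode


kw = {'mode': 'w'}
first = acquire_mode(kw)   # 'w': this open creates/truncates the file
second = acquire_mode(kw)  # 'a': re-opens append instead
```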

    def close(self):
        """Explicitly close any associated file object (if necessary)."""
        file = FILE_CACHE.pop(self._key, default=None)
        if file is not None:
            file.close()

    def __getstate__(self):
        """State for pickling."""
        return (self._opener, self._args, self._kwargs)

    def __setstate__(self, state):
        """Restore from a pickle."""
        opener, args, kwargs = state
        self.__init__(opener, *args, **kwargs)
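``__getstate__``/``__setstate__`` above implement pickling by reconstruction: only ``(opener, args, kwargs)`` cross the process boundary, and the receiving process re-opens the file on demand. A stripped-down, hypothetical round-trip (``PicklableManager`` is illustrative, not the class in this diff):

```python
import pickle


class PicklableManager(object):
    """Sketch: pickles as (opener, args, kwargs), never as an open file."""

    def __init__(self, opener, *args, **kwargs):
        self._opener = opener
        self._args = args
        self._kwargs = kwargs

    def __getstate__(self):
        return (self._opener, self._args, self._kwargs)

    def __setstate__(self, state):
        opener, args, kwargs = state
        self.__init__(opener, *args, **kwargs)


m = PicklableManager(open, 'example.txt', mode='a')
m2 = pickle.loads(pickle.dumps(m))  # no file handle ever serialized
```

Because the open file itself is never part of the pickled state, this works even for file objects (HDF5, netCDF) that cannot be serialized at all.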


class _HashedSequence(list):
    """Speedup repeated look-ups by caching hash values.

    Based on what Python uses internally in functools.lru_cache.

    Python doesn't perform this optimization automatically:
    https://bugs.python.org/issue1462796
    """

    def __init__(self, tuple_value):
        self[:] = tuple_value
        self.hashvalue = hash(tuple_value)

    def __hash__(self):
        return self.hashvalue


def _make_key(opener, args, kwargs):
    """Make a key for caching files in the LRU cache."""
    value = (opener, args, tuple(sorted(kwargs.items())))
    return _HashedSequence(value)
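Because ``_HashedSequence`` computes its hash once at construction, repeated cache lookups skip re-hashing the key tuple. A quick self-contained check that keys built from equal arguments are interchangeable (the names below are local re-creations for illustration, mirroring the code above):

```python
class HashedSequence(list):
    """Local re-creation of _HashedSequence for illustration."""

    def __init__(self, tuple_value):
        self[:] = tuple_value
        self.hashvalue = hash(tuple_value)  # computed once, reused on lookup

    def __hash__(self):
        return self.hashvalue


def make_key(opener, args, kwargs):
    # sorted() makes the key insensitive to keyword-argument order
    return HashedSequence((opener, args, tuple(sorted(kwargs.items()))))


k1 = make_key(open, ('example.nc',), {'mode': 'r'})
k2 = make_key(open, ('example.nc',), {'mode': 'r'})
cache = {k1: 'an open file object'}  # k2 finds the same slot as k1
```

Two managers constructed with identical arguments therefore share one cache slot, which is what makes the global FILE_CACHE deduplicate opens.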
78 changes: 78 additions & 0 deletions xarray/backends/lru_cache.py
@@ -0,0 +1,78 @@
import collections
import threading


class LRUCache(collections.MutableMapping):
    """Thread-safe LRUCache based on an OrderedDict.

    All dict operations (__getitem__, __setitem__, __contains__) update the
    priority of the relevant key and take O(1) time. The dict is iterated
    over in order from the oldest to newest key, which means that a
    complete pass over the dict should not affect the order of any entries.

    When a new item is set and the maximum size of the cache is exceeded,
    the oldest item is dropped and ``on_evict(key, value)`` is called on
    it.

    The ``maxsize`` property can be used to view or resize the capacity of
    the cache.
    """

    def __init__(self, maxsize, on_evict=None):
        """
        Parameters
        ----------
        maxsize : int
            Integer maximum number of items to hold in the cache.
        on_evict : callable, optional
            Function to call like ``on_evict(key, value)`` when items are
            evicted.
        """
        self._maxsize = maxsize
Review comment (Member):

    Should we enforce maxsize is an integer? I'm thinking that it may be
    easy to see None/False as valid values. I think that case is going to
    break things downstream.

Reply from @shoyer (Member Author): done
        self._on_evict = on_evict
        self._cache = collections.OrderedDict()
        self._lock = threading.RLock()

    def __getitem__(self, key):
        # record recent use of the key by moving it to the front of the list
        with self._lock:
            value = self._cache[key]
            self._cache.move_to_end(key)
            return value

    def _shrink(self, capacity):
        """Shrink the cache if necessary, evicting the oldest items."""
        while len(self._cache) > capacity:
            key, value = self._cache.popitem(last=False)
            if self._on_evict is not None:
                self._on_evict(key, value)

    def __setitem__(self, key, value):
        with self._lock:
            if self._maxsize:
                if key in self._cache:
                    self._cache.move_to_end(key)
                elif len(self._cache) >= self._maxsize:
                    self._shrink(self._maxsize - 1)
                self._cache[key] = value

    def __delitem__(self, key):
        del self._cache[key]

    def __iter__(self):
        # create a list, so accessing the cache during iteration cannot
        # change the iteration order
        return iter(list(self._cache))

    def __len__(self):
        return len(self._cache)

    @property
    def maxsize(self):
        """Maximum number of items that can be held in the cache."""
        return self._maxsize

    @maxsize.setter
    def maxsize(self, size):
        """Resize the cache, evicting the oldest items if necessary."""
        with self._lock:
            self._shrink(size)
            self._maxsize = size
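The eviction contract described in the docstring — touch on access, drop the oldest on overflow, notify via ``on_evict`` — can be exercised with a deliberately tiny stand-in (``TinyLRU`` is a sketch for illustration, not the class above):

```python
from collections import OrderedDict


class TinyLRU(object):
    """Minimal LRU illustrating the eviction behavior documented above."""

    def __init__(self, maxsize, on_evict=None):
        self.maxsize = maxsize
        self.on_evict = on_evict
        self._d = OrderedDict()

    def __getitem__(self, key):
        value = self._d[key]
        self._d.move_to_end(key)  # mark as most recently used
        return value

    def __setitem__(self, key, value):
        if key in self._d:
            self._d.move_to_end(key)
        elif len(self._d) >= self.maxsize:
            old_key, old_value = self._d.popitem(last=False)  # oldest first
            if self.on_evict is not None:
                self.on_evict(old_key, old_value)
        self._d[key] = value


evicted = []
cache = TinyLRU(2, on_evict=lambda k, v: evicted.append(k))
cache['a'] = 1
cache['b'] = 2
cache['a']       # touching 'a' makes 'b' the oldest entry
cache['c'] = 3   # evicts 'b', not 'a'
```

With ``on_evict=lambda k, v: v.close()``, as in FILE_CACHE above, eviction becomes "close the least recently used file", which is the whole point of the refactor.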