-
-
Notifications
You must be signed in to change notification settings - Fork 1.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
xarray.backends refactor #2261
xarray.backends refactor #2261
Changes from 16 commits
4faaf3a
c82a38c
7a55a30
51463dd
23e132f
8fc8183
422944b
aea0a1a
4366c0b
f35b7e7
057cad2
0f3e656
1a0cc10
8784e6b
83d9b10
a0074ff
062ba96
2d41b29
2adf486
76f151c
769f079
3e97264
5e67efe
8dc77c4
1d38335
6350ca6
4aa0df7
67377c7
8c00f44
2a5d1f0
a6c170b
009e30d
14118ea
c778488
fe14ebf
f1026ce
e13406b
465dfae
55d35c8
c8fbadc
ede8ef0
220c302
36f1156
c6f43dd
8916bc7
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,162 @@ | ||
import threading | ||
|
||
from ..core import utils | ||
from ..core.options import OPTIONS | ||
from .lru_cache import LRUCache | ||
|
||
|
||
# Global least-recently-used cache mapping keys to open file objects.
# Evicted entries have their file handle closed automatically.
FILE_CACHE = LRUCache(
    maxsize=OPTIONS['file_cache_maxsize'],
    on_evict=lambda key, value: value.close())
assert FILE_CACHE.maxsize, 'file cache must be at least size one'


# Sentinel used by FileManager to detect that no explicit ``mode`` keyword
# was supplied.
_DEFAULT_MODE = utils.ReprObject('<unused>')
|
||
|
||
class FileManager(object):
    """Wrapper for automatically opening and closing file objects.

    Unlike files, FileManager objects can be safely pickled and passed between
    processes. They should be explicitly closed to release resources, but
    a per-process least-recently-used cache for open files ensures that you
    can safely create arbitrarily large numbers of FileManager objects.

    Don't directly close files acquired from a FileManager. Instead, call
    FileManager.close(), which ensures that closed files are removed from the
    cache as well.

    Example usage:

        manager = FileManager(open, 'example.txt', mode='w')
        f = manager.acquire()
        f.write(...)
        manager.close()  # ensures file is closed

    Note that as long as previous files are still cached, acquiring a file
    multiple times from the same FileManager is essentially free:

        f1 = manager.acquire()
        f2 = manager.acquire()
        assert f1 is f2
    """

    def __init__(self, opener, *args, **keywords):
        """Initialize a FileManager.

        Parameters
        ----------
        opener : callable
            Function that when called like ``opener(*args, **kwargs)`` returns
            an open file object. The file object must implement a ``close()``
            method.
        *args
            Positional arguments for opener. A ``mode`` argument should be
            provided as a keyword argument (see below). All arguments must be
            hashable.
        mode : optional
            If provided, passed as a keyword argument to ``opener`` along with
            ``**kwargs``. ``mode='w'`` has special treatment: after the first
            call it is replaced by ``mode='a'`` in all subsequent calls to
            avoid overwriting the newly created file.
        kwargs : dict, optional
            Keyword arguments for opener, excluding ``mode``. All values must
            be hashable.
        lock : duck-compatible threading.Lock, optional
            Lock to use when modifying the cache inside acquire() and close().
            By default, uses a new threading.Lock() object. If set, this
            object should be pickleable.
        cache : MutableMapping, optional
            Mapping to use as a cache for open files. By default, uses
            xarray's global LRU file cache. Because ``cache`` typically points
            to a global variable and contains non-picklable file objects,
            unpickled FileManager objects will be restored with the default
            cache.
        """
        # TODO: replace with real keyword arguments when we drop Python 2
        # support
        mode = keywords.pop('mode', _DEFAULT_MODE)
        kwargs = keywords.pop('kwargs', None)
        lock = keywords.pop('lock', None)
        cache = keywords.pop('cache', FILE_CACHE)
        if keywords:
            # surface typos instead of silently ignoring unknown options
            raise TypeError('FileManager() got unexpected keyword arguments: '
                            '%r' % sorted(keywords))
        self._opener = opener
        self._args = args
        self._mode = mode
        self._kwargs = {} if kwargs is None else dict(kwargs)
        # remember whether the lock was defaulted, so pickling can restore a
        # fresh (unpicklable) threading.Lock on the other side
        self._default_lock = lock is None
        self._lock = threading.Lock() if self._default_lock else lock
        self._cache = cache
        self._key = self._make_key()

    def _make_key(self):
        """Make a key for caching files in the LRU cache."""
        value = (self._opener,
                 self._args,
                 self._mode,
                 tuple(sorted(self._kwargs.items())))
        # _HashedSequence caches the hash, speeding up repeated cache lookups
        return _HashedSequence(value)

    def acquire(self):
        """Acquire a file object from the manager.

        A new file is only opened if it has expired from the
        least-recently-used cache.

        This method holds the manager's lock while touching the cache, so it
        is safe to acquire a file from multiple threads at the same time, as
        long as the underlying file object is thread-safe.

        Returns
        -------
        An open file object, as returned by ``opener(*args, **kwargs)``.
        """
        with self._lock:
            try:
                fh = self._cache[self._key]
            except KeyError:
                kwargs = self._kwargs
                if self._mode is not _DEFAULT_MODE:
                    kwargs = kwargs.copy()
                    kwargs['mode'] = self._mode
                fh = self._opener(*self._args, **kwargs)
                if self._mode == 'w':
                    # ensure file doesn't get overwritten when opened again
                    self._mode = 'a'
                    self._key = self._make_key()
                self._cache[self._key] = fh
            return fh

    def close(self):
        """Explicitly close any associated file object (if necessary)."""
        with self._lock:
            # pass the default positionally: plain dicts do not accept
            # ``default`` as a keyword argument to ``pop``
            fh = self._cache.pop(self._key, None)
            if fh is not None:
                fh.close()

    def __getstate__(self):
        """State for pickling."""
        # the cache is intentionally omitted: it contains non-picklable file
        # objects and typically refers to a global variable
        lock = None if self._default_lock else self._lock
        return (self._opener, self._args, self._mode, self._kwargs, lock)

    def __setstate__(self, state):
        """Restore from a pickle."""
        opener, args, mode, kwargs, lock = state
        self.__init__(opener, *args, mode=mode, kwargs=kwargs, lock=lock)
||
|
||
class _HashedSequence(list): | ||
"""Speedup repeated look-ups by caching hash values. | ||
|
||
Based on what Python uses internally in functools.lru_cache. | ||
|
||
Python doesn't perform this optimization automatically: | ||
https://bugs.python.org/issue1462796 | ||
""" | ||
|
||
def __init__(self, tuple_value): | ||
self[:] = tuple_value | ||
self.hashvalue = hash(tuple_value) | ||
|
||
def __hash__(self): | ||
return self.hashvalue |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,89 @@ | ||
import collections | ||
import threading | ||
|
||
|
||
try:
    from collections.abc import MutableMapping  # Python 3.3+
except ImportError:  # Python 2: the ABCs live directly in collections
    from collections import MutableMapping


class LRUCache(MutableMapping):
    """Thread-safe LRUCache based on an OrderedDict.

    All dict operations (__getitem__, __setitem__, __contains__) update the
    priority of the relevant key and take O(1) time. The dict is iterated over
    in order from the oldest to newest key, which means that a complete pass
    over the dict should not affect the order of any entries.

    When a new item is set and the maximum size of the cache is exceeded, the
    oldest item is dropped and called with ``on_evict(key, value)``.

    The ``maxsize`` property can be used to view or adjust the capacity of
    the cache, e.g., ``cache.maxsize = new_size``.
    """

    def __init__(self, maxsize, on_evict=None):
        """
        Parameters
        ----------
        maxsize : int
            Integer maximum number of items to hold in the cache.
        on_evict : callable, optional
            Function to call like ``on_evict(key, value)`` when items are
            evicted.
        """
        # require a real integer so sizes like 1e6 or '10' fail loudly
        # instead of silently misbehaving later
        if not isinstance(maxsize, int):
            raise TypeError('maxsize must be an integer')
        if maxsize < 0:
            raise ValueError('maxsize must be non-negative')
        self._maxsize = maxsize
        self._on_evict = on_evict
        self._cache = collections.OrderedDict()
        # reentrant, so user-provided on_evict callbacks may safely touch
        # the cache again
        self._lock = threading.RLock()

    def _move_to_end(self, key, value):
        """Record recent use of ``key`` by moving it to the end of the dict."""
        try:
            self._cache.move_to_end(key)  # Python 3.2+
        except AttributeError:
            # Python 2: emulate move_to_end by deleting and reinserting
            del self._cache[key]
            self._cache[key] = value

    def __getitem__(self, key):
        with self._lock:
            value = self._cache[key]
            self._move_to_end(key, value)
            return value

    def _enforce_size_limit(self, capacity):
        """Shrink the cache if necessary, evicting the oldest items."""
        while len(self._cache) > capacity:
            key, value = self._cache.popitem(last=False)
            if self._on_evict is not None:
                self._on_evict(key, value)

    def __setitem__(self, key, value):
        with self._lock:
            if key in self._cache:
                # replacing an existing key: reinsert at the end so it
                # becomes the most recently used entry
                del self._cache[key]
                self._cache[key] = value
            elif self._maxsize:
                # make room if necessary
                self._enforce_size_limit(self._maxsize - 1)
                self._cache[key] = value
            elif self._on_evict is not None:
                # zero-capacity cache: not saving, immediately evict
                self._on_evict(key, value)

    def __delitem__(self, key):
        # take the lock for consistency with the other mutating operations
        with self._lock:
            del self._cache[key]

    def __iter__(self):
        # create a list, so accessing the cache during iteration cannot change
        # the iteration order
        return iter(list(self._cache))

    def __len__(self):
        return len(self._cache)

    @property
    def maxsize(self):
        """Maximum number of items that can be held in the cache."""
        return self._maxsize

    @maxsize.setter
    def maxsize(self, size):
        """Resize the cache, evicting the oldest items if necessary."""
        if size < 0:
            raise ValueError('maxsize must be non-negative')
        with self._lock:
            self._enforce_size_limit(size)
            self._maxsize = size
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would prefer not to override the builtin
file
function here. Perhaps we can usefh
or something.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
file
is only a builtin on Python 2... are we still concerned about overriding it?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I didn't know this and I'm happy to hear it. (I can't wait to be done with Python 2)