fix distributed writes #1793
Merged
40 commits
63abe7f  distributed tests that write dask arrays
1952173  Change zarr test to synchronous API (mrocklin)
dd4bfcf  Merge branch 'master' of github.com:pydata/xarray into feature/distri…
5c7f94c  Merge branch 'master' of github.com:pydata/xarray into feature/distri…
9e70a3a  initial go at __setitem__ on array wrappers
ec67a54  fixes for scipy
2a4faa4  Merge branch 'master' of github.com:pydata/xarray into feature/distri…
5497ad1  cleanup after merging with upstream/master
c2f5bb8  needless duplication of tests to work around pytest bug
5344fe8  use netcdf_variable instead of get_array()
7cbd2e5  Merge branch 'master' of github.com:pydata/xarray into feature/distri…
49366bf  use synchronous dask.distributed test harness (mrocklin)
323f17b  Merge branch 'master' into feature/distributed_writes
81f7610  Merge branch 'feature/distributed_writes' of github.com:jhamman/xarra…
199538e  cleanup tests
d2050e7  per scheduler locks and autoclose behavior for writes
76675de  HDF5_LOCK and CombinedLock
9ac0327  integration test for distributed locks
05e7d54  Merge branch 'master' of github.com:pydata/xarray into feature/distri…
1672968  more tests and set isopen to false when pickling
a667615  Fixing style errors. (stickler-ci)
2c0a7e8  ds property on DataStorePickleMixin
6bcadfe  Merge branch 'feature/distributed_writes' of github.com:jhamman/xarra…
aba6bdc  stickler-ci
5702c67  compat fixes for other backends
a06b683  HDF5_USE_FILE_LOCKING = False in test_distributed
efe8308  Merge branch 'master' of github.com:pydata/xarray into feature/distri…
6ef31aa  style fix
00156c3  update tests to only expect netcdf4 to work, docstrings, and some cle…
3dcfac5  Fixing style errors. (stickler-ci)
29edaa7  Merge branch 'master' of github.com:pydata/xarray into feature/distri…
61ee5a8  Merge branch 'feature/distributed_writes' of github.com:jhamman/xarra…
91f3c6a  fix imports
5cb91ba  fix more import bugs
2b97d4f  update docs
2dc514f  fix for pynio
eff0161  cleanup locks and use pytest monkeypatch for environment variable
5290484  Merge branch 'master' of github.com:pydata/xarray into feature/distri…
c855284  fix failing test using combined lock
3c2ffbf  Merge branch 'master' of github.com:pydata/xarray into feature/distri…
xarray/backends/common.py
@@ -2,6 +2,8 @@
 import contextlib
 import logging
+import multiprocessing
+import threading
 import time
 import traceback
 import warnings
@@ -14,11 +16,12 @@
 from ..core.pycompat import dask_array_type, iteritems
 from ..core.utils import FrozenOrderedDict, NdimSizeLenMixin

 # Import default lock
 try:
-    from dask.utils import SerializableLock as Lock
+    from dask.utils import SerializableLock
+    HDF5_LOCK = SerializableLock()
 except ImportError:
-    from threading import Lock
+    HDF5_LOCK = threading.Lock()

 # Create a logger object, but don't add any handlers. Leave that to user code.
 logger = logging.getLogger(__name__)
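The hunk above replaces the old module-level `Lock` import with an `HDF5_LOCK` set up via an import fallback. A minimal standalone sketch of the same pattern (runnable with or without dask installed):

```python
import threading

# Prefer dask's SerializableLock, which survives pickling (needed when
# the lock is shipped to worker processes); otherwise fall back to a
# plain threading.Lock, which only guards access within one process.
try:
    from dask.utils import SerializableLock
    HDF5_LOCK = SerializableLock()
except ImportError:
    HDF5_LOCK = threading.Lock()

with HDF5_LOCK:
    pass  # HDF5 calls that must not run concurrently would go here
```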
@@ -27,8 +30,54 @@
 NONE_VAR_NAME = '__values__'


-# dask.utils.SerializableLock if available, otherwise just a threading.Lock
-GLOBAL_LOCK = Lock()
+def get_scheduler(get=None, collection=None):
+    """Determine the dask scheduler that is being used.
+
+    None is returned if no dask scheduler is active.
+
+    See also
+    --------
+    dask.utils.effective_get
+    """
+    try:
+        from dask.utils import effective_get
+        actual_get = effective_get(get, collection)
+        try:
+            from dask.distributed import Client
+            if isinstance(actual_get.__self__, Client):
+                return 'distributed'
+        except (ImportError, AttributeError):
+            try:
+                import dask.multiprocessing
+                if actual_get == dask.multiprocessing.get:
+                    return 'multiprocessing'
+                else:
+                    return 'threaded'
+            except ImportError:
+                return 'threaded'
+    except ImportError:
+        return None
+
+
+def get_scheduler_lock(scheduler, path_or_file=None):
+    """Get the appropriate lock for a certain situation based on the dask
+    scheduler used.
+
+    See Also
+    --------
+    dask.utils.get_scheduler_lock
+    """
+
+    if scheduler == 'distributed':
+        from dask.distributed import Lock
+        return Lock(path_or_file)
+    elif scheduler == 'multiprocessing':
+        return multiprocessing.Lock()
+    elif scheduler == 'threaded':
+        from dask.utils import SerializableLock
+        return SerializableLock()
+    else:
+        return threading.Lock()


 def _encode_variable_name(name):
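The dispatch in `get_scheduler_lock` above can be sketched without dask. Here `pick_lock` is a hypothetical stand-in, not the xarray API; the 'distributed' branch (which needs `dask.distributed.Lock`) is omitted so the example runs with no dask installed:

```python
import multiprocessing
import threading

def pick_lock(scheduler):
    """Hypothetical, simplified version of the scheduler-to-lock
    dispatch shown in the diff above."""
    if scheduler == 'multiprocessing':
        # a lock that can be shared across worker processes
        return multiprocessing.Lock()
    # 'threaded', None, or anything else gets an in-process lock here
    return threading.Lock()
```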
@@ -77,6 +126,39 @@ def robust_getitem(array, key, catch=Exception, max_retries=6,
             time.sleep(1e-3 * next_delay)


+class CombinedLock(object):
+    """A combination of multiple locks.
+
+    Like a locked door, a CombinedLock is locked if any of its constituent
+    locks are locked.
+    """
+
+    def __init__(self, locks):
+        self.locks = tuple(set(locks))  # remove duplicates
+
+    def acquire(self, *args):
+        return all(lock.acquire(*args) for lock in self.locks)
+
+    def release(self, *args):
+        for lock in self.locks:
+            lock.release(*args)
+
+    def __enter__(self):
+        for lock in self.locks:
+            lock.__enter__()
+
+    def __exit__(self, *args):
+        for lock in self.locks:
+            lock.__exit__(*args)
+
+    @property
+    def locked(self):
+        return any(lock.locked for lock in self.locks)
+
+    def __repr__(self):
+        return "CombinedLock(%r)" % list(self.locks)
+
+
 class BackendArray(NdimSizeLenMixin, indexing.ExplicitlyIndexed):

     def __array__(self, dtype=None):
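The `CombinedLock` added above can be exercised with ordinary threading locks. A minimal sketch of the context-manager portion, assuming nothing beyond the standard library:

```python
import threading

class CombinedLock:
    """Context-manager part of the CombinedLock in the diff above:
    entering it acquires every constituent lock."""

    def __init__(self, locks):
        # Deduplicate: acquiring the same non-reentrant threading.Lock
        # twice inside __enter__ would deadlock.
        self.locks = tuple(set(locks))

    def __enter__(self):
        for lock in self.locks:
            lock.__enter__()
        return self

    def __exit__(self, *args):
        for lock in self.locks:
            lock.__exit__(*args)

a = threading.Lock()
b = threading.Lock()
with CombinedLock([a, b, a]):  # duplicate `a` is acquired only once
    assert a.locked() and b.locked()
assert not a.locked() and not b.locked()
```

The deduplication is what the review thread at the bottom of this page discusses: iterating over a raw `set` gave non-deterministic trouble, so the locks are frozen into a tuple.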
@@ -85,7 +167,9 @@ def __array__(self, dtype=None):


 class AbstractDataStore(Mapping):
-    _autoclose = False
+    _autoclose = None
+    _ds = None
+    _isopen = False

     def __iter__(self):
         return iter(self.variables)
@@ -168,7 +252,7 @@ def __exit__(self, exception_type, exception_value, traceback):


 class ArrayWriter(object):
-    def __init__(self, lock=GLOBAL_LOCK):
+    def __init__(self, lock=HDF5_LOCK):
         self.sources = []
         self.targets = []
         self.lock = lock
@@ -178,11 +262,7 @@ def add(self, source, target):
             self.sources.append(source)
             self.targets.append(target)
         else:
-            try:
-                target[...] = source
-            except TypeError:
-                # workaround for GH: scipy/scipy#6880
-                target[:] = source
+            target[...] = source

     def sync(self):
         if self.sources:
@@ -193,9 +273,9 @@ def sync(self):


 class AbstractWritableDataStore(AbstractDataStore):
-    def __init__(self, writer=None):
+    def __init__(self, writer=None, lock=HDF5_LOCK):
         if writer is None:
-            writer = ArrayWriter()
+            writer = ArrayWriter(lock=lock)
         self.writer = writer

     def encode(self, variables, attributes):
@@ -239,6 +319,9 @@ def set_variable(self, k, v):  # pragma: no cover
         raise NotImplementedError

     def sync(self):
+        if self._isopen and self._autoclose:
+            # datastore will be reopened during write
+            self.close()
         self.writer.sync()

     def store_dataset(self, dataset):
@@ -373,27 +456,41 @@ class DataStorePickleMixin(object):

     def __getstate__(self):
         state = self.__dict__.copy()
-        del state['ds']
+        del state['_ds']
+        del state['_isopen']
         if self._mode == 'w':
             # file has already been created, don't override when restoring
             state['_mode'] = 'a'
         return state

     def __setstate__(self, state):
         self.__dict__.update(state)
-        self.ds = self._opener(mode=self._mode)
+        self._ds = None
+        self._isopen = False
+
+    @property
+    def ds(self):
+        if self._ds is not None and self._isopen:
+            return self._ds
+        ds = self._opener(mode=self._mode)
+        self._isopen = True
+        return ds

     @contextlib.contextmanager
-    def ensure_open(self, autoclose):
+    def ensure_open(self, autoclose=None):
         """
         Helper function to make sure datasets are closed and opened
         at appropriate times to avoid too many open file errors.

         Use requires `autoclose=True` argument to `open_mfdataset`.
         """
-        if self._autoclose and not self._isopen:
+
+        if autoclose is None:
+            autoclose = self._autoclose
+
+        if not self._isopen:
             try:
-                self.ds = self._opener()
+                self._ds = self._opener()
+                self._isopen = True
                 yield
             finally:

Review comment on `autoclose = self._autoclose`: This could probably use some additional thinking.
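The `DataStorePickleMixin` changes above make a datastore survive pickling by dropping the live file handle in `__getstate__` and reopening it lazily through a `ds` property. A self-contained sketch of that pattern with a plain file; all names here are illustrative, not the xarray API:

```python
import pickle
import tempfile

class PickleSafeStore:
    """Hypothetical sketch of the lazy-reopen pickling pattern above."""

    def __init__(self, path, mode='r'):
        self._path = path
        self._mode = mode
        self._ds = None
        self._isopen = False

    def _opener(self, mode):
        return open(self._path, mode)

    @property
    def ds(self):
        # Return the cached handle if it is open; otherwise (re)open.
        if self._ds is not None and self._isopen:
            return self._ds
        self._ds = self._opener(self._mode)
        self._isopen = True
        return self._ds

    def __getstate__(self):
        state = self.__dict__.copy()
        # open file objects cannot be pickled; recreate them on demand
        del state['_ds']
        del state['_isopen']
        return state

    def __setstate__(self, state):
        self.__dict__.update(state)
        self._ds = None
        self._isopen = False

with tempfile.NamedTemporaryFile('w', suffix='.txt', delete=False) as f:
    f.write('hello')
    path = f.name

store = PickleSafeStore(path)
assert store.ds.read() == 'hello'           # handle opened on first access
clone = pickle.loads(pickle.dumps(store))   # pickles even while open
assert clone.ds.read() == 'hello'           # reopened lazily after unpickle
```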
Review comments:

previous test failures were having trouble in __enter__ when iterating over a set of locks. Casting to list/tuple seems to have resolved that.

Huh. I wonder if non-deterministic ordering of set iteration (e.g., after serialization/deserialization) contributed to that.