fix distributed writes #1793
Changes to the first file in this diff:
@@ -12,7 +12,8 @@
 from ..core.combine import auto_combine
 from ..core.pycompat import basestring, path_type
 from ..core.utils import close_on_error, is_remote_uri
-from .common import GLOBAL_LOCK, ArrayWriter
+from .common import (
+    HDF5_LOCK, ArrayWriter, CombinedLock, get_scheduler, get_scheduler_lock)

 DATAARRAY_NAME = '__xarray_dataarray_name__'
 DATAARRAY_VARIABLE = '__xarray_dataarray_variable__'
@@ -64,9 +65,9 @@ def _default_lock(filename, engine):
             else:
                 # TODO: identify netcdf3 files and don't use the global lock
                 # for them
-                lock = GLOBAL_LOCK
+                lock = HDF5_LOCK
         elif engine in {'h5netcdf', 'pynio'}:
-            lock = GLOBAL_LOCK
+            lock = HDF5_LOCK
         else:
             lock = False
     return lock
@@ -129,6 +130,32 @@ def _protect_dataset_variables_inplace(dataset, cache):
         variable.data = data


+def _get_lock(engine, scheduler, format, path_or_file):
+    """ Get the lock(s) that apply to a particular scheduler/engine/format"""
+
+    locks = []
+    SchedulerLock = get_scheduler_lock(scheduler)
+    if format in ['NETCDF4', None] and engine in ['h5netcdf', 'netcdf4']:
+        locks.append(HDF5_LOCK)
+
+    try:
+        # per file lock
+        # Dask locks take a name argument (e.g. filename)
+        locks.append(SchedulerLock(path_or_file))
+    except TypeError:

[review comment] It would be less error prone to pass the name to ...

+        # threading/multiprocessing lock
+        locks.append(SchedulerLock())
+
+    # When we have more than one lock, use the CombinedLock wrapper class
+    lock = CombinedLock(locks) if len(locks) > 1 else locks[0]
+
+    # Question: Should we be dropping one of these two locks when they are
+    # basically the same? For instance, when using netcdf4 and dask is not
+    # installed, locks will be [threading.Lock(), threading.Lock()]

[review comment] I think this is harmless, as long as these are different lock instances. On the other hand, something like ...

+
+    return lock
+
+
 def open_dataset(filename_or_obj, group=None, decode_cf=True,
                  mask_and_scale=True, decode_times=True, autoclose=False,
                  concat_characters=True, decode_coords=True, engine=None,
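A hedged illustration (not part of the diff) of the fallback trick `_get_lock` uses above: lock classes that accept a name, such as dask's distributed lock, get a per-file lock, while plain threading/multiprocessing lock factories raise `TypeError` and are constructed without arguments instead. The helper name `make_named_lock` is hypothetical.

```python
import threading


def make_named_lock(lock_class, name):
    """Hypothetical stand-alone version of the per-file-lock fallback."""
    try:
        # per-file lock; dask locks take a name argument (e.g. the filename)
        return lock_class(name)
    except TypeError:
        # threading/multiprocessing locks take no constructor arguments
        return lock_class()


# threading.Lock() accepts no arguments, so this falls back to a plain lock
print(make_named_lock(threading.Lock, 'out.nc'))
```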
@@ -620,8 +647,20 @@ def to_netcdf(dataset, path_or_file=None, mode='w', format=None, group=None,
     # if a writer is provided, store asynchronously
     sync = writer is None

+    # handle scheduler specific logic
+    scheduler = get_scheduler()
+    if (dataset.chunks and scheduler in ['distributed', 'multiprocessing'] and
+            engine != 'netcdf4'):
+        raise NotImplementedError("Writing netCDF files with the %s backend "
+                                  "is not currently supported with dask's %s "
+                                  "scheduler" % (engine, scheduler))
+    lock = _get_lock(engine, scheduler, format, path_or_file)
+    autoclose = (dataset.chunks and
+                 scheduler in ['distributed', 'multiprocessing'])
+
     target = path_or_file if path_or_file is not None else BytesIO()
-    store = store_open(target, mode, format, group, writer)
+    store = store_open(target, mode, format, group, writer,
+                       autoclose=autoclose, lock=lock)

     if unlimited_dims is None:
         unlimited_dims = dataset.encoding.get('unlimited_dims', None)
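A hedged usage sketch (not from the diff) of the case this hunk targets: writing a chunked dataset while a dask.distributed scheduler is active. It assumes this branch of xarray plus dask, distributed and netCDF4 are installed; `example.nc` is a hypothetical output path.

```python
import numpy as np
import xarray as xr
from dask.distributed import Client

if __name__ == '__main__':
    client = Client()  # the local distributed scheduler becomes the default

    ds = xr.Dataset({'t': (('x',), np.arange(1000))}).chunk({'x': 100})

    # Per the check above, only engine='netcdf4' is accepted for writes under
    # the distributed or multiprocessing schedulers.
    ds.to_netcdf('example.nc', engine='netcdf4')
```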
Changes to the second file in this diff:
@@ -2,6 +2,8 @@

 import contextlib
+import logging
+import multiprocessing
 import threading
 import time
 import traceback
 import warnings
@@ -14,11 +16,12 @@
 from ..core.pycompat import dask_array_type, iteritems
 from ..core.utils import FrozenOrderedDict, NdimSizeLenMixin

+# Import default lock
 try:
-    from dask.utils import SerializableLock as Lock
+    from dask.utils import SerializableLock
+    HDF5_LOCK = SerializableLock()
 except ImportError:
-    from threading import Lock
+    HDF5_LOCK = threading.Lock()

+# Create a logger object, but don't add any handlers. Leave that to user code.
+logger = logging.getLogger(__name__)
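A hedged illustration of why the default HDF5 lock prefers dask's `SerializableLock` when dask is installed: unlike `threading.Lock`, it survives pickling, which matters once task graphs are shipped to other processes.

```python
import pickle
import threading

try:
    from dask.utils import SerializableLock
    pickle.dumps(SerializableLock())      # fine: designed to be pickled
except ImportError:
    pass

try:
    pickle.dumps(threading.Lock())        # TypeError: thread locks are not picklable
except TypeError as err:
    print('threading.Lock cannot be pickled:', err)
```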
@@ -27,8 +30,54 @@
 NONE_VAR_NAME = '__values__'


-# dask.utils.SerializableLock if available, otherwise just a threading.Lock
-GLOBAL_LOCK = Lock()
+def get_scheduler(get=None, collection=None):
+    """ Determine the dask scheduler that is being used.
+
+    None is returned if no dask scheduler is active.
+
+    See also
+    --------
+    dask.utils.effective_get
+    """
+    try:
+        from dask.utils import effective_get
+        actual_get = effective_get(get, collection)
+        try:
+            from dask.distributed import Client
+            if isinstance(actual_get.__self__, Client):
+                return 'distributed'
+        except (ImportError, AttributeError):
+            try:
+                import dask.multiprocessing
+                if actual_get == dask.multiprocessing.get:
+                    return 'multiprocessing'
+                else:
+                    return 'threaded'
+            except ImportError:
+                return 'threaded'
+    except ImportError:
+        return None
+
+
+def get_scheduler_lock(scheduler):
+    """ Get the appropriate lock for a certain situation based on the dask
+    scheduler used.
+
+    See Also
+    --------
+    dask.utils.get_scheduler_lock
+    """
+
+    if scheduler == 'distributed':
+        from dask.distributed import Lock
+        return Lock
+    elif scheduler == 'multiprocessing':
+        return multiprocessing.Lock
+    elif scheduler == 'threaded':
+        from dask.utils import SerializableLock
+        return SerializableLock
+    else:
+        return threading.Lock
+
+
 def _encode_variable_name(name):
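A hedged usage sketch of the two helpers above. The import path `xarray.backends.common` is an assumption based on the relative imports in this diff; adjust it if the helpers live elsewhere.

```python
from dask.distributed import Client
from xarray.backends.common import get_scheduler, get_scheduler_lock

client = Client(processes=False)        # registers itself as the default get
assert get_scheduler() == 'distributed'

lock_class = get_scheduler_lock('distributed')
lock = lock_class('example.nc')         # dask.distributed.Lock takes a name
with lock:
    pass                                # cluster-wide mutual exclusion on that name
```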
@@ -77,6 +126,39 @@ def robust_getitem(array, key, catch=Exception, max_retries=6,
             time.sleep(1e-3 * next_delay)


+class CombinedLock(object):
+    """A combination of multiple locks.
+
+    Like a locked door, a CombinedLock is locked if any of its constituent
+    locks are locked.
+    """
+
+    def __init__(self, locks):
+        self.locks = locks
+
+    def acquire(self, *args):
+        return all(lock.acquire(*args) for lock in self.locks)
+
+    def release(self, *args):
+        for lock in self.locks:
+            lock.release(*args)
+
+    def __enter__(self):
+        for lock in self.locks:
+            lock.__enter__()
+
+    def __exit__(self, *args):
+        for lock in self.locks:
+            lock.__exit__(*args)
+
+    @property
+    def locked(self):
+        return any(lock.locked for lock in self.locks)
+
+    def __repr__(self):
+        return "CombinedLock(%s)" % [repr(lock) for lock in self.locks]

[review comment] Nit: I think you could equivalently substitute ...

+
+
 class BackendArray(NdimSizeLenMixin, indexing.ExplicitlyIndexed):

     def __array__(self, dtype=None):
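A hedged, self-contained demonstration of the CombinedLock semantics added above, using two plain threading locks (the import path is the same assumption as before).

```python
from threading import Lock

from xarray.backends.common import CombinedLock

a, b = Lock(), Lock()
combined = CombinedLock([a, b])

with combined:                       # enters a and b in order
    assert a.locked() and b.locked()
assert not a.locked() and not b.locked()   # both released on exit
```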
@@ -85,7 +167,9 @@ def __array__(self, dtype=None):


 class AbstractDataStore(Mapping):
-    _autoclose = False
+    _autoclose = None
+    _ds = None
+    _isopen = False

     def __iter__(self):
         return iter(self.variables)
@@ -168,7 +252,7 @@ def __exit__(self, exception_type, exception_value, traceback):


 class ArrayWriter(object):
-    def __init__(self, lock=GLOBAL_LOCK):
+    def __init__(self, lock=HDF5_LOCK):
         self.sources = []
         self.targets = []
         self.lock = lock
@@ -178,11 +262,7 @@ def add(self, source, target):
             self.sources.append(source)
             self.targets.append(target)
         else:
-            try:
-                target[...] = source
-            except TypeError:
-                # workaround for GH: scipy/scipy#6880
-                target[:] = source
+            target[...] = source

     def sync(self):
         if self.sources:
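A hedged sketch of the pattern the writer's sync step is expected to build on: `dask.array.store` accepts a lock that is held around each write into the target, which is presumably how the HDF5 and per-file locks above end up serialising the actual writes.

```python
import numpy as np
import dask.array as da
from threading import Lock

source = da.arange(100, chunks=10)
target = np.empty(100, dtype=source.dtype)

# Each chunk write into `target` is performed while holding the lock.
da.store([source], [target], lock=Lock())
print(target[:5])
```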
@@ -193,9 +273,9 @@ def sync(self):


 class AbstractWritableDataStore(AbstractDataStore):
-    def __init__(self, writer=None):
+    def __init__(self, writer=None, lock=HDF5_LOCK):
         if writer is None:
-            writer = ArrayWriter()
+            writer = ArrayWriter(lock=lock)
         self.writer = writer

     def encode(self, variables, attributes):
@@ -239,6 +319,9 @@ def set_variable(self, k, v):  # pragma: no cover
         raise NotImplementedError

     def sync(self):
+        if self._isopen and self._autoclose:
+            # datastore will be reopened during write
+            self.close()
         self.writer.sync()

     def store_dataset(self, dataset):
@@ -373,27 +456,41 @@ class DataStorePickleMixin(object):

     def __getstate__(self):
         state = self.__dict__.copy()
-        del state['ds']
+        del state['_ds']
+        del state['_isopen']
+        if self._mode == 'w':
+            # file has already been created, don't override when restoring
+            state['_mode'] = 'a'
         return state

     def __setstate__(self, state):
         self.__dict__.update(state)
-        self.ds = self._opener(mode=self._mode)
+        self._ds = None
+        self._isopen = False
+
+    @property
+    def ds(self):
+        if self._ds is not None and self._isopen:
+            return self._ds
+        ds = self._opener(mode=self._mode)
+        self._isopen = True
+        return ds

     @contextlib.contextmanager
-    def ensure_open(self, autoclose):
+    def ensure_open(self, autoclose=None):
         """
         Helper function to make sure datasets are closed and opened
         at appropriate times to avoid too many open file errors.

         Use requires `autoclose=True` argument to `open_mfdataset`.
         """
-        if self._autoclose and not self._isopen:
+
+        if autoclose is None:
+            autoclose = self._autoclose

[review comment] This could probably use some additional thinking.

+
+        if not self._isopen:
             try:
-                self.ds = self._opener()
+                self._ds = self._opener()
+                self._isopen = True
                 yield
             finally:
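A hedged, self-contained sketch of the open-on-demand pattern the DataStorePickleMixin changes above implement: the live file handle is dropped when pickling and reopened lazily from a stored opener. `LazyFile` and `toy_opener` are hypothetical stand-ins, not xarray classes.

```python
import pickle


def toy_opener(mode='r'):
    # stand-in for the real file-opening callable
    return {'mode': mode}


class LazyFile(object):
    def __init__(self, opener, mode='r'):
        self._opener = opener
        self._mode = mode
        self._ds = None
        self._isopen = False

    def __getstate__(self):
        state = self.__dict__.copy()
        del state['_ds']          # never ship a live handle
        del state['_isopen']
        return state

    def __setstate__(self, state):
        self.__dict__.update(state)
        self._ds = None           # reopen on first access instead
        self._isopen = False

    @property
    def ds(self):
        if self._ds is None or not self._isopen:
            self._ds = self._opener(mode=self._mode)
            self._isopen = True
        return self._ds


f = LazyFile(toy_opener)
print(f.ds)                               # opened lazily on first access
clone = pickle.loads(pickle.dumps(f))     # handle dropped, then restored lazily
print(clone.ds)
```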
[review comment] Should this be "to xarray datastores"?

[review comment] Nevermind, I think it makes sense as is. Maybe "Support for writing xarray datasets to netCDF files..."

[review comment] It's nice to see you were able to get this to work with SciPy!