Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: Concurrent block reads and writes #534

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -74,3 +74,5 @@ zarr/version.py
#doesnotexist
#test_sync*
data/*

venv/*
15 changes: 15 additions & 0 deletions docs/api/concurrency.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
Concurrency
===========

Zarr supports concurrent reads and writes to distinct blocks through the use of an `Executor <https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.Executor>`__ object.
The read and write routines of a :class:`zarr.core.Array` accept an optional ``executor`` keyword argument which controls how or if concurrent execution should be performed.
By default, or if ``executor=None``, all blocks will be read and written serially.

.. warning::

Not all executors can be used with all stores safely.
For example, a ``ThreadPoolExecutor`` may only be used if the underlying store is in fact thread safe.

For stores where the data is already in memory or can be read very quickly, serial execution will likely be the fastest type of execution.
A concurrent executor is particularly useful when there is a high IO cost to retrieving a block, for example, with a store that reads data from some cloud object storage like Amazon S3.
In the case of some cloud object storage, a concurrent executor allows the Zarr to submit all of the web requests at once, instead of executing many web requests serially.
131 changes: 101 additions & 30 deletions zarr/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -571,7 +571,7 @@ def __getitem__(self, selection):
fields, selection = pop_fields(selection)
return self.get_basic_selection(selection, fields=fields)

def get_basic_selection(self, selection=Ellipsis, out=None, fields=None):
def get_basic_selection(self, selection=Ellipsis, out=None, fields=None, executor=None):
"""Retrieve data for an item or region of the array.

Parameters
Expand All @@ -584,6 +584,9 @@ def get_basic_selection(self, selection=Ellipsis, out=None, fields=None):
fields : str or sequence of str, optional
For arrays with a structured dtype, one or more fields can be specified to
extract data for.
executor : concurrent.futures.Executor or None, optional
An executor for submitting tasks to run concurrently. If not
provided, work will be executed serially.

Returns
-------
Expand Down Expand Up @@ -695,7 +698,7 @@ def get_basic_selection(self, selection=Ellipsis, out=None, fields=None):
fields=fields)
else:
return self._get_basic_selection_nd(selection=selection, out=out,
fields=fields)
fields=fields, executor=executor)

def _get_basic_selection_zd(self, selection, out=None, fields=None):
# special case basic selection for zero-dimensional array
Expand Down Expand Up @@ -731,15 +734,20 @@ def _get_basic_selection_zd(self, selection, out=None, fields=None):

return out

def _get_basic_selection_nd(self, selection, out=None, fields=None):
def _get_basic_selection_nd(self, selection, out=None, fields=None, executor=None):
# implementation of basic selection for array with at least one dimension

# setup indexer
indexer = BasicIndexer(selection, self)

return self._get_selection(indexer=indexer, out=out, fields=fields)
return self._get_selection(
indexer=indexer,
out=out,
fields=fields,
executor=executor,
)

def get_orthogonal_selection(self, selection, out=None, fields=None):
def get_orthogonal_selection(self, selection, out=None, fields=None, executor=None):
"""Retrieve data by making a selection for each dimension of the array. For
example, if an array has 2 dimensions, allows selecting specific rows and/or
columns. The selection for each dimension can be either an integer (indexing a
Expand All @@ -756,6 +764,9 @@ def get_orthogonal_selection(self, selection, out=None, fields=None):
fields : str or sequence of str, optional
For arrays with a structured dtype, one or more fields can be specified to
extract data for.
executor : concurrent.futures.Executor or None, optional
An executor for submitting tasks to run concurrently. If not
provided, work will be executed serially.

Returns
-------
Expand Down Expand Up @@ -848,9 +859,14 @@ def get_orthogonal_selection(self, selection, out=None, fields=None):
# setup indexer
indexer = OrthogonalIndexer(selection, self)

return self._get_selection(indexer=indexer, out=out, fields=fields)
return self._get_selection(
indexer=indexer,
out=out,
fields=fields,
executor=executor,
)

def get_coordinate_selection(self, selection, out=None, fields=None):
def get_coordinate_selection(self, selection, out=None, fields=None, executor=None):
"""Retrieve a selection of individual items, by providing the indices
(coordinates) for each selected item.

Expand All @@ -863,6 +879,9 @@ def get_coordinate_selection(self, selection, out=None, fields=None):
fields : str or sequence of str, optional
For arrays with a structured dtype, one or more fields can be specified to
extract data for.
executor : concurrent.futures.Executor or None, optional
An executor for submitting tasks to run concurrently. If not
provided, work will be executed serially.

Returns
-------
Expand Down Expand Up @@ -923,14 +942,19 @@ def get_coordinate_selection(self, selection, out=None, fields=None):
if out is not None:
out = out.reshape(-1)

out = self._get_selection(indexer=indexer, out=out, fields=fields)
out = self._get_selection(
indexer=indexer,
out=out,
fields=fields,
executor=executor,
)

# restore shape
out = out.reshape(indexer.sel_shape)

return out

def get_mask_selection(self, selection, out=None, fields=None):
def get_mask_selection(self, selection, out=None, fields=None, executor=None):
"""Retrieve a selection of individual items, by providing a Boolean array of the
same shape as the array against which the selection is being made, where True
values indicate a selected item.
Expand All @@ -945,6 +969,9 @@ def get_mask_selection(self, selection, out=None, fields=None):
fields : str or sequence of str, optional
For arrays with a structured dtype, one or more fields can be specified to
extract data for.
executor : concurrent.futures.Executor or None, optional
An executor for submitting tasks to run concurrently. If not
provided, work will be executed serially.

Returns
-------
Expand Down Expand Up @@ -997,9 +1024,14 @@ def get_mask_selection(self, selection, out=None, fields=None):
# setup indexer
indexer = MaskIndexer(selection, self)

return self._get_selection(indexer=indexer, out=out, fields=fields)
return self._get_selection(
indexer=indexer,
out=out,
fields=fields,
executor=executor,
)

def _get_selection(self, indexer, out=None, fields=None):
def _get_selection(self, indexer, out=None, fields=None, executor=None):

# We iterate over all chunks which overlap the selection and thus contain data
# that needs to be extracted. Each chunk is processed in turn, extracting the
Expand All @@ -1020,12 +1052,25 @@ def _get_selection(self, indexer, out=None, fields=None):
else:
check_array_shape('out', out, out_shape)

# iterate over chunks
for chunk_coords, chunk_selection, out_selection in indexer:
def f(item):
chunk_coords, chunk_selection, out_selection = item
return self._chunk_getitem(
chunk_coords,
chunk_selection,
out,
out_selection,
drop_axes=indexer.drop_axes,
fields=fields,
)

if executor is None:
map_ = map
else:
map_ = executor.map

# load chunk selection into output array
self._chunk_getitem(chunk_coords, chunk_selection, out, out_selection,
drop_axes=indexer.drop_axes, fields=fields)
# iterate over chunks
for _ in map_(f, indexer):
pass

if out.shape:
return out
Expand Down Expand Up @@ -1114,7 +1159,7 @@ def __setitem__(self, selection, value):
fields, selection = pop_fields(selection)
self.set_basic_selection(selection, value, fields=fields)

def set_basic_selection(self, selection, value, fields=None):
def set_basic_selection(self, selection, value, fields=None, executor=None):
"""Modify data for an item or region of the array.

Parameters
Expand All @@ -1127,6 +1172,9 @@ def set_basic_selection(self, selection, value, fields=None):
fields : str or sequence of str, optional
For arrays with a structured dtype, one or more fields can be specified to set
data for.
executor : concurrent.futures.Executor or None, optional
An executor for submitting tasks to run concurrently. If not
provided, work will be executed serially.

Examples
--------
Expand Down Expand Up @@ -1207,9 +1255,14 @@ def set_basic_selection(self, selection, value, fields=None):
if self._shape == ():
return self._set_basic_selection_zd(selection, value, fields=fields)
else:
return self._set_basic_selection_nd(selection, value, fields=fields)

def set_orthogonal_selection(self, selection, value, fields=None):
return self._set_basic_selection_nd(
selection,
value,
fields=fields,
executor=executor,
)

def set_orthogonal_selection(self, selection, value, fields=None, executor=None):
"""Modify data via a selection for each dimension of the array.

Parameters
Expand All @@ -1222,6 +1275,9 @@ def set_orthogonal_selection(self, selection, value, fields=None):
fields : str or sequence of str, optional
For arrays with a structured dtype, one or more fields can be specified to set
data for.
executor : concurrent.futures.Executor or None, optional
An executor for submitting tasks to run concurrently. If not
provided, work will be executed serially.

Examples
--------
Expand Down Expand Up @@ -1297,9 +1353,9 @@ def set_orthogonal_selection(self, selection, value, fields=None):
# setup indexer
indexer = OrthogonalIndexer(selection, self)

self._set_selection(indexer, value, fields=fields)
self._set_selection(indexer, value, fields=fields, executor=executor)

def set_coordinate_selection(self, selection, value, fields=None):
def set_coordinate_selection(self, selection, value, fields=None, executor=None):
"""Modify a selection of individual items, by providing the indices (coordinates)
for each item to be modified.

Expand All @@ -1312,6 +1368,9 @@ def set_coordinate_selection(self, selection, value, fields=None):
fields : str or sequence of str, optional
For arrays with a structured dtype, one or more fields can be specified to set
data for.
executor : concurrent.futures.Executor or None, optional
An executor for submitting tasks to run concurrently. If not
provided, work will be executed serially.

Examples
--------
Expand Down Expand Up @@ -1375,9 +1434,9 @@ def set_coordinate_selection(self, selection, value, fields=None):
if hasattr(value, 'shape') and len(value.shape) > 1:
value = value.reshape(-1)

self._set_selection(indexer, value, fields=fields)
self._set_selection(indexer, value, fields=fields, executor=executor)

def set_mask_selection(self, selection, value, fields=None):
def set_mask_selection(self, selection, value, fields=None, executor=None):
"""Modify a selection of individual items, by providing a Boolean array of the
same shape as the array against which the selection is being made, where True
values indicate a selected item.
Expand All @@ -1392,6 +1451,9 @@ def set_mask_selection(self, selection, value, fields=None):
fields : str or sequence of str, optional
For arrays with a structured dtype, one or more fields can be specified to set
data for.
executor : concurrent.futures.Executor or None, optional
An executor for submitting tasks to run concurrently. If not
provided, work will be executed serially.

Examples
--------
Expand Down Expand Up @@ -1450,7 +1512,7 @@ def set_mask_selection(self, selection, value, fields=None):
# setup indexer
indexer = MaskIndexer(selection, self)

self._set_selection(indexer, value, fields=fields)
self._set_selection(indexer, value, fields=fields, executor=executor)

def _set_basic_selection_zd(self, selection, value, fields=None):
# special case __setitem__ for zero-dimensional array
Expand Down Expand Up @@ -1492,15 +1554,15 @@ def _set_basic_selection_zd(self, selection, value, fields=None):
cdata = self._encode_chunk(chunk)
self.chunk_store[ckey] = cdata

def _set_basic_selection_nd(self, selection, value, fields=None):
def _set_basic_selection_nd(self, selection, value, fields=None, executor=None):
# implementation of __setitem__ for array with at least one dimension

# setup indexer
indexer = BasicIndexer(selection, self)

self._set_selection(indexer, value, fields=fields)
self._set_selection(indexer, value, fields=fields, executor=executor)

def _set_selection(self, indexer, value, fields=None):
def _set_selection(self, indexer, value, fields=None, executor=None):

# We iterate over all chunks which overlap the selection and thus contain data
# that needs to be replaced. Each chunk is processed in turn, extracting the
Expand Down Expand Up @@ -1528,8 +1590,8 @@ def _set_selection(self, indexer, value, fields=None):
value = np.asanyarray(value)
check_array_shape('value', value, sel_shape)

# iterate over chunks in range
for chunk_coords, chunk_selection, out_selection in indexer:
def f(item):
chunk_coords, chunk_selection, out_selection = item

# extract data to store
if sel_shape == ():
Expand All @@ -1549,6 +1611,15 @@ def _set_selection(self, indexer, value, fields=None):
# put data
self._chunk_setitem(chunk_coords, chunk_selection, chunk_value, fields=fields)

if executor is None:
map_ = map
else:
map_ = executor.map

# iterate over chunks in range
for _ in map_(f, indexer):
pass

def _chunk_getitem(self, chunk_coords, chunk_selection, out, out_selection,
drop_axes=None, fields=None):
"""Obtain part or whole of a chunk.
Expand Down