Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enhancement: Chunk level access api / indexing by chunk rather than voxel #518

Closed
David-Baddeley opened this issue Nov 19, 2019 · 27 comments · Fixed by #1428
Closed

Enhancement: Chunk level access api / indexing by chunk rather than voxel #518

David-Baddeley opened this issue Nov 19, 2019 · 27 comments · Fixed by #1428

Comments

@David-Baddeley
Copy link
Contributor

I may be missing something, but zarr currently seems to offers a high level interface to the chunked data that appears to be largely transparent to the fact that the underlying data is in fact chunked (much like HDF5). I've got a few use cases where I'd like to be able to directly operate on the underlying chunks rather than the whole large dataset.

My target application is multi-dimensional image processing in large 3D biomedical datasets which there are many situations where it would make sense to perform operations on individual chunks in parallel.

Whilst it would be possible to a) read the chunk size of an array and b) work out how to slice the dataset in multiples of that chunk size, a direct chunk level access might be easier and more efficient. In essence I'm suggesting something which exposes a simplified version of Array._chunk_getitem and Array._chunk_setitem which would only ever get or set a complete chunk.

If you extended this concept somewhat by having the functions return a lightweightChunk object which was essentially a reference to the location of the array, a chunk ID, and a .data property which gave you the data for that chunk and additionally exposed an iterator in the Array class , you could conceivably write code like:

def do_something(chunk):
    res = some_processing_function(chunk.data)
    with zarr.open('output_file_uri') as z1:
        z1.save_chunk(res, chunk.chunk_id)

with multiprocessing.Pool() as pool:
    pool.map(do_something, array.chunk_iterator)

In the longer term (and I'm not sure how to go about this - I might be better aiming for API compatibility with zarr rather than inclusion in zarr) I'd want to enable data-local processing of individual chunks of an array which was chunked across a distributed file system. I've currently got a python solution for this for 2.5 dimensional data (xyt) but it's pretty specific to one use case and I would like to avoid duplicating other efforts as we make it more general.

@joshmoore
Copy link
Member

I could certainly imagine using such an API in order to get optimal loading.

@rabernat
Copy link
Contributor

in situations where it would make sense to perform operations on individual chunks in parallel.

In python, we currently use dask for these use cases. (Zarr was created with Dask interoperability in mind.) Dask can discover the chunk structure of zarr arrays (https://docs.dask.org/en/latest/array-api.html#dask.array.from_zarr) and map them to the chunks of a dask array.

@joshmoore
Copy link
Member

for reference: https://github.com/dask/dask/blob/fb4e91e015caa570ae6b578a12dc494f85b2e7f7/dask/array/core.py#L2774

So you're suggesting, @rabernat, anyone that wants this functionality would use dask?

Reversely, there's no general "protocol" that could expose such an interface both for dask's benefit as well as a more mundane end-user?

@alimanfoo
Copy link
Member

I think what @rabernat is probably getting at is that if you want to do a chunk-wise computation over a zarr array, and you want to parallelise some part of that computation, it's very easy to accomplish with dask. To give a totally trivial example, you have have some zarr array z, then you can do a parallel chunk-wise computation of the sum by doing:

import dask.array as da
da.from_array(z).sum().compute()

Dask will figure out where the chunk boundaries are and split up the computation accordingly. And you can switch that out from running using multiple cores on your local computer to running on multiple compute nodes in a HPC or kubernetes cluster with just a few lines of configuration code.

But not everyone will want to use dask of course, and so I can see the value in having a convenient public API on the zarr side to retrieve or replace a whole chunk.

@David-Baddeley
Copy link
Contributor Author

Looking at the code referenced by @joshmoore it appears as though dask simply slices the underlying zarr array. This is likely to entail some overhead (hard to know how much without going through the code in detail, but I suspect there is going to be at least an array allocation and a memcopy, if not substantially more), and will force the data to flow through the controlling node. Something which exposed chunks on a lower level could result in more efficiency (also for dask).

I should probably give a bit more background to my question. Over the last few years we have developed quite a bit of python infrastructure for distributed analysis of microscopy data (documentation is currently very poor, but there is a small amount of high level stuff in the supplement of https://www.biorxiv.org/content/10.1101/606954v2). What we have includes distributed, chunked, storage, fast compression, and a scheduler which facilitates data-local analysis of these chunks. When compared with, e.g. hadoop, dask, spark, etc we're probably faster for our specific task, but much more application specific (we can sustain a task rate of > 1000 tasks/s and a data-rate before compression of ~800MB/s). This has largely grown up in parallel to things like dask and zarr. At the moment we do really well on 2D +time (our chunks are generally a single camera frame), and also have some pretty simple (but reasonably fast - we're writing ~500 tiles/s) 2D tile pyramid code for slide-scanning. We're now looking to move into large 3D volumetric datasets, but don't really want to reinvent the wheel. It could be very attractive to use and potentially contribute to zarr with a new storage backend as we move to 3D, or at the very least maintain api-compatibility. Pretty critical to us in the long term, however, is being able to access and analyse the data where it sits across distributed storage, rather than having to pipe it through a central location. In the medium term this would likely also require adding something like a .locate() functionality to the zar data store which let you identify which node it was on, although this could easily live as a helper function in our scheduler - e.g. locate(data_store, key).

@alimanfoo
Copy link
Member

alimanfoo commented Nov 21, 2019 via email

@joshmoore
Copy link
Member

@David-Baddeley : I'll look into your pre-print, but as a follow-on to this discussion you might also be interested in tracking https://forum.image.sc/t/next-generation-file-formats-for-bioimaging/31361 All the best, ~Josh

@d-v-b
Copy link
Contributor

d-v-b commented Nov 26, 2019

@David-Baddeley I highly recommend taking a careful look at dask. For me, it's made processing enormous microscopy datasets much much easier. You can see some applied examples here and here

For your specific issue (accessing individual chunks of a zarr array): The map_blocks method of a dask array offers distributed access to chunks for the purposes of applying a function that returns a numpy array (e.g., filtering or transforming data). For more generic access to array chunks, you can use the blocks property of a dask array, e.g. this example. Even more generically, you can explicitly slice a dask array along chunk boundaries to create a collection of independent sub-arrays that can be processed in parallel with generic tools in the dask library, e.g. dask.delayed or dask.bag.

I hope this is helpful!

@jakirkham
Copy link
Member

I think @joshmoore’s point is people may want to solve this in other languages (like Java and C/C++ to name a couple). A proper API for this kind of functionality would be a good answer for them (as opposed to use this Python library 😉). That said, I completely agree that in Python Dask eliminates this need. 😀

Would expect even in our current (Python) implementation we have something like this already. So it is just a question of exposing that to the user. Seems reasonable to me at least.

Hopefully this helps bridge the gap between these two perspectives. 🙂

@d-v-b
Copy link
Contributor

d-v-b commented Nov 26, 2019

Would expect even in our current (Python) implementation we have something like this already. So it is just a question of exposing that to the user. Seems reasonable to me at least.

Sounds like a good idea. How would this work in practice? An iterator that yields (slice, thingy) where thingy is a lazy reference to the sub-array bounded by slice?

@jakirkham
Copy link
Member

jakirkham commented Nov 26, 2019

Ah I guess that really only applies to the first half of the ask namely get/set for chunks. Sorry for the lack of clarity. It should be easy (I think) for users to iterate over all chunks on their own (like using np.ndindex with .nchunks).

@David-Baddeley
Copy link
Contributor Author

Sorry for the silence, it's been a mad couple of weeks. The more I look at it, the more I think that dask interoperability is a worthy goal. That said, I don't think just using dask solves all my use cases though (one example limitation would seem to be data-locality which dask would seem to respect for temporary data during computation, but not for data in storage at the start or end of processing).

Even when using dask, adding a low-level chunk access api has the potential to speed things up. When reading a chunk from zarr, dask currently slices the zarr array with it's preferred chunk size. Within zarr this involves the allocation of a new array, the loading of the zar chunks corresponding to the area requested by dask (which might not correspond to the underlying zarr chunks), and coping the relevant bits across into the empty array.

My ideal interface would comprise the following:

  1. a lazy chunk reference object which is serialisable (ideally as .json or similar rather than pickle)
  2. a get_chunk() function returning the above
  3. (very, very) optionally an iterator over chunks. As @jakirkham says it would be pretty easy for users to implement this in minimal code, depending on whether the index passed to get_chunk() was a single index, or an n-tuple describing the chunk index along each direction.

@alimanfoo
Copy link
Member

Hi @David-Baddeley, thanks for following up.

It would be pretty straightforward to turn the Array._chunk_getitem() method into a public method, and also make some of the arguments optional. So, e.g., assuming we renamed this method to "get_chunk", you could then call myarray.get_chunk(coords) where coords is a tuple of integers and be returned a numpy array with the loaded chunk data.

I'm a bit hesitant to get into implementing something that returns lazy chunk reference objects which are also serialisable as json. Would you be able to explain why you need that?

@rabernat
Copy link
Contributor

rabernat commented Dec 9, 2019

@David-Baddeley - you use case sounds fascinating and remarkably similar to how we want to work with climate data. I really encourage you to play a bit more with dask before assuming that it won't work for you. The slicing you refer to, for example, is 100% lazy and doesn't require moving data to a "controlling node."

If you want to see what distributed, parallel processing with dask in the cloud looks like, give this binder a try:
https://binder.pangeo.io/v2/gh/pangeo-data/pangeo-ocean-examples/master?filepath=aviso_sea-surface-height.ipynb

Distributed computing is complicated. It's nice to be part of the dask community, rather than trying to roll these tools on our own. Our project has gotten much farther, and is more sustainable, because of this.

That said, I am 100% in favor of any optimizations we can find that will make zarr and dask work [even] better together.

@amatsukawa
Copy link

+1 for this feature. I am a dask user, and second the sentiment of using dask for complex array computations over chunks.

However, I have use cases where I would like to just load zarr chunks where performance is important. I would prefer to just remove the dask layer entirely, since I would just be doing a.blocks[i, j].compute() on random blocks. For now, I'm manually computing chunk boundaries to accomplish this. It's perfect fine, but it would be nice to have a .blocks API like dask does.

@alimanfoo
Copy link
Member

Thanks @amatsukawa for commenting. FWIW I'd still be happy for such a function which returns a specific chunk as a numpy array, given chunk indices. Happy for that to go ahead if it's useful, would be a relatively small change.

@Sh4zKh4n
Copy link

Hi, i think chunk level access for imaging would be interesting as well. As an example, if you are stitching two large volumes together with a certain point of overlap. It may be nice to do some registration and correlation across only a subset of chunks or even to use a window from one chunk as a kernel to slide over the other image. So I think there's some great opportunity here. Dask does do a great job if you want to work across the whole set or if you want to index. It may be nicer to have a chunk level without indexing and going over boundaries.

@skgbanga
Copy link

skgbanga commented Sep 2, 2020

Please note that a chunk level API will help in streaming applications as well as illustrated in the code in #595

@srib
Copy link

srib commented Jan 27, 2021

Just came looking for help docs and landed on this issue. This would be extremely beneficial.

Till an API of this nature is merged, is there a workaround?

@jakirkham
Copy link
Member

Yep I think we are just looking for a volunteer to do this work. It's not hard, but someone would need to pick it up 🙂

@lnicola
Copy link

lnicola commented Oct 6, 2021

Another use case for direct chunk access: at one point, we'd like to stack multiple arrays into a single one without loading all of it into memory. With a chunk size of 1 in that direction, this can be trivially done by copying or moving files on disk. But if we want to use a different chunk size, that's no longer possible.

An API to read/write a chunk given an index would greatly help here. I suppose one workaround would be to read slices (assuming it's possible without reading the whole array) but I don't know what to do on the write side.

@kephale
Copy link

kephale commented May 22, 2023

Hi Folks,

We're pretty interested in direct chunk access in napari now. @d-v-b had pointed out some performance issue with large images that contain a large number of chunks. That performance issue became very apparent in some recent work: napari/napari#5561 (comment). The rendering implementation in this demo requires efficient access when there are large numbers of chunks, but does not need the computation graph features from Dask.

I would be happy to pick up this work to provide access to chunks. It sounds like we need:

@rabernat
Copy link
Contributor

@kephale - it sounds like you might be interested in the feature proposed in #1398?

@kephale
Copy link

kephale commented May 24, 2023

@rabernat #1398 does look interesting, but it is a little complex. Would the batch abstraction for encoding be a requirement for adding get_chunk and set_chunk to the API?

Maybe it is short-sighted, but my main use case is just that I need constant time access to chunks in arrays with large numbers of chunks. Dask doesn't scale well for these arrays with many chunks, and the chunk sizes are determined by the visualization needs and cannot be changed. I'm just playing around with monkey patching the chunk access API for now, but since I saw a few requests for chunk-level API in this thread I thought I'd offer to do the work upstream.

@rabernat
Copy link
Contributor

... my main use case is just that I need constant time access to chunks in arrays with large numbers of chunks.

Ah ok that's a helpful clarification. But I guess I don't really understand where the current API falls short. Why is

array.get_chunk(chunk_coords)

better / faster than

array[chunk_index]

?

@kephale
Copy link

kephale commented May 24, 2023

I stand corrected. I haven't done precise measurements, but I get visually similar behavior by using array[chunk_index]. My mistake was that after running into performance issues with Dask I was suspicious of overhead from indexing/selection and went directly to _chunk_getitems.

Thank you for the help! This is being used to improve visualization of very large multiscale zarrs in napari, which currently looks like this napari/napari#5561 (comment).

@d-v-b
Copy link
Contributor

d-v-b commented May 24, 2023

Since I am on record recommending dask earlier in this issue (ca. 2019), I should add that my earlier answers concerned using dask for batched chunkwise array access, e.g. applying some transformation to a chunked array.

I do not recommend using dask for visualization of chunked arrays. In this application, you want cheap (i.e., O(constant)) random access to individual chunks, but dask can't do this (or, at least it couldn't the last time I checked).

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging a pull request may close this issue.