From d60c3aba390e84dfe99431fa3505efeb776b0bb4 Mon Sep 17 00:00:00 2001 From: Isaac Virshup Date: Wed, 28 Aug 2024 16:40:04 -0700 Subject: [PATCH 01/11] Chunked writing of h5py.Dataset and zarr.Array --- src/anndata/_io/specs/methods.py | 45 +++++++++++++++++++++++++++++--- 1 file changed, 42 insertions(+), 3 deletions(-) diff --git a/src/anndata/_io/specs/methods.py b/src/anndata/_io/specs/methods.py index 794d60437..477d79be8 100644 --- a/src/anndata/_io/specs/methods.py +++ b/src/anndata/_io/specs/methods.py @@ -373,13 +373,10 @@ def write_list( # It's in the `AnnData.concatenate` docstring, but should we keep it? @_REGISTRY.register_write(H5Group, views.ArrayView, IOSpec("array", "0.2.0")) @_REGISTRY.register_write(H5Group, np.ndarray, IOSpec("array", "0.2.0")) -@_REGISTRY.register_write(H5Group, h5py.Dataset, IOSpec("array", "0.2.0")) @_REGISTRY.register_write(H5Group, np.ma.MaskedArray, IOSpec("array", "0.2.0")) @_REGISTRY.register_write(ZarrGroup, views.ArrayView, IOSpec("array", "0.2.0")) @_REGISTRY.register_write(ZarrGroup, np.ndarray, IOSpec("array", "0.2.0")) -@_REGISTRY.register_write(ZarrGroup, h5py.Dataset, IOSpec("array", "0.2.0")) @_REGISTRY.register_write(ZarrGroup, np.ma.MaskedArray, IOSpec("array", "0.2.0")) -@_REGISTRY.register_write(ZarrGroup, ZarrArray, IOSpec("array", "0.2.0")) def write_basic( f: GroupStorageType, k: str, @@ -392,6 +389,48 @@ def write_basic( f.create_dataset(k, data=elem, **dataset_kwargs) +def _iter_chunks_for_copy(elem, dest): + """ + Returns an iterator of tuples of slices for copying chunks from `elem` to `dest`. + + * If `elem` has chunks, it will return the chunks of `elem`. + * If `dest` has chunks, it will return the chunks of `dest`. + * If neither is chunked, we write it in ~100MB chunks or 1000 rows, whichever is larger. + """ + if elem.chunks: + return elem.iter_chunks() + elif dest.chunks: + return dest.iter_chunks() + else: + itemsize = elem.dtype.itemsize + shape = elem.shape + entry_chunk_size = (100 * 1024 * 1024 // itemsize) # number of elements to write + n_rows = max(entry_chunk_size // shape[0], 1000) # Number of rows that works out to + return zip( + (slice(i, min(i + n_rows, shape[0])) for i in range(0, shape[0], n_rows)), + (slice(None) for _ in range(0, shape[0], n_rows)), + ) + + +@_REGISTRY.register_write(H5Group, H5Array, IOSpec("array", "0.2.0")) +@_REGISTRY.register_write(H5Group, ZarrArray, IOSpec("array", "0.2.0")) +@_REGISTRY.register_write(ZarrGroup, H5Array, IOSpec("array", "0.2.0")) +@_REGISTRY.register_write(ZarrGroup, ZarrArray, IOSpec("array", "0.2.0")) +def write_chunked_dense_array( + f: GroupStorageType, + k: str, + elem, + *, + _writer: Writer, + dataset_kwargs: Mapping[str, Any] = MappingProxyType({}), +): + dest = f.create_dataset_like(k, elem, **dataset_kwargs) + + for chunk in _iter_chunks_for_copy(elem, dest): + dest[chunk] = elem[chunk] + + + _REGISTRY.register_write(H5Group, CupyArray, IOSpec("array", "0.2.0"))( _to_cpu_mem_wrapper(write_basic) ) From 232bee467c2ad4d43b7ff2857852fbf52c466038 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Wed, 28 Aug 2024 23:44:47 +0000 Subject: [PATCH 02/11] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- src/anndata/_io/specs/methods.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/src/anndata/_io/specs/methods.py b/src/anndata/_io/specs/methods.py index 477d79be8..4b735b4c1 100644 --- a/src/anndata/_io/specs/methods.py +++ b/src/anndata/_io/specs/methods.py @@ -404,8 +404,10 @@ def _iter_chunks_for_copy(elem, dest): else: itemsize = elem.dtype.itemsize shape = elem.shape - entry_chunk_size = (100 * 1024 * 1024 // itemsize) # number of elements to write - n_rows = max(entry_chunk_size // shape[0], 1000) # Number of rows that works out to + entry_chunk_size = 100 * 1024 * 1024 // itemsize # number of elements to write + n_rows = max( + entry_chunk_size // shape[0], 1000 + ) # Number of rows that works out to return zip( (slice(i, min(i + n_rows, shape[0])) for i in range(0, shape[0], n_rows)), (slice(None) for _ in range(0, shape[0], n_rows)), @@ -430,7 +432,6 @@ def write_chunked_dense_array( dest[chunk] = elem[chunk] - _REGISTRY.register_write(H5Group, CupyArray, IOSpec("array", "0.2.0"))( _to_cpu_mem_wrapper(write_basic) ) From c43c5e2dfb7ff911f6b04071a00cbb770952e3e2 Mon Sep 17 00:00:00 2001 From: Isaac Virshup Date: Wed, 28 Aug 2024 16:52:21 -0700 Subject: [PATCH 03/11] Make n-dimensional --- src/anndata/_io/specs/methods.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/src/anndata/_io/specs/methods.py b/src/anndata/_io/specs/methods.py index 4b735b4c1..ce8e46218 100644 --- a/src/anndata/_io/specs/methods.py +++ b/src/anndata/_io/specs/methods.py @@ -408,10 +408,7 @@ def _iter_chunks_for_copy(elem, dest): n_rows = max( entry_chunk_size // shape[0], 1000 ) # Number of rows that works out to - return zip( - (slice(i, min(i + n_rows, shape[0])) for i in range(0, shape[0], n_rows)), - (slice(None) for _ in range(0, shape[0], n_rows)), - ) + return (slice(i, min(i + n_rows, shape[0])) for i in range(0, shape[0], n_rows)) @_REGISTRY.register_write(H5Group, H5Array, IOSpec("array", "0.2.0")) From 749880bbc5dd353d22a0cac14dd6a28b56cecbcc Mon Sep 17 00:00:00 2001 From: Isaac Virshup Date: Wed, 28 Aug 2024 17:23:11 -0700 Subject: [PATCH 04/11] Add some tests, which fail :( --- tests/test_io_elementwise.py | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/tests/test_io_elementwise.py b/tests/test_io_elementwise.py index ed89dbe7f..a4d614c6f 100644 --- a/tests/test_io_elementwise.py +++ b/tests/test_io_elementwise.py @@ -166,6 +166,18 @@ def create_sparse_store( pytest.param( pd.array([True, False, True, True]), "nullable-boolean", id="pd_arr_bool" ), + pytest.param( + zarr.ones((100, 100), chunks=(10, 10)), + "array", + id="zarr_dense_array", + ), + pytest.param( + create_dense_store( + h5py.File("test1.h5", mode="w", driver="core", backing_store=False) + )["X"], + "array", + id="h5_dense_array", + ), # pytest.param(bytes, b"some bytes", "bytes", id="py_bytes"), # Does not work for zarr # TODO consider how specific encodings should be. Should we be fully describing the written type? # Currently the info we add is: "what you wouldn't be able to figure out yourself" From 32e008de85edda70fbf3140fb0fef3082efaad3d Mon Sep 17 00:00:00 2001 From: Isaac Virshup Date: Mon, 30 Sep 2024 15:41:19 -0700 Subject: [PATCH 05/11] Fix up chunking algorithm + add some types --- src/anndata/_io/specs/methods.py | 25 ++++++++++++++----------- 1 file changed, 14 insertions(+), 11 deletions(-) diff --git a/src/anndata/_io/specs/methods.py b/src/anndata/_io/specs/methods.py index ce8e46218..2861949ab 100644 --- a/src/anndata/_io/specs/methods.py +++ b/src/anndata/_io/specs/methods.py @@ -377,6 +377,8 @@ def write_list( @_REGISTRY.register_write(ZarrGroup, views.ArrayView, IOSpec("array", "0.2.0")) @_REGISTRY.register_write(ZarrGroup, np.ndarray, IOSpec("array", "0.2.0")) @_REGISTRY.register_write(ZarrGroup, np.ma.MaskedArray, IOSpec("array", "0.2.0")) +@_REGISTRY.register_write(ZarrGroup, ZarrArray, IOSpec("array", "0.2.0")) +@_REGISTRY.register_write(ZarrGroup, H5Array, IOSpec("array", "0.2.0")) def write_basic( f: GroupStorageType, k: str, @@ -389,17 +391,14 @@ def write_basic( f.create_dataset(k, data=elem, **dataset_kwargs) -def _iter_chunks_for_copy(elem, dest): +def _iter_chunks_for_copy(elem: ArrayStorageType, dest: ArrayStorageType): """ Returns an iterator of tuples of slices for copying chunks from `elem` to `dest`. - * If `elem` has chunks, it will return the chunks of `elem`. * If `dest` has chunks, it will return the chunks of `dest`. - * If neither is chunked, we write it in ~100MB chunks or 1000 rows, whichever is larger. + * If `dest` is not chunked, we write it in ~100MB chunks or 1000 rows, whichever is larger. """ - if elem.chunks: - return elem.iter_chunks() - elif dest.chunks: + if dest.chunks: return dest.iter_chunks() else: itemsize = elem.dtype.itemsize @@ -413,17 +412,21 @@ def _iter_chunks_for_copy(elem, dest): @_REGISTRY.register_write(H5Group, H5Array, IOSpec("array", "0.2.0")) @_REGISTRY.register_write(H5Group, ZarrArray, IOSpec("array", "0.2.0")) -@_REGISTRY.register_write(ZarrGroup, H5Array, IOSpec("array", "0.2.0")) -@_REGISTRY.register_write(ZarrGroup, ZarrArray, IOSpec("array", "0.2.0")) -def write_chunked_dense_array( +def write_chunked_dense_array_to_h5( f: GroupStorageType, k: str, - elem, + elem: ArrayStorageType, *, _writer: Writer, dataset_kwargs: Mapping[str, Any] = MappingProxyType({}), ): - dest = f.create_dataset_like(k, elem, **dataset_kwargs) + """Write to a h5py.Dataset in chunks. + + `h5py.Group.create_dataset(..., data: h5py.Dataset)` will load all of `data` into memory + before writing. Instead, we will write in chunks to avoid this. We don't need to do this for + zarr since zarr handles this automatically. + """ + dest = f.create_dataset(k, shape=elem.shape, dtype=elem.dtype, **dataset_kwargs) for chunk in _iter_chunks_for_copy(elem, dest): dest[chunk] = elem[chunk] From b2192a2148574b6cce4e0a49aef64d0a5e173fc0 Mon Sep 17 00:00:00 2001 From: ilan-gold Date: Fri, 15 Nov 2024 15:52:31 +0100 Subject: [PATCH 06/11] (chore): remove unneeded check? --- tests/test_io_dispatched.py | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/tests/test_io_dispatched.py b/tests/test_io_dispatched.py index 833b23e83..4dba9b6aa 100644 --- a/tests/test_io_dispatched.py +++ b/tests/test_io_dispatched.py @@ -180,7 +180,5 @@ def zarr_reader(func, elem_name: str, elem, iospec): write_dispatched(f, "/", adata, callback=zarr_writer) _ = read_dispatched(f, zarr_reader) - assert h5ad_write_keys == zarr_write_keys - assert h5ad_read_keys == zarr_read_keys - - assert sorted(h5ad_write_keys) == sorted(h5ad_read_keys) + assert sorted(h5ad_write_keys) == sorted(zarr_write_keys) + assert sorted(h5ad_read_keys) == sorted(zarr_read_keys) From 5938d86f0130265fa126ff3fddaac7e99143e41e Mon Sep 17 00:00:00 2001 From: ilan-gold Date: Fri, 15 Nov 2024 15:52:56 +0100 Subject: [PATCH 07/11] (fix): dispatch to chunked writing for dense arrays --- src/anndata/_io/specs/methods.py | 38 ++++++++++++++++++++++++++------ 1 file changed, 31 insertions(+), 7 deletions(-) diff --git a/src/anndata/_io/specs/methods.py b/src/anndata/_io/specs/methods.py index 2861949ab..6379588d3 100644 --- a/src/anndata/_io/specs/methods.py +++ b/src/anndata/_io/specs/methods.py @@ -398,7 +398,7 @@ def _iter_chunks_for_copy(elem: ArrayStorageType, dest: ArrayStorageType): * If `dest` has chunks, it will return the chunks of `dest`. * If `dest` is not chunked, we write it in ~100MB chunks or 1000 rows, whichever is larger. """ - if dest.chunks: + if dest.chunks and hasattr(dest, "iter_chunks"): return dest.iter_chunks() else: itemsize = elem.dtype.itemsize @@ -412,7 +412,8 @@ def _iter_chunks_for_copy(elem: ArrayStorageType, dest: ArrayStorageType): @_REGISTRY.register_write(H5Group, H5Array, IOSpec("array", "0.2.0")) @_REGISTRY.register_write(H5Group, ZarrArray, IOSpec("array", "0.2.0")) -def write_chunked_dense_array_to_h5( +@_REGISTRY.register_write(ZarrGroup, H5Array, IOSpec("array", "0.2.0")) +def write_chunked_dense_array_to_group( f: GroupStorageType, k: str, elem: ArrayStorageType, @@ -426,12 +427,31 @@ def write_chunked_dense_array_to_h5( before writing. Instead, we will write in chunks to avoid this. We don't need to do this for zarr since zarr handles this automatically. """ - dest = f.create_dataset(k, shape=elem.shape, dtype=elem.dtype, **dataset_kwargs) + dtype = dataset_kwargs.get("dtype", elem.dtype) + dest = f.create_dataset(k, shape=elem.shape, **dataset_kwargs, dtype=dtype) for chunk in _iter_chunks_for_copy(elem, dest): dest[chunk] = elem[chunk] +@_REGISTRY.register_write(ZarrGroup, ZarrArray, IOSpec("array", "0.2.0")) +def write_chunked_dense_array_to_zarr( + f: ZarrGroup, + k: str, + elem: ZarrArray, + *, + _writer: Writer, + dataset_kwargs: Mapping[str, Any] = MappingProxyType({}), +): + """Write to a h5py.Dataset in chunks. + `h5py.Group.create_dataset(..., data: h5py.Dataset)` will load all of `data` into memory + before writing. Instead, we will write in chunks to avoid this. We don't need to do this for + zarr since zarr handles this automatically. + """ + dtype = dataset_kwargs.get("dtype", elem.dtype) + f.create_dataset(k, shape=elem.shape, data=elem, **dataset_kwargs, dtype=dtype) + + _REGISTRY.register_write(H5Group, CupyArray, IOSpec("array", "0.2.0"))( _to_cpu_mem_wrapper(write_basic) ) @@ -638,10 +658,14 @@ def write_sparse_compressed( # Allow resizing for hdf5 if isinstance(f, H5Group) and "maxshape" not in dataset_kwargs: dataset_kwargs = dict(maxshape=(None,), **dataset_kwargs) - - g.create_dataset("data", data=value.data, **dataset_kwargs) - g.create_dataset("indices", data=value.indices, **dataset_kwargs) - g.create_dataset("indptr", data=value.indptr, dtype=indptr_dtype, **dataset_kwargs) + _writer.write_elem(g, "data", value.data, dataset_kwargs=dataset_kwargs) + _writer.write_elem(g, "indices", value.indices, dataset_kwargs=dataset_kwargs) + _writer.write_elem( + g, + "indptr", + value.indptr, + dataset_kwargs={"dtype": indptr_dtype, **dataset_kwargs}, + ) write_csr = partial(write_sparse_compressed, fmt="csr") From 99d4400bee9f0c75a4321ba9fa18c1ba3149c7d2 Mon Sep 17 00:00:00 2001 From: ilan-gold Date: Fri, 15 Nov 2024 16:03:23 +0100 Subject: [PATCH 08/11] (chore): remove unnecessary methods --- src/anndata/_io/specs/methods.py | 22 ++-------------------- 1 file changed, 2 insertions(+), 20 deletions(-) diff --git a/src/anndata/_io/specs/methods.py b/src/anndata/_io/specs/methods.py index 6379588d3..f6916cba1 100644 --- a/src/anndata/_io/specs/methods.py +++ b/src/anndata/_io/specs/methods.py @@ -388,7 +388,8 @@ def write_basic( dataset_kwargs: Mapping[str, Any] = MappingProxyType({}), ): """Write methods which underlying library handles natively.""" - f.create_dataset(k, data=elem, **dataset_kwargs) + dtype = dataset_kwargs.get("dtype", elem.dtype) + f.create_dataset(k, data=elem, **dataset_kwargs, dtype=dtype) def _iter_chunks_for_copy(elem: ArrayStorageType, dest: ArrayStorageType): @@ -412,7 +413,6 @@ def _iter_chunks_for_copy(elem: ArrayStorageType, dest: ArrayStorageType): @_REGISTRY.register_write(H5Group, H5Array, IOSpec("array", "0.2.0")) @_REGISTRY.register_write(H5Group, ZarrArray, IOSpec("array", "0.2.0")) -@_REGISTRY.register_write(ZarrGroup, H5Array, IOSpec("array", "0.2.0")) def write_chunked_dense_array_to_group( f: GroupStorageType, k: str, @@ -434,24 +434,6 @@ def write_chunked_dense_array_to_group( dest[chunk] = elem[chunk] -@_REGISTRY.register_write(ZarrGroup, ZarrArray, IOSpec("array", "0.2.0")) -def write_chunked_dense_array_to_zarr( - f: ZarrGroup, - k: str, - elem: ZarrArray, - *, - _writer: Writer, - dataset_kwargs: Mapping[str, Any] = MappingProxyType({}), -): - """Write to a h5py.Dataset in chunks. - `h5py.Group.create_dataset(..., data: h5py.Dataset)` will load all of `data` into memory - before writing. Instead, we will write in chunks to avoid this. We don't need to do this for - zarr since zarr handles this automatically. - """ - dtype = dataset_kwargs.get("dtype", elem.dtype) - f.create_dataset(k, shape=elem.shape, data=elem, **dataset_kwargs, dtype=dtype) - - _REGISTRY.register_write(H5Group, CupyArray, IOSpec("array", "0.2.0"))( _to_cpu_mem_wrapper(write_basic) ) From 6ef459d02e3fe45a0945b95e8f46a0ac95c81294 Mon Sep 17 00:00:00 2001 From: "Philipp A." Date: Tue, 10 Dec 2024 15:26:55 +0100 Subject: [PATCH 09/11] fmt --- src/anndata/_io/specs/methods.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/anndata/_io/specs/methods.py b/src/anndata/_io/specs/methods.py index 19cd1f66f..1cd5f61c4 100644 --- a/src/anndata/_io/specs/methods.py +++ b/src/anndata/_io/specs/methods.py @@ -406,10 +406,10 @@ def _iter_chunks_for_copy(elem: ArrayStorageType, dest: ArrayStorageType): else: itemsize = elem.dtype.itemsize shape = elem.shape - entry_chunk_size = 100 * 1024 * 1024 // itemsize # number of elements to write - n_rows = max( - entry_chunk_size // shape[0], 1000 - ) # Number of rows that works out to + # Number of elements to write + entry_chunk_size = 100 * 1024 * 1024 // itemsize + # Number of rows that works out to + n_rows = max(entry_chunk_size // shape[0], 1000) return (slice(i, min(i + n_rows, shape[0])) for i in range(0, shape[0], n_rows)) From 0ba0da26ddb3c6d5653648266d2aaf1193af28ec Mon Sep 17 00:00:00 2001 From: ilan-gold Date: Tue, 10 Dec 2024 16:04:34 +0100 Subject: [PATCH 10/11] (fix): type --- src/anndata/_io/specs/methods.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/anndata/_io/specs/methods.py b/src/anndata/_io/specs/methods.py index 7ef31b590..e6d0417ac 100644 --- a/src/anndata/_io/specs/methods.py +++ b/src/anndata/_io/specs/methods.py @@ -44,7 +44,7 @@ from .registry import _REGISTRY, IOSpec, read_elem, read_elem_partial if TYPE_CHECKING: - from collections.abc import Callable + from collections.abc import Callable, Iterator from os import PathLike from typing import Any, Literal @@ -395,7 +395,9 @@ def write_basic( f.create_dataset(k, data=elem, **dataset_kwargs, dtype=dtype) -def _iter_chunks_for_copy(elem: ArrayStorageType, dest: ArrayStorageType): +def _iter_chunks_for_copy( + elem: ArrayStorageType, dest: ArrayStorageType +) -> Iterator[slice | tuple[list[slice]]]: """ Returns an iterator of tuples of slices for copying chunks from `elem` to `dest`. From b5c8d7d690acb9f22b482d34fee32f1d52200a22 Mon Sep 17 00:00:00 2001 From: ilan-gold Date: Tue, 10 Dec 2024 19:18:23 +0100 Subject: [PATCH 11/11] (fix): remove erroneous change + default dtype --- src/anndata/_io/specs/methods.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/anndata/_io/specs/methods.py b/src/anndata/_io/specs/methods.py index e6d0417ac..567a98638 100644 --- a/src/anndata/_io/specs/methods.py +++ b/src/anndata/_io/specs/methods.py @@ -391,8 +391,7 @@ def write_basic( dataset_kwargs: Mapping[str, Any] = MappingProxyType({}), ): """Write methods which underlying library handles natively.""" - dtype = dataset_kwargs.get("dtype", elem.dtype) - f.create_dataset(k, data=elem, **dataset_kwargs, dtype=dtype) + f.create_dataset(k, data=elem, **dataset_kwargs) def _iter_chunks_for_copy( @@ -433,7 +432,8 @@ def write_chunked_dense_array_to_group( zarr since zarr handles this automatically. """ dtype = dataset_kwargs.get("dtype", elem.dtype) - dest = f.create_dataset(k, shape=elem.shape, **dataset_kwargs, dtype=dtype) + kwargs = {**dataset_kwargs, "dtype": dtype} + dest = f.create_dataset(k, shape=elem.shape, **kwargs) for chunk in _iter_chunks_for_copy(elem, dest): dest[chunk] = elem[chunk]