Refactor Export Rechunking (#478)
* refactor export rechunker to work with latest dask

* make accessor work with new dask/zarr chunk scheme

* remove print statement

* remove error description from docstring
tasansal authored Dec 13, 2024
1 parent 373180f commit 2e422e8
Showing 2 changed files with 17 additions and 26 deletions.
src/mdio/api/accessor.py (12 changes: 8 additions & 4 deletions)

@@ -2,6 +2,8 @@

from __future__ import annotations

+import logging
+
import dask.array as da
import numpy as np
import numpy.typing as npt
@@ -17,6 +19,9 @@
from mdio.exceptions import ShapeError


+logger = logging.getLogger(__name__)
+
+
class MDIOAccessor:
    """Accessor class for MDIO files.

@@ -243,7 +248,6 @@ def _set_attributes(self):

            for idx, dim in enumerate(new_chunks)
        )

-        print(f"Array shape is {self.shape}")
        self._orig_chunks = self.chunks
        self.chunks = new_chunks

@@ -263,9 +267,9 @@ def _open_arrays(self):

        self._traces = self._array_loader(**trace_kwargs)

        if self._backend == "dask" and self._orig_chunks != self._chunks:
-            dask_chunksize = self._traces.chunksize
-            print(f"Setting (dask) chunks from {self._orig_chunks} to {dask_chunksize}")
-            self.chunks = dask_chunksize
+            dask_chunks = self._traces.chunks
+            logger.info(f"Setting MDIO in-memory chunks to {dask_chunks}")
+            self.chunks = dask_chunks

        header_kwargs = dict(
            group_handle=self._metadata_group,
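
A note on the accessor change above: dask's Array.chunksize collapses each axis to a single block size, while Array.chunks keeps the exact, possibly ragged, block boundaries that the new dask/zarr chunk scheme tracks. A minimal sketch of the difference, independent of MDIO and using a made-up array shape:

import dask.array as da

# A 10 x 10 array cut into 4 x 5 blocks; the first axis is ragged: (4, 4, 2).
arr = da.zeros((10, 10), chunks=(4, 5))

print(arr.chunksize)  # (4, 5) -- one int per axis, boundary info lost
print(arr.chunks)     # ((4, 4, 2), (5, 5)) -- exact per-axis block sizes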
src/mdio/segy/utilities.py (31 changes: 9 additions & 22 deletions)

@@ -2,10 +2,11 @@

from __future__ import annotations

+import logging
from typing import TYPE_CHECKING

import numpy as np
-from dask.array.core import auto_chunks
+from dask.array.core import normalize_chunks

from mdio.core import Dimension
from mdio.segy.geometry import GridOverrider
@@ -18,6 +19,9 @@

    from segy.arrays import HeaderArray


+logger = logging.getLogger(__name__)
+
+
def get_grid_plan(  # noqa: C901
    segy_file: SegyFile,
    chunksize: list[int],
@@ -87,7 +91,7 @@ def segy_export_rechunker(

    shape: tuple[int, ...],
    dtype: DTypeLike,
    limit: str = "300M",
-) -> tuple[int, ...]:
+) -> tuple[tuple[int, ...], ...]:
    """Determine chunk sizes for writing out SEG-Y given limit.

    This module finds the desired chunk sizes for given chunk size
@@ -111,19 +115,17 @@

    Returns:
        Adjusted chunk sizes for further processing

-    Raises:
-        ValueError: If resulting chunks will split file on disk.
    """
    ndim = len(shape) - 1  # minus the sample axis

    # set sample chunks to max
    prev_chunks = chunks[:-1] + (shape[-1],)

    new_chunks = tuple()
    for idx in range(ndim, -1, -1):
        tmp_chunks = prev_chunks[:idx] + ("auto",) + prev_chunks[idx + 1 :]

-        new_chunks = auto_chunks(
+        new_chunks = normalize_chunks(
            chunks=tmp_chunks,
            shape=shape,
            limit=limit,
@@ -132,22 +134,7 @@

        )

-        # Ensure it is integers
-        new_chunks = tuple(map(int, new_chunks))
        prev_chunks = new_chunks

-    # TODO: Add strict=True and remove noqa when minimum Python is 3.10
-    qc_iterator = zip(new_chunks, chunks, shape)  # noqa: B905
-
-    for idx, (dim_new_chunk, dim_chunk, dim_size) in enumerate(qc_iterator):
-        # Sometimes dim_chunk can be larger than dim_size. This catches when
-        # that is False and the new chunk will be smaller than original
-        if dim_new_chunk < dim_chunk < dim_size:
-            msg = (
-                f"Dimension {idx} chunk size in {new_chunks=} is smaller than "
-                f"the disk {chunks=} with given {limit=}. This will cause very "
-                f"poor performance due to redundant reads. Please increase limit "
-                f"to get larger chunks. However, this may require more memory."
-            )
-            raise ValueError(msg)
-
+    logger.debug(f"Auto export rechunking to: {new_chunks}")
    return new_chunks
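
For context on the refactor above: normalize_chunks is dask's public API for resolving an "auto" entry into concrete block sizes under a byte limit, and it returns a tuple of per-axis chunk tuples, which matches the function's new return annotation. A minimal sketch using made-up shape, chunk, and limit values (not MDIO's actual call, whose remaining keyword arguments are hidden in the fold above):

import numpy as np
from dask.array.core import normalize_chunks

shape = (512, 512, 1024)

# Let dask pick the first axis so each block stays under the ~300 MB limit;
# the other two axes are pinned to fixed block sizes.
new_chunks = normalize_chunks(
    chunks=("auto", 128, 1024),
    shape=shape,
    limit="300M",
    dtype=np.dtype("float32"),
)

print(new_chunks)  # e.g. ((512,), (128, 128, 128, 128), (1024,))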
