Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add the ability to set chunksize when making the dask array #232

Merged
merged 7 commits into from
May 9, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ jobs:
apt:
- graphviz
- linux: py311-devdeps
- linux: py38-oldestdeps
- linux: py39-oldestdeps
publish:
needs: [tests]
Expand Down
1 change: 1 addition & 0 deletions changelog/232.feature.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Drop support for Python 3.8 in line with `NEP 29 <https://numpy.org/neps/nep-0029-deprecation_policy.html>`_.
2 changes: 2 additions & 0 deletions changelog/232.trivial.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
Internal improvements to how the data are loaded from the collection of FITS files.
This should have no user facing effects, but provides a foundation for future performance work.
5 changes: 3 additions & 2 deletions dkist/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,9 @@

@pytest.fixture
def array():
    """
    A 2D dask array of a random shape, backed by a single chunk.

    Each dimension is an independent power of two between 4 (2**2) and
    64 (2**6), and every element has the value 11 (``ones + 10``).
    """
    # Power-of-two sizes so that power-of-two chunk sizes used elsewhere in
    # the tests divide the array evenly.
    shape = 2**np.random.randint(2, 7, size=2)
    x = np.ones(np.prod(shape)) + 10
    x = x.reshape(shape)
    # One chunk covering the whole array.
    return da.from_array(x, tuple(shape))


Expand Down
7 changes: 1 addition & 6 deletions dkist/dataset/dataset.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,7 @@
import sys
import importlib.resources as importlib_resources
from pathlib import Path
from textwrap import dedent

if sys.version_info < (3, 9):
import importlib_resources
else:
import importlib.resources as importlib_resources

from jsonschema.exceptions import ValidationError

import asdf
Expand Down
5 changes: 4 additions & 1 deletion dkist/io/asdf/converters/file_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,17 @@ def from_yaml_tree(self, node, tag, ctx):
node["target"],
node["datatype"],
node["shape"],
chunksize=node.get("chunksize", None),
loader=AstropyFITSLoader,
basepath=base_path)
return file_manager

def to_yaml_tree(self, obj, tag, ctx):
    """
    Serialize a ``FileManager`` into its asdf tree representation.

    Parameters
    ----------
    obj
        The ``FileManager`` instance being serialized; its
        ``_striped_external_array`` holds the file URIs and array metadata.
    tag, ctx
        Standard asdf converter arguments (unused here).

    Returns
    -------
    dict
        The tree node; ``chunksize`` is only written when it has been set.
    """
    sea = obj._striped_external_array
    node = {}
    node["fileuris"] = sea.fileuri_array.tolist()
    node["target"] = sea.target
    node["datatype"] = sea.dtype
    node["shape"] = sea.shape
    # The walrus assignment must be parenthesised: without the parentheses
    # ``chunksize := (... is not None)`` binds the *boolean* comparison
    # result, so ``node["chunksize"]`` would be written as ``True``.
    if (chunksize := sea.chunksize) is not None:
        node["chunksize"] = chunksize
    return node
8 changes: 1 addition & 7 deletions dkist/io/asdf/entry_points.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
"""
This file contains the entry points for asdf.
"""
import sys
import importlib.resources as importlib_resources

from asdf.extension import ManifestExtension
from asdf.resource import DirectoryResourceMapping
Expand All @@ -10,12 +10,6 @@
FileManagerConverter, RavelConverter, TiledDatasetConverter,
VaryingCelestialConverter)

if sys.version_info < (3, 9):
import importlib_resources
else:
import importlib.resources as importlib_resources



def get_resource_mappings():
"""
Expand Down
2 changes: 2 additions & 0 deletions dkist/io/asdf/resources/schemas/file_manager-1.0.0.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ properties:
anyOf:
- type: integer
minimum: 0
chunksize:
type: array

required: [fileuris, target, datatype, shape]
additionalProperties: false
Expand Down
47 changes: 39 additions & 8 deletions dkist/io/asdf/tests/test_dataset.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,6 @@
import sys
import importlib.resources as importlib_resources
from pathlib import Path

if sys.version_info < (3, 9):
import importlib_resources
else:
import importlib.resources as importlib_resources

import numpy as np
import pytest

Expand Down Expand Up @@ -49,8 +44,8 @@ def assert_dataset_equal(new, old):
new.meta["headers"] = new_headers
assert old.wcs.name == new.wcs.name
assert len(old.wcs.available_frames) == len(new.wcs.available_frames)
ac_new = new.files.external_array_references
ac_old = old.files.external_array_references
ac_new = new.files.fileuri_array
ac_old = old.files.fileuri_array
assert ac_new == ac_old
assert old.unit == new.unit
assert old.mask == new.mask
Expand Down Expand Up @@ -140,3 +135,39 @@ def test_read_all_schema_versions(eit_dataset_asdf_path):
assert isinstance(dataset.wcs, gwcs.WCS)
assert dataset.wcs.world_n_dim == 3
assert dataset.wcs.pixel_n_dim == 3


@pytest.fixture
def wrap_object(mocker):
    """
    Return a helper that spies on an attribute of a target object.

    The helper replaces ``attribute`` on ``target`` with a wrapper which
    records every invocation on a ``MagicMock`` before delegating to the
    original attribute, and returns that mock so the test can assert on
    the recorded calls.
    """
    def _wrap(target, attribute):
        recorder = mocker.MagicMock()
        original = getattr(target, attribute)

        def _spy(self, *args, **kwargs):
            # Record the call, then defer to the real implementation so
            # behavior is unchanged.
            recorder.__call__(*args, **kwargs)
            return original(self, *args, **kwargs)

        mocker.patch.object(target, attribute, _spy)
        return recorder

    return _wrap


def test_loader_getitem_with_chunksize(eit_dataset_asdf_path, wrap_object):
    """
    Setting ``chunksize`` on the striped external array must make the file
    loader be indexed with slices of exactly that chunk shape.
    """
    # Import this here to prevent hitting https://bugs.python.org/issue35753
    # on Python <3.10 — importing ``call`` at module scope is enough to
    # trigger a doctest error.
    from unittest.mock import call

    requested_chunks = (32, 16)

    with asdf.open(eit_dataset_asdf_path) as tree:
        ds = tree["dataset"]
        ds.files.basepath = rootdir / "EIT"
        ds.files._striped_external_array.chunksize = requested_chunks
        spy = wrap_object(ds.files._striped_external_array._loader, "__getitem__")
        ds._data = ds.files._generate_array()
        ds.data.compute()

    first_chunk = (slice(0, requested_chunks[0], None),
                   slice(0, requested_chunks[1], None))
    assert call(first_chunk) in spy.mock_calls
7 changes: 1 addition & 6 deletions dkist/io/asdf/tests/test_tiled_dataset.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,4 @@
import sys

if sys.version_info < (3, 9):
import importlib_resources
else:
import importlib.resources as importlib_resources
import importlib.resources as importlib_resources

import asdf

Expand Down
13 changes: 8 additions & 5 deletions dkist/io/dask_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
__all__ = ['stack_loader_array']


def stack_loader_array(loader_array, chunksize):
    """
    Stack a loader array along each of its dimensions.

    Parameters
    ----------
    loader_array : `numpy.ndarray`
        An n-dimensional array of loader objects, one per file; each
        element is presumably expected to expose a ``shape`` attribute
        (see the ``.flat[0].shape`` access below).
    chunksize : tuple or None
        The dask chunk size to use for each file's array.  If `None`,
        the full shape of a single file's array is used, i.e. one chunk
        per file.

    Returns
    -------
    array : `dask.array.Array`
    """
    # If the chunksize isn't specified then use the whole array shape
    chunksize = chunksize or loader_array.flat[0].shape

    if len(loader_array.shape) == 1:
        return da.stack(loader_to_dask(loader_array, chunksize))
    # Recurse over the leading dimension, stacking the results.
    stacks = []
    for i in range(loader_array.shape[0]):
        stacks.append(stack_loader_array(loader_array[i], chunksize))
    return da.stack(stacks)


def loader_to_dask(loader_array):
def loader_to_dask(loader_array, chunksize):
"""
Map a call to `dask.array.from_array` onto all the elements in ``loader_array``.
Expand All @@ -44,6 +47,6 @@ def loader_to_dask(loader_array):
# trying to auto calculate it by reading from the actual array on disk.
meta = np.zeros((0,), dtype=loader_array[0].dtype)

to_array = partial(da.from_array, meta=meta)
to_array = partial(da.from_array, meta=meta, chunks=chunksize)

return map(to_array, loader_array)
Loading