Skip to content

Commit

Permalink
Merge branch 'master' into pre-commit-ci-update-config
Browse files Browse the repository at this point in the history
  • Loading branch information
dachengx authored Feb 11, 2024
2 parents 94d9dfd + 705e20f commit 5e4de51
Show file tree
Hide file tree
Showing 6 changed files with 31 additions and 39 deletions.
2 changes: 1 addition & 1 deletion extra_requirements/requirements-tests.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# File for the requirements of strax with the automated tests
git+https://github.com/XENONnT/ax_env
git+https://github.com/XENONnT/base_environment
deepdiff
ipython==8.12.1
24 changes: 0 additions & 24 deletions strax/chunk.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import typing as ty
from functools import wraps

import numpy as np
import numba
Expand Down Expand Up @@ -419,26 +418,3 @@ def _update_subruns_in_chunk(chunks):
else:
subruns[subrun_id] = subrun_start_end
return subruns


@export
def check_chunk_n(f):
    """Decorator for ``strax.StorageBackend._read_chunk``-like methods that
    verifies the number of items in the loaded chunk against the count the
    metadata claims.

    The wrapped function must receive ``chunk_info`` as a keyword argument;
    after loading, the chunk length is compared to ``chunk_info["n"]`` and a
    ``strax.DataCorrupted`` is raised on mismatch (i.e. the metadata is lying).

    :param f: method taking ``self`` and a ``chunk_info`` keyword argument,
        returning the loaded chunk (anything with a ``len``).
    :return: wrapped method with the length check applied.
    :raises ValueError: if ``chunk_info`` was not passed as a keyword argument.
    """

    @wraps(f)
    def wrapper(self, *args, **kwargs):
        # chunk_info must arrive by keyword so we can inspect it here;
        # positional passing would make it invisible to the wrapper.
        if "chunk_info" not in kwargs:
            # Single concatenated message: the original passed three
            # comma-separated strings, which made ValueError's message a
            # tuple of strings instead of one readable sentence.
            raise ValueError(
                "chunk_info not passed to function, check_chunk_n "
                "can only be used with functions that take chunk_info as an argument, "
                "usually it is the strax.StorageBackend._read_chunk method."
            )
        chunk_info = kwargs["chunk_info"]
        chunk = f(self, *args, **kwargs)
        if len(chunk) != chunk_info["n"]:
            raise strax.DataCorrupted(
                f"Chunk {chunk_info['filename']} of {chunk_info['run_id']} has {len(chunk)} items, "
                f"but chunk_info {chunk_info} says {chunk_info['n']}"
            )
        return chunk

    return wrapper
27 changes: 21 additions & 6 deletions strax/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -2085,7 +2085,7 @@ def copy_to_frontend(
target_sf = [self.storage[target_frontend_id]]

# Figure out which of the frontends has the data. Raise error when none
source_sf = self._get_source_sf(run_id, target, should_exist=True)
source_sf = self.get_source_sf(run_id, target, should_exist=True)[0]

# Keep frontends that:
# 1. don't already have the data; and
Expand Down Expand Up @@ -2275,22 +2275,37 @@ def _is_stored_in_sf(self, run_id, target, storage_frontend: strax.StorageFronte
except strax.DataNotAvailable:
return False

def _get_source_sf(self, run_id, target, should_exist=False):
"""Get the source storage frontend for a given run_id and target.
def get_source_sf(self, run_id, target, should_exist=False):
"""Get the source storage frontends for a given run_id and target.
:param run_id, target: run_id, target
:param should_exist: Raise a ValueError if we cannot find one (e.g. we already checked the
data is stored)
:return: strax.StorageFrontend or None (when raise_error is False)
:return: list of strax.StorageFrontend (when should_exist is False)
"""
if isinstance(target, (tuple, list)):
if len(target) == 0:
raise ValueError("Cannot find stored frontend for empty target!")
frontends_list = [
self.get_source_sf(
run_id,
t,
should_exist=should_exist,
)
for t in target
]
return list(set.intersection(*map(set, frontends_list)))

frontends = []
for sf in self._sorted_storage:
if self._is_stored_in_sf(run_id, target, sf):
return sf
if should_exist:
frontends.append(sf)
if should_exist and not frontends:
raise ValueError(
"This cannot happen, we just checked that this run should be stored?!?"
)
return frontends

def get_save_when(self, target: str) -> ty.Union[strax.SaveWhen, int]:
"""For a given plugin, get the save when attribute either being a dict or a number."""
Expand Down
13 changes: 6 additions & 7 deletions strax/storage/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -429,13 +429,6 @@ class StorageBackend:
"""

def __new__(cls, *args, **kwargs):
"""Mandatorily wrap _read_chunk in a check_chunk_n decorator."""
if "_read_chunk" in cls.__dict__:
method = getattr(cls, "_read_chunk")
setattr(cls, "_read_chunk", strax.check_chunk_n(method))
return super(StorageBackend, cls).__new__(cls)

def loader(self, backend_key, time_range=None, chunk_number=None, executor=None):
"""Iterates over strax data in backend_key.
Expand Down Expand Up @@ -533,6 +526,12 @@ def _read_and_format_chunk(
backend_key, chunk_info=chunk_info, dtype=dtype, compressor=metadata["compressor"]
)

if len(data) != chunk_info["n"]:
raise strax.DataCorrupted(
f"Chunk {chunk_info['filename']} of {chunk_info['run_id']} has {len(data)} items, "
f"but chunk_info {chunk_info} says {chunk_info['n']}"
)

_is_superrun = chunk_info["run_id"].startswith("_")
subruns = None
if _is_superrun:
Expand Down
2 changes: 2 additions & 0 deletions tests/test_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,8 @@ def test_copy_to_frontend():

# Add the second frontend
context.storage += [strax.DataDirectory(temp_dir_2)]
# Test get_source_sf
context.get_source_sf(run_id, ["records", "records"])
context.copy_to_frontend(run_id, "records", target_compressor="lz4")

# Make sure both frontends have the same data.
Expand Down
2 changes: 1 addition & 1 deletion tests/test_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ def test_close_goes_first_on_loading(self):
# self.assertNotEqual(len_from_compare, len_from_main_st)

def test_check_chunk_n(self):
"""Check that check_chunk_n can detect when metadata is lying."""
"""Check that StorageBackend detects when metadata is lying."""
st, frontend_setup = self.get_st_and_fill_frontends()

sf = st.storage[0]
Expand Down

0 comments on commit 5e4de51

Please sign in to comment.