diff --git a/extra_requirements/requirements-tests.txt b/extra_requirements/requirements-tests.txt index a5a62be30..bbdf3e337 100644 --- a/extra_requirements/requirements-tests.txt +++ b/extra_requirements/requirements-tests.txt @@ -1,4 +1,4 @@ # File for the requirements of strax with the automated tests -git+https://github.com/XENONnT/ax_env +git+https://github.com/XENONnT/base_environment deepdiff ipython==8.12.1 diff --git a/strax/chunk.py b/strax/chunk.py index 6f027463a..f1e9f67c5 100644 --- a/strax/chunk.py +++ b/strax/chunk.py @@ -1,5 +1,4 @@ import typing as ty -from functools import wraps import numpy as np import numba @@ -419,26 +418,3 @@ def _update_subruns_in_chunk(chunks): else: subruns[subrun_id] = subrun_start_end return subruns - - -@export -def check_chunk_n(f): - @wraps(f) - def wrapper(self, *args, **kwargs): - # assume chunk_info is the second argument - if "chunk_info" not in kwargs: - raise ValueError( - "chunk_info not passed to function, check_chunk_n ", - "can only be used with functions that take chunk_info as an argument, ", - "usually it is the strax.StorageBackend._read_chunk method.", - ) - chunk_info = kwargs["chunk_info"] - chunk = f(self, *args, **kwargs) - if len(chunk) != chunk_info["n"]: - raise strax.DataCorrupted( - f"Chunk {chunk_info['filename']} of {chunk_info['run_id']} has {len(chunk)} items, " - f"but chunk_info {chunk_info} says {chunk_info['n']}" - ) - return chunk - - return wrapper diff --git a/strax/context.py b/strax/context.py index 95c01f62f..8665847ac 100644 --- a/strax/context.py +++ b/strax/context.py @@ -2085,7 +2085,7 @@ def copy_to_frontend( target_sf = [self.storage[target_frontend_id]] # Figure out which of the frontends has the data. Raise error when none - source_sf = self._get_source_sf(run_id, target, should_exist=True) + source_sf = self.get_source_sf(run_id, target, should_exist=True)[0] # Keep frontends that: # 1. don't already have the data; and @@ -2275,22 +2275,37 @@ def _is_stored_in_sf(self, run_id, target, storage_frontend: strax.StorageFronte except strax.DataNotAvailable: return False - def _get_source_sf(self, run_id, target, should_exist=False): - """Get the source storage frontend for a given run_id and target. + def get_source_sf(self, run_id, target, should_exist=False): + """Get the source storage frontends for a given run_id and target. :param run_id, target: run_id, target :param should_exist: Raise a ValueError if we cannot find one (e.g. we already checked the data is stored) - :return: strax.StorageFrontend or None (when raise_error is False) + :return: list of strax.StorageFrontend (when should_exist is False) """ + if isinstance(target, (tuple, list)): + if len(target) == 0: + raise ValueError("Cannot find stored frontend for empty target!") + frontends_list = [ + self.get_source_sf( + run_id, + t, + should_exist=should_exist, + ) + for t in target + ] + return list(set.intersection(*map(set, frontends_list))) + + frontends = [] for sf in self._sorted_storage: if self._is_stored_in_sf(run_id, target, sf): - return sf - if should_exist: + frontends.append(sf) + if should_exist and not frontends: raise ValueError( "This cannot happen, we just checked that this run should be stored?!?" ) + return frontends def get_save_when(self, target: str) -> ty.Union[strax.SaveWhen, int]: """For a given plugin, get the save when attribute either being a dict or a number.""" diff --git a/strax/storage/common.py b/strax/storage/common.py index 3e3be50aa..a9cf53742 100644 --- a/strax/storage/common.py +++ b/strax/storage/common.py @@ -429,13 +429,6 @@ class StorageBackend: """ - def __new__(cls, *args, **kwargs): - """Mandatorily wrap _read_chunk in a check_chunk_n decorator.""" - if "_read_chunk" in cls.__dict__: - method = getattr(cls, "_read_chunk") - setattr(cls, "_read_chunk", strax.check_chunk_n(method)) - return super(StorageBackend, cls).__new__(cls) - def loader(self, backend_key, time_range=None, chunk_number=None, executor=None): """Iterates over strax data in backend_key. @@ -533,6 +526,12 @@ def _read_and_format_chunk( backend_key, chunk_info=chunk_info, dtype=dtype, compressor=metadata["compressor"] ) + if len(data) != chunk_info["n"]: + raise strax.DataCorrupted( + f"Chunk {chunk_info['filename']} of {chunk_info['run_id']} has {len(data)} items, " + f"but chunk_info {chunk_info} says {chunk_info['n']}" + ) + _is_superrun = chunk_info["run_id"].startswith("_") subruns = None if _is_superrun: diff --git a/tests/test_context.py b/tests/test_context.py index 73281a7cf..de254b04f 100644 --- a/tests/test_context.py +++ b/tests/test_context.py @@ -178,6 +178,8 @@ def test_copy_to_frontend(): # Add the second frontend context.storage += [strax.DataDirectory(temp_dir_2)] + # Test get_source_sf + context.get_source_sf(run_id, ["records", "records"]) context.copy_to_frontend(run_id, "records", target_compressor="lz4") # Make sure both frontends have the same data. diff --git a/tests/test_storage.py b/tests/test_storage.py index 9fcc96a2c..437578f2d 100644 --- a/tests/test_storage.py +++ b/tests/test_storage.py @@ -165,7 +165,7 @@ def test_close_goes_first_on_loading(self): # self.assertNotEqual(len_from_compare, len_from_main_st) def test_check_chunk_n(self): - """Check that check_chunk_n can detect when metadata is lying.""" + """Check that StorageBackend detects when metadata is lying.""" st, frontend_setup = self.get_st_and_fill_frontends() sf = st.storage[0]