Skip to content

Commit

Permalink
Add option to delete heavy chunk automatically after reading (#1570)
Browse files Browse the repository at this point in the history
* Add option to delete heavy chunk automatically after reading
  • Loading branch information
dachengx authored Feb 27, 2025
1 parent 4e2c6cc commit 0f64e72
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 6 deletions.
3 changes: 3 additions & 0 deletions straxen/contexts.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ def xenonnt(
include_rucio_local: bool = False,
# Frontend options
download_heavy: bool = False,
remove_heavy: bool = False,
_auto_append_rucio_local: bool = True,
_rucio_path: str = "/dali/lgrandi/rucio/",
_rucio_local_path: Optional[str] = None,
Expand Down Expand Up @@ -136,6 +137,7 @@ def xenonnt(
wants to do a fuzzy search in the data the runs database is out of sync with rucio
:param download_heavy: bool, whether or not to allow downloads of heavy data (raw_records*, less
the aqmon)
:param remove_heavy: bool, whether or not to remove the heavy data after reading
:param _auto_append_rucio_local: bool, whether or not to automatically append the rucio local
path
:param _rucio_path: str, path of rucio
Expand Down Expand Up @@ -204,6 +206,7 @@ def xenonnt(
rucio_frontend = straxen.RucioRemoteFrontend(
staging_dir=os.path.join(output_folder, "rucio"),
download_heavy=download_heavy,
remove_heavy=remove_heavy,
)
st.storage += [rucio_frontend]

Expand Down
18 changes: 16 additions & 2 deletions straxen/storage/rucio_remote.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ def __init__(
staging_dir="./strax_data",
rses_only=tuple(),
download_heavy=False,
remove_heavy=False,
tries=3,
num_threads=1,
stage=False,
Expand All @@ -46,6 +47,7 @@ def __init__(
):
"""
:param download_heavy: option to allow downloading of heavy data through RucioRemoteBackend
:param remove_heavy: option to remove heavy data from the RucioRemoteBackend after reading
:param args: Passed to strax.StorageFrontend
:param kwargs: Passed to strax.StorageFrontend
:param rses_only: tuple, limits RSE selection to these options if provided
Expand All @@ -61,6 +63,7 @@ def __init__(
staging_dir=staging_dir,
rses_only=rses_only,
download_heavy=download_heavy,
remove_heavy=remove_heavy,
tries=tries,
num_threads=num_threads,
stage=stage,
Expand Down Expand Up @@ -115,6 +118,7 @@ def __init__(
staging_dir,
rses_only=tuple(),
download_heavy=False,
remove_heavy=False,
tries=3,
num_threads=1,
stage=False,
Expand All @@ -125,6 +129,7 @@ def __init__(
a writable location.
:param download_heavy: Whether or not to allow downloads of the
heaviest data (raw_records*, less aqmon and MV)
:param remove_heavy: Whether or not to remove the heaviest data after reading
:param kwargs: Passed to strax.FileSystemBackend
:param rses_only: tuple, limits RSE selection to these options if provided
"""
Expand All @@ -144,6 +149,7 @@ def __init__(
self.staging_dir = staging_dir
self.rses_only = strax.to_str_tuple(rses_only)
self.download_heavy = download_heavy
self.remove_heavy = remove_heavy
self.tries = tries
self.num_threads = num_threads
self.stage = stage
Expand Down Expand Up @@ -193,8 +199,8 @@ def _read_chunk(self, dset_did, chunk_info, dtype, compressor):
base_dir = os.path.join(self.staging_dir, did_to_dirname(dset_did))
chunk_file = chunk_info["filename"]
chunk_path = os.path.abspath(os.path.join(base_dir, chunk_file))
number, datatype, hsh = parse_rucio_did(dset_did)
if not os.path.exists(chunk_path):
number, datatype, hsh = parse_rucio_did(dset_did)
if datatype in self.heavy_types and not self.download_heavy:
error_msg = (
"For space reasons we don't want to have everyone "
Expand Down Expand Up @@ -227,7 +233,15 @@ def _read_chunk(self, dset_did, chunk_info, dtype, compressor):
if not os.path.exists(chunk_path):
raise FileNotFoundError(f"No chunk file found at {chunk_path}")

return strax.load_file(chunk_path, dtype=dtype, compressor=compressor)
data = strax.load_file(chunk_path, dtype=dtype, compressor=compressor)

if self.remove_heavy and datatype in self.heavy_types:
warn(
f"Removing {chunk_path} after reading since it's heavy data. "
"This is a one-time operation."
)
os.remove(chunk_path)
return data

def _saver(self, dirname, metadata, **kwargs):
raise NotImplementedError(
Expand Down
21 changes: 17 additions & 4 deletions tests/storage/test_rucio_remote.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,13 @@ def setUp(self) -> None:
self.run_id = "009104"
self.staging_dir = "./test_rucio_remote"

def get_context(self, download_heavy: bool) -> strax.Context:
def get_context(self, download_heavy: bool, remove_heavy: bool) -> strax.Context:
os.makedirs(self.staging_dir, exist_ok=True)
context = straxen.contexts.xenonnt(
output_folder=os.path.join(self.staging_dir, "output"),
include_rucio_remote=True,
download_heavy=download_heavy,
remove_heavy=remove_heavy,
_rucio_path=self.staging_dir,
_raw_paths=[os.path.join(self.staging_dir, "raw")],
_database_init=False,
Expand All @@ -31,20 +32,32 @@ def tearDown(self):

@unittest.skipIf(not straxen.HAVE_ADMIX, "Admix is not installed")
def test_download_no_heavy(self):
st = self.get_context(download_heavy=False)
st = self.get_context(download_heavy=False, remove_heavy=False)
with self.assertRaises(strax.DataNotAvailable):
rr = self.try_load(st, "raw_records")
assert False, len(rr)

@unittest.skipIf(not straxen.HAVE_ADMIX, "Admix is not installed")
def test_download_with_heavy(self):
st = self.get_context(download_heavy=True)
st = self.get_context(download_heavy=True, remove_heavy=False)
rr = self.try_load(st, "raw_records")
assert len(rr)

@unittest.skipIf(not straxen.HAVE_ADMIX, "Admix is not installed")
def test_download_with_remove(self):
st = self.get_context(download_heavy=True, remove_heavy=True)
rr = self.try_load(st, "raw_records")
assert len(rr)
# The is_stored is still loadable after removing the chunks
assert st.is_stored(self.run_id, "raw_records")
# Make sure the only remaining file is metadata
folder = os.path.join(self.staging_dir, "output", "rucio")
key_for = str(st.key_for(self.run_id, "raw_records"))
assert len(os.listdir(os.path.join(folder, key_for))) == 1

@unittest.skipIf(not straxen.HAVE_ADMIX, "Admix is not installed")
def test_download_with_heavy_and_high_level(self):
st = self.get_context(download_heavy=True)
st = self.get_context(download_heavy=True, remove_heavy=False)
pc = self.try_load(st, "pulse_counts")
assert len(pc)

Expand Down

0 comments on commit 0f64e72

Please sign in to comment.