From eddec8384fb48464a193c42e58d518bca07070ff Mon Sep 17 00:00:00 2001 From: Dacheng Xu Date: Fri, 16 Aug 2024 00:41:59 +0800 Subject: [PATCH 1/3] Save run metadata in better format (#868) --- strax/storage/files.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/strax/storage/files.py b/strax/storage/files.py index e48f0794..45eece3b 100644 --- a/strax/storage/files.py +++ b/strax/storage/files.py @@ -62,7 +62,7 @@ def write_run_metadata(self, run_id, metadata): with open(self._run_meta_path(run_id), mode="w") as f: if "name" not in metadata: metadata["name"] = run_id - f.write(json.dumps(metadata, default=json_util.default)) + f.write(json.dumps(metadata, sort_keys=True, indent=4, default=json_util.default)) def _scan_runs(self, store_fields): """Iterable of run document dictionaries. From 81f4250f1258d2305cb8e9e1541954e65d5eb3db Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Fri, 16 Aug 2024 01:22:21 +0800 Subject: [PATCH 2/3] [pre-commit.ci] pre-commit autoupdate (#864) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit updates: - [github.com/psf/black: 24.4.2 → 24.8.0](https://github.com/psf/black/compare/24.4.2...24.8.0) - [github.com/pre-commit/mirrors-mypy: v1.11.0 → v1.11.1](https://github.com/pre-commit/mirrors-mypy/compare/v1.11.0...v1.11.1) - [github.com/pycqa/flake8: 7.1.0 → 7.1.1](https://github.com/pycqa/flake8/compare/7.1.0...7.1.1) Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com> Co-authored-by: Dacheng Xu --- .pre-commit-config.yaml | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index ae8d60f7..19675c57 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -10,7 +10,7 @@ repos: - id: check-added-large-files - repo: https://github.com/psf/black - rev: 24.4.2 + rev: 24.8.0 hooks: - id: black args: [--safe, --line-length=100, --preview] @@ -24,7 +24,7 @@ repos: - id: docformatter - repo: https://github.com/pre-commit/mirrors-mypy - rev: v1.11.0 + rev: v1.11.1 hooks: - id: mypy additional_dependencies: [ @@ -33,7 +33,7 @@ repos: ] - repo: https://github.com/pycqa/flake8 - rev: 7.1.0 + rev: 7.1.1 hooks: - id: flake8 From 6f1564559201c487b7c03acb0d56359644caba62 Mon Sep 17 00:00:00 2001 From: Dacheng Xu Date: Fri, 16 Aug 2024 11:52:12 +0800 Subject: [PATCH 3/3] Include `chunk_number` in lineage: Per chunk storage (#863) * Include `chunk_number` in lineage * Debug for assigning 0 to chunk_number * Merge per chunk processed storages * Limit `chunk_number` to be list * Add more check to chunk_number --- strax/context.py | 287 +++++++++++++++++++++++++++++++---------- strax/run_selection.py | 2 +- 2 files changed, 221 insertions(+), 68 deletions(-) diff --git a/strax/context.py b/strax/context.py index a0a1a39b..c6497d06 100644 --- a/strax/context.py +++ b/strax/context.py @@ -1,20 +1,21 @@ -import datetime +import time import logging import warnings import fnmatch +import itertools +import json from functools import partial +import types import typing as ty -import time -import json -import numpy as np -import pandas as pd -import strax +from enum import IntEnum +import datetime import inspect -import types from collections import defaultdict -from immutabledict import immutabledict -from enum import IntEnum +from immutabledict import immutabledict +import numpy as np +import pandas as pd +import strax from strax import CutList @@ -600,10 +601,10 @@ def show_config(self, data_type=None, pattern="*", run_id="9" * 20): return pd.DataFrame(r, columns=r[0].keys()) return pd.DataFrame([]) - def lineage(self, run_id, data_type): + def lineage(self, run_id, data_type, chunk_number=None): """Return lineage dictionary for data_type and run_id, based on the options in this context.""" - return self._get_plugins((data_type,), run_id)[data_type].lineage + return self._get_plugins((data_type,), run_id, chunk_number=chunk_number)[data_type].lineage def register_all(self, module): """Register all plugins defined in module. @@ -659,13 +660,13 @@ def data_info(self, data_name: str) -> pd.DataFrame: result.append([name, dtype, title]) return pd.DataFrame(result, columns=display_headers) - def get_single_plugin(self, run_id, data_name): + def get_single_plugin(self, run_id, data_name, chunk_number=None): """Return a single fully initialized plugin that produces data_name for run_id. For use in custom processing. """ - plugin = self._get_plugins((data_name,), run_id)[data_name] + plugin = self._get_plugins((data_name,), run_id, chunk_number=chunk_number)[data_name] self._set_plugin_config(plugin, run_id, tolerant=False) plugin.setup() return plugin @@ -739,9 +740,14 @@ def _context_hash(self): def _plugins_are_cached( self, targets: ty.Union[ty.Tuple[str], ty.List[str]], + chunk_number: ty.Optional[ty.Dict[str, ty.List[int]]] = None, ) -> bool: """Check if all the requested targets are in the _fixed_plugin_cache.""" - if self.context_config["use_per_run_defaults"] or self._fixed_plugin_cache is None: + if ( + self.context_config["use_per_run_defaults"] + or self._fixed_plugin_cache is None + or chunk_number is not None + ): # There is no point in caching if plugins (lineage) can # change per run or the cache is empty. return False @@ -752,8 +758,12 @@ def _plugins_are_cached( plugin_cache = self._fixed_plugin_cache[context_hash] return all([t in plugin_cache for t in targets]) - def _plugins_to_cache(self, plugins: dict) -> None: - if self.context_config["use_per_run_defaults"]: + def _plugins_to_cache( + self, + plugins: dict, + chunk_number: ty.Optional[ty.Dict[str, ty.List[int]]] = None, + ) -> None: + if self.context_config["use_per_run_defaults"] or chunk_number is not None: # There is no point in caching if plugins (lineage) can change per run return context_hash = self._context_hash() @@ -773,8 +783,12 @@ def __get_requested_plugins_from_cache( run_id: str, targets: ty.Tuple[str], ) -> ty.Dict[str, strax.Plugin]: - # Doubly underscored since we don't do any key-checks etc here - """Load requested plugins from the plugin_cache.""" + """Load requested plugins from the plugin_cache. + + Doubly underscored since we don't do any key-checks etc here. Please be very careful of + using it since no check is done. + + """ requested_plugins = {} cached_plugins = self._fixed_plugin_cache[self._context_hash()] # type: ignore for target, plugin in cached_plugins.items(): @@ -801,6 +815,7 @@ def _get_plugins( self, targets: ty.Union[ty.Tuple[str], ty.List[str]], run_id: str, + chunk_number: ty.Optional[ty.Dict[str, ty.List[int]]] = None, ) -> ty.Dict[str, strax.Plugin]: """Return dictionary of plugin instances necessary to compute targets from scratch. @@ -827,7 +842,7 @@ def _get_plugins( if target in plugins: continue - target_plugin = self.__get_plugin(run_id, target) + target_plugin = self.__get_plugin(run_id, target, chunk_number=chunk_number) for provides in target_plugin.provides: plugins[provides] = target_plugin targets += list(target_plugin.depends_on) @@ -841,10 +856,15 @@ def _get_plugins( return plugins - def __get_plugin(self, run_id: str, data_type: str): + def __get_plugin( + self, + run_id: str, + data_type: str, + chunk_number: ty.Optional[ty.Dict[str, ty.List[int]]] = None, + ): """Get single plugin either from cache or initialize it.""" # Check if plugin for data_type is already cached - if self._plugins_are_cached((data_type,)): + if self._plugins_are_cached((data_type,), chunk_number=chunk_number): cached_plugins = self.__get_requested_plugins_from_cache(run_id, (data_type,)) target_plugin = cached_plugins[data_type] return target_plugin @@ -861,10 +881,11 @@ def __get_plugin(self, run_id: str, data_type: str): self._set_plugin_config(plugin, run_id, tolerant=True) plugin.deps = { - d_depends: self.__get_plugin(run_id, d_depends) for d_depends in plugin.depends_on + d_depends: self.__get_plugin(run_id, d_depends, chunk_number=chunk_number) + for d_depends in plugin.depends_on } - self.__add_lineage_to_plugin(run_id, plugin) + self.__add_lineage_to_plugin(run_id, plugin, chunk_number=chunk_number) if not hasattr(plugin, "data_kind") and not plugin.multi_output: if len(plugin.depends_on): @@ -879,11 +900,34 @@ def __get_plugin(self, run_id: str, data_type: str): plugin.fix_dtype() # Add plugin to cache - self._plugins_to_cache({data_type: plugin for data_type in plugin.provides}) + self._plugins_to_cache( + {data_type: plugin for data_type in plugin.provides}, chunk_number=chunk_number + ) return plugin - def __add_lineage_to_plugin(self, run_id, plugin): + @staticmethod + def _check_chunk_number(chunk_number: ty.List[int]): + """Check if the chunk_number is a list of consecutive integers.""" + mask = isinstance(chunk_number, list) + mask &= all([isinstance(x, int) for x in chunk_number]) + if not mask: + raise ValueError(f"chunk_number should be a list of integers, but got {chunk_number}") + + # Check if the difference between adjacent elements is exactly one + for i in range(len(chunk_number) - 1): + if chunk_number[i + 1] - chunk_number[i] != 1: + raise ValueError( + "chunk_number should be a list of consecutive integers, " + f"but got {chunk_number}" + ) + + def __add_lineage_to_plugin( + self, + run_id, + plugin, + chunk_number: ty.Optional[ty.Dict[str, ty.List[int]]] = None, + ): """Adds lineage to plugin in place. Also adds parent infromation in case of a child plugin. @@ -918,22 +962,30 @@ def __add_lineage_to_plugin(self, run_id, plugin): for parent_class in plugin.__class__.__bases__: configs[parent_class.__name__] = parent_class.__version__ - plugin.lineage = { - last_provide: (plugin.__class__.__name__, plugin.version(run_id), configs) - } else: - plugin.lineage = { - last_provide: ( - plugin.__class__.__name__, - plugin.version(run_id), - { - option: setting - for option, setting in plugin.config.items() - if plugin.takes_config[option].track - }, - ) + configs = { + option: setting + for option, setting in plugin.config.items() + if plugin.takes_config[option].track } + # Set chunk_number in the lineage + if chunk_number is not None: + for d_depends in plugin.depends_on: + if d_depends in chunk_number: + configs.setdefault("chunk_number", {}) + if d_depends in configs["chunk_number"]: + raise ValueError( + f"Chunk number for {d_depends} is already set in the lineage" + ) + self._check_chunk_number(chunk_number[d_depends]) + configs["chunk_number"][d_depends] = chunk_number[d_depends] + + plugin.lineage = { + last_provide: (plugin.__class__.__name__, plugin.version(run_id), configs) + } + + # This is why the lineage of a plugin contains all its dependencies for d_depends in plugin.depends_on: plugin.lineage.update(plugin.deps[d_depends].lineage) @@ -1038,7 +1090,7 @@ def get_components( if len(t) == 1: raise ValueError(f"Plugin names must be more than one letter, not {t}") - plugins = self._get_plugins(targets, run_id) + plugins = self._get_plugins(targets, run_id, chunk_number=chunk_number) allow_hyperruns = [plugins[target_i].allow_hyperrun for target_i in targets] if sum(allow_hyperruns) != 0 and sum(allow_hyperruns) != len(targets): @@ -1063,10 +1115,14 @@ def check_cache(target_i): # Can we load this data? loading_this_data = False - key = self.key_for(run_id, target_i) + key = self.key_for(run_id, target_i, chunk_number=chunk_number) + if chunk_number is not None and target_i in chunk_number: + _chunk_number = chunk_number[target_i] + else: + _chunk_number = None loader = self._get_partial_loader_for( - key, chunk_number=chunk_number, time_range=time_range + key, chunk_number=_chunk_number, time_range=time_range ) _is_temp = not target_plugin.provides[0].startswith("_temp") @@ -1092,14 +1148,18 @@ def check_cache(target_i): ldrs = [] for subrun in sub_run_spec: - sub_key = self.key_for(subrun, target_i) + sub_key = self.key_for(subrun, target_i, chunk_number=chunk_number) if sub_run_spec[subrun] == "all": _subrun_time_range = None else: _subrun_time_range = sub_run_spec[subrun] + if chunk_number is not None and target_i in chunk_number: + _chunk_number = chunk_number[target_i] + else: + _chunk_number = None loader = self._get_partial_loader_for( - sub_key, time_range=_subrun_time_range, chunk_number=chunk_number + sub_key, time_range=_subrun_time_range, chunk_number=_chunk_number ) if not loader: raise RuntimeError( @@ -1229,9 +1289,13 @@ def concat_loader(*args, **kwargs): # otherwise we will see error at Chunk.concatenate # but anyway the data is should already been made for d_to_save in set(current_plugin_to_savers + list(target_plugin.provides)): - key = self.key_for(run_id, d_to_save) + key = self.key_for(run_id, d_to_save, chunk_number=chunk_number) + if chunk_number is not None and d_to_save in chunk_number: + _chunk_number = chunk_number[d_to_save] + else: + _chunk_number = None loader = self._get_partial_loader_for( - key, time_range=time_range, chunk_number=chunk_number + key, time_range=time_range, chunk_number=_chunk_number ) if ( @@ -1478,7 +1542,7 @@ def get_iter( allow_multiple=False, progress_bar=True, multi_run_progress_bar=True, - _chunk_number=None, + chunk_number=None, processor=None, **kwargs, ) -> ty.Iterator[strax.Chunk]: @@ -1518,7 +1582,7 @@ def get_iter( # to merge the results automatically. if isinstance(targets, (list, tuple)) and len(targets) > 1: targets = tuple(set(strax.to_str_tuple(targets))) - plugins = self._get_plugins(targets=targets, run_id=run_id) + plugins = self._get_plugins(targets=targets, run_id=run_id, chunk_number=chunk_number) if len(set(plugins[d].data_kind_for(d) for d in targets)) == 1: temp_name = "_temp_" + strax.deterministic_hash(targets) p = type(temp_name, (strax.MergeOnlyPlugin,), dict(depends_on=tuple(targets))) @@ -1539,7 +1603,7 @@ def get_iter( targets=targets, save=save, time_range=time_range, - chunk_number=_chunk_number, + chunk_number=chunk_number, multi_run_progress_bar=multi_run_progress_bar, ) @@ -1687,6 +1751,7 @@ def make( save=tuple(), max_workers=None, _skip_if_built=True, + chunk_number=None, **kwargs, ) -> None: """Compute target for run_id. Returns nothing (None). @@ -1709,13 +1774,21 @@ def make( log=self.log, save=save, max_workers=max_workers, + chunk_number=chunk_number, **kwargs, ) - if _skip_if_built and self.is_stored(run_ids[0], targets): + if _skip_if_built and self.is_stored(run_ids[0], targets, chunk_number=chunk_number): return - for _ in self.get_iter(run_ids[0], targets, save=save, max_workers=max_workers, **kwargs): + for _ in self.get_iter( + run_ids[0], + targets, + save=save, + max_workers=max_workers, + chunk_number=chunk_number, + **kwargs, + ): pass def get_array( @@ -1914,7 +1987,7 @@ def get_zarr( for run_id in strax.to_str_tuple(run_ids): if zarray is not None and run_id in zarray.attrs.get("RUNS", {}): continue - key = self.key_for(run_id, target) + key = self.key_for(run_id, target, chunk_number=kwargs.get("chunk_number", None)) INSERTED[run_id] = dict(start_idx=idx, end_idx=idx, lineage_hash=key.lineage_hash) for chunk in self.get_iter(run_id, target, progress_bar=progress_bar, **kwargs): end_idx = idx + chunk.data.size @@ -1929,7 +2002,7 @@ def get_zarr( zarray.attrs["RUNS"] = dict(zarray.attrs.get("RUNS", {}), **INSERTED) return group - def key_for(self, run_id, target): + def key_for(self, run_id, target, chunk_number=None): """Get the DataKey for a given run and a given target plugin. The DataKey is inferred from the plugin lineage. The lineage can come either from the _fixed_plugin_cache or computed on the fly. @@ -1939,7 +2012,7 @@ def key_for(self, run_id, target): :return: strax.DataKey of the target """ - if self._plugins_are_cached((target,)): + if self._plugins_are_cached((target,), chunk_number=chunk_number): context_hash = self._context_hash() if context_hash in self._fixed_plugin_cache: plugins = self._fixed_plugin_cache[self._context_hash()] @@ -1948,14 +2021,14 @@ def key_for(self, run_id, target): self.log.warning( f"Context hash changed to {context_hash} for {self._plugin_class_registry}?" ) - plugins = self._get_plugins((target,), run_id) + plugins = self._get_plugins((target,), run_id, chunk_number=chunk_number) else: - plugins = self._get_plugins((target,), run_id) + plugins = self._get_plugins((target,), run_id, chunk_number=chunk_number) lineage = plugins[target].lineage return strax.DataKey(run_id, target, lineage) - def get_meta(self, run_id, target) -> dict: + def get_meta(self, run_id, target, chunk_number=None) -> dict: """Return metadata for target for run_id, or raise DataNotAvailable if data is not yet available. @@ -1963,7 +2036,7 @@ def get_meta(self, run_id, target) -> dict: :param target: data type to get """ - key = self.key_for(run_id, target) + key = self.key_for(run_id, target, chunk_number=chunk_number) for sf in self._sorted_storage: try: return sf.get_metadata(key, **self._find_options) @@ -2026,8 +2099,6 @@ def _extract_metadata_and_lineage(self, run_id, target, metafile): lineage = ( metadata["lineage"] if _is_stored else self.key_for(run_id, target).lineage ) - _lineage_hash = str(self.key_for(run_id, target)) - print(_lineage_hash) elif isinstance(metafile, dict): metadata = metafile lineage = metadata["lineage"] @@ -2110,7 +2181,7 @@ def run_defaults(self, run_id): self._run_defaults_cache[run_id] = defs return defs - def is_stored(self, run_id, target, detailed=False, **kwargs): + def is_stored(self, run_id, target, detailed=False, chunk_number=None, **kwargs): """Return whether data type target has been saved for run_id through any of the registered storage frontends. @@ -2119,7 +2190,9 @@ def is_stored(self, run_id, target, detailed=False, **kwargs): """ if isinstance(target, (tuple, list)): - return all([self.is_stored(run_id, t, **kwargs) for t in target]) + return all( + [self.is_stored(run_id, t, chunk_number=chunk_number, **kwargs) for t in target] + ) # If any new options given, replace the current context # with a temporary one @@ -2129,7 +2202,7 @@ def is_stored(self, run_id, target, detailed=False, **kwargs): self = self.new_context(**kwargs) for sf in self._sorted_storage: - if self._is_stored_in_sf(run_id, target, sf): + if self._is_stored_in_sf(run_id, target, sf, chunk_number=chunk_number): return True # None of the frontends has the data @@ -2316,6 +2389,79 @@ def wrapped_loader(): "do you have two storage frontends writing to the same place?" ) + def merge_per_chunk_storage( + self, + run_id: str, + target: str, + per_chunked_dependency: str, + rechunk=True, + chunk_number_group: ty.Optional[ty.List[ty.List[int]]] = None, + ): + """Merge the per-chunked data from the per-chunked dependency into the target storage.""" + + if chunk_number_group is not None: + combined_chunk_numbers = list(itertools.chain(*chunk_number_group)) + if len(combined_chunk_numbers) != len(set(combined_chunk_numbers)): + raise ValueError(f"Duplicate chunk numbers found in {chunk_number_group}") + _chunk_number = {per_chunked_dependency: combined_chunk_numbers} + else: + # if no chunk numbers are given, use information from the dependency + chunks = self.get_meta(run_id, per_chunked_dependency)["chunks"] + chunk_number_group = [[c["chunk_i"]] for c in chunks] + _chunk_number = None + + # Make sure that all needed runs are stored + for chunk_number in chunk_number_group: + assert self.is_stored( + run_id, target, chunk_number={per_chunked_dependency: chunk_number} + ) + + # Usually we want to save in the same storage frontend + # Here we assume that the target is stored chunk by chunk of the dependency + target_sf = source_sf = self.get_source_sf( + run_id, + target, + chunk_number={per_chunked_dependency: chunk_number}, + should_exist=True, + )[0] + + def wrapped_loader(): + """Wrapped loader for changing the target_size_mb.""" + for chunk_number in chunk_number_group: + # Mostly copied from self.copy_to_frontend + # Get the info from the source backend (s_be) that we need to fill + # the target backend (t_be) with + data_key = self.key_for( + run_id, target, chunk_number={per_chunked_dependency: chunk_number} + ) + # This should never fail, we just tried + s_be_str, s_be_key = source_sf.find(data_key) + s_be = source_sf._get_backend(s_be_str) + md = s_be.get_metadata(s_be_key) + + loader = s_be.loader(s_be_key) + try: + while True: + # pylint: disable=cell-var-from-loop + data = next(loader) + # Update target chunk size for re-chunking + data.target_size_mb = md["chunk_target_size_mb"] + yield data + except StopIteration: + continue + + # Fill the target buffer + data_key = self.key_for(run_id, target, chunk_number=_chunk_number) + t_be_str, t_be_key = target_sf.find(data_key, write=True) + target_be = target_sf._get_backend(t_be_str) + target_plugin = self.__get_plugin(run_id, target, chunk_number=_chunk_number) + target_md = target_plugin.metadata(run_id, target) + # Copied from StorageBackend.saver + if "dtype" in target_md: + target_md["dtype"] = target_md["dtype"].descr.__repr__() + saver = target_be._saver(t_be_key, target_md) + saver.save_from(wrapped_loader(), rechunk=rechunk) + def get_source( self, run_id: str, @@ -2420,7 +2566,13 @@ def stored_dependencies( ) return _targets_stored - def _is_stored_in_sf(self, run_id, target, storage_frontend: strax.StorageFrontend) -> bool: + def _is_stored_in_sf( + self, + run_id, + target, + storage_frontend: strax.StorageFrontend, + chunk_number: ty.Optional[ty.Dict[str, ty.List[int]]] = None, + ) -> bool: """Check if the storage frontend has the requested datakey for the run_id and target. :param storage_frontend: strax.StorageFrontend to check if it has the requested datakey for @@ -2428,14 +2580,14 @@ def _is_stored_in_sf(self, run_id, target, storage_frontend: strax.StorageFronte :return: if the frontend has the key or not. """ - key = self.key_for(run_id, target) + key = self.key_for(run_id, target, chunk_number=chunk_number) try: storage_frontend.find(key, **self._find_options) return True except strax.DataNotAvailable: return False - def get_source_sf(self, run_id, target, should_exist=False): + def get_source_sf(self, run_id, target, should_exist=False, chunk_number=None): """Get the source storage frontends for a given run_id and target. :param run_id, target: run_id, target @@ -2452,6 +2604,7 @@ def get_source_sf(self, run_id, target, should_exist=False): run_id, t, should_exist=should_exist, + chunk_number=chunk_number, ) for t in target ] @@ -2459,7 +2612,7 @@ def get_source_sf(self, run_id, target, should_exist=False): frontends = [] for sf in self._sorted_storage: - if self._is_stored_in_sf(run_id, target, sf): + if self._is_stored_in_sf(run_id, target, sf, chunk_number=chunk_number): frontends.append(sf) if should_exist and not frontends: raise ValueError( @@ -2578,7 +2731,7 @@ def add_method(cls, f): - fully_contained: (default) select things fully contained in the range - touching: select things that (partially) overlap with the range - skip: Do not select a time range, even if other arguments say so -:param _chunk_number: For internal use: return data from one chunk. +:param chunk_number: For internal use: return data from one chunk. :param progress_bar: Display a progress bar if metedata exists. :param multi_run_progress_bar: Display a progress bar for loading multiple runs """ diff --git a/strax/run_selection.py b/strax/run_selection.py index de2dfa07..5ea88054 100644 --- a/strax/run_selection.py +++ b/strax/run_selection.py @@ -53,7 +53,7 @@ def keys_for_runs( self, target: str, run_ids: ty.Union[np.ndarray, list, tuple, str] ) -> ty.List[strax.DataKey]: """Get the data-keys for a multitude of runs. If use_per_run_defaults is False which it - preferably is (#246), getting many keys should be fast as we only only compute the lineage once. + preferably is (#246), getting many keys should be fast as we only compute the lineage once. :param run_ids: Runs to get datakeys for :param target: datatype requested