From e26b979dbf677dbf5c771af74f06165d6ef0c0d5 Mon Sep 17 00:00:00 2001 From: Joe Zuntz Date: Tue, 28 Jan 2025 12:28:27 +0000 Subject: [PATCH 1/7] move stuff in from txpipe and update docs --- ceci/stage.py | 243 +++++++++++++++++++++++++++++++++++++++++--- tests/test_stage.py | 43 +++++++- 2 files changed, 270 insertions(+), 16 deletions(-) diff --git a/ceci/stage.py b/ceci/stage.py index e5846f5..584b1de 100644 --- a/ceci/stage.py +++ b/ceci/stage.py @@ -9,6 +9,7 @@ import pdb import datetime import warnings +import socket from abc import abstractmethod from . import errors @@ -147,7 +148,10 @@ def get_aliased_tag(self, tag): @abstractmethod def run(self): # pragma: no cover - """Run the stage and return the execution status""" + """Run the stage and return the execution status. + + Subclasses must implemented this method. + """ raise NotImplementedError("run") def validate(self): @@ -348,7 +352,8 @@ def __init_subclass__(cls, **kwargs): path = pathlib.Path(filename).resolve() # Add a description of the parameters to the end of the docstring - if stage_is_complete: + # If no config options are specified, omit this. + if stage_is_complete and cls.config_options: config_text = cls._describe_configuration_text() if cls.__doc__ is None: cls.__doc__ = f"Stage {cls.name}\n\nConfiguration Parameters:\n{config_text}" @@ -810,13 +815,24 @@ def is_parallel(self): def is_mpi(self): """ - Returns True if the stage is being run under MPI. + Check if the stage is being run under MPI. + + Returns + ------- + bool + True if the stage is being run under MPI """ return self._parallel == MPI_PARALLEL def is_dask(self): """ - Returns True if the stage is being run in parallel with Dask. + Check if the stage is being run in parallel with Dask. 
+ + Returns + ------- + bool + True if the stage is being run in parallel with Dask + """ return self._parallel == DASK_PARALLEL @@ -967,6 +983,11 @@ def data_ranges_by_rank(self, n_rows, chunk_rows, parallel=True): Parallel: bool Whether to split data by rank or just give all procs all data. Default=True + + Returns + ------- + start, end: tuple + The start and end of the range of rows to be read by this process """ n_chunks = n_rows // chunk_rows if n_chunks * chunk_rows < n_rows: # pragma: no cover @@ -988,6 +1009,17 @@ def get_input(self, tag): """ Return the path of an input file with the given tag, which can be aliased. + + Parameters + ---------- + tag: str + Tag as listed in self.inputs + + Returns + ------- + path: str + The path to the input file + """ tag = self.get_aliased_tag(tag) return self._inputs[tag] @@ -1000,7 +1032,21 @@ def get_output(self, tag, final_name=False): which can be aliased already. If final_name is False then use a temporary name - file will - be moved to its final name at the end + be moved to its final name at the end. The temporary name + is prefixed with "inprogress_". + + Parameters + ---------- + tag: str + Tag as listed in self.outputs + + final_name: bool + Default=False. Whether to save to the final name. + + Returns + ------- + path: str + The path to the output file """ tag = self.get_aliased_tag(tag) @@ -1023,6 +1069,21 @@ def open_input(self, tag, wrapper=False, **kwargs): For specialized file types like FITS or HDF5 it will return a more specific object - see the types.py file for more info. 
+ Parameters + ---------- + tag: str + Tag as listed in self.inputs + + wrapper: bool + Whether to return an underlying file object (False) or a data type instange (True) + + **kwargs: dict + Extra arguments to pass to the file class constructor + + Returns + ------- + obj: file or object + The opened file or object """ path = self.get_input(tag) input_class = self.get_input_type(tag) @@ -1050,12 +1111,11 @@ def open_output( Parameters ---------- - tag: str Tag as listed in self.outputs wrapper: bool - Default=False. Whether to return a wrapped file + Whether to return an underlying file object (False) or a data type instange (True) final_name: bool Default=False. Whether to save to @@ -1063,6 +1123,10 @@ def open_output( **kwargs: Extra args are passed on to the file's class constructor. + Returns + ------- + obj: file or object + The opened file or object """ path = self.get_output(tag, final_name=final_name) output_class = self.get_output_type(tag) @@ -1107,42 +1171,84 @@ def open_output( @classmethod def inputs_(cls): """ - Return the dict of inputs + Return the sequence of (tag, file type) pairs for the inputs. + + Returns + ------- + in_pairs : list[tuple[str, type]] """ return cls.inputs # pylint: disable=no-member @classmethod def outputs_(cls): """ - Return the dict of inputs + Return the sequence of (tag, file type) pairs for the outputs. + + Returns + ------- + out_pairs : list[tuple[str, type]] """ return cls.outputs # pylint: disable=no-member @classmethod def output_tags(cls): """ - Return the list of output tags required by this stage + Return the list of output tags required by this stage. + + Returns + ------- + out_tags : list[str] + The list of output tags """ return [tag for tag, _ in cls.outputs_()] @classmethod def input_tags(cls): """ - Return the list of input tags required by this stage + Return the list of input tags required by this stage. 
+ + Returns + ------- + in_tags : list[str] + The list of input tags """ return [tag for tag, _ in cls.inputs_()] def get_input_type(self, tag): - """Return the file type class of an input file with the given tag.""" + """ + Return the file type class of an input file with the given tag. + + Parameters + ---------- + tag : str + The tag of the input file + + Returns + ------- + ftype : FileType + The file type class + """ tag = self.get_aliased_tag(tag) - for t, dt in self.inputs_(): + for t, dt in self.inputs_().items(): t = self.get_aliased_tag(t) if t == tag: return dt raise ValueError(f"Tag {tag} is not a known input") # pragma: no cover def get_output_type(self, tag): - """Return the file type class of an output file with the given tag.""" + """ + Return the file type class of an output file with the given tag. + + Parameters + ---------- + tag : str + The tag of the output file + + Returns + ------- + ftype : FileType + The file type class + """ tag = self.get_aliased_tag(tag) for t, dt in self.outputs_(): t = self.get_aliased_tag(t) @@ -1162,8 +1268,12 @@ def instance_name(self): @property def config(self): """ - Returns the configuration dictionary for this stage, aggregating command + The configuration dictionary for this stage, aggregating command line options and optional configuration file. + + Options specified in the subclass variable `config_options` are + read from the configuration file, command line, or `make_stage` choices, + and stored in this dictionary. """ return self._configs @@ -1292,6 +1402,7 @@ def iterate_fits( Loop through chunks of the input data from a FITS file with the given tag TODO: add ceci tests of this functions + Parameters ---------- tag: str @@ -1380,6 +1491,59 @@ def iterate_hdf( data = {col: group[col][start:end] for col in cols} yield start, end, data + def combined_iterators(self, rows, *inputs, parallel=True): + """ + Iterate through multiple files at the same time. 
+ + If you have several HDF files with some + columns of the same length then you can use this method to + iterate through them all at once, and combine the data from + all of them into a single dictionary. + + Parameters + ---------- + rows: int + The number of rows to read in each chunk + + *inputs: list + A flat sequence of tag, group, cols arguments, three per file to read. + In each case tag is the input file name tag, group is the + group within the HDF5 file to read, and cols is a list of + columns to read from that group. Specify multiple triplets + to read from multiple files + + parallel: bool + Whether to split up data among processes (parallel=True) or give + all processes all data (parallel=False). Default = True. + + Returns + ------- + it: iterator + Iterator yielding (int, int, dict) tuples of (start, end, data) + """ + if not len(inputs) % 3 == 0: + raise ValueError( + "Arguments to combined_iterators should be in threes: " + "tag, group, value" + ) + n = len(inputs) // 3 + + iterators = [] + for i in range(n): + tag = inputs[3 * i] + section = inputs[3 * i + 1] + cols = inputs[3 * i + 2] + iterators.append( + self.iterate_hdf(tag, section, cols, rows, parallel=parallel) + ) + + for it in zip(*iterators): + data = {} + for (s, e, d) in it: + data.update(d) + yield s, e, data + + ################################ # Pipeline-related methods ################################ @@ -1579,3 +1743,52 @@ def generate_cwl(cls, log_dir=None): # cwl_tool.metadata = cwlgen.Metadata(**metadata) return cwl_tool + + + def time_stamp(self, tag): + """ + Print a time stamp with a descriptive tag. + + Parameters + ---------- + tag: str + Additional info to print in the output line. Required. + """ + t = datetime.datetime.now() + print(f"Process {self.rank}: {tag} {t}") + sys.stdout.flush() + + def memory_report(self, tag=None): + """ + Print a report about memory currently available + on the node the process is running on. 
+ + Parameters + ---------- + tag: str + Additional info to print in the output line. Default is empty. + """ + import psutil + + t = datetime.datetime.now() + + # The different types of memory are really fiddly and don't + # correspond to how you usually imagine. The simplest thing + # to report here is just how much memory is left on the machine. + mem = psutil.virtual_memory() + avail = mem.available / 1024**3 + total = mem.total / 1024**3 + + if tag is None: + tag = "" + else: + tag = f" {tag}:" + + # This gives you the name of the host. At NERSC that is the node name + host = socket.gethostname() + + # Print message + print( + f"{t}: Process {self.rank}:{tag} Remaining memory on {host} {avail:.1f} GB / {total:.1f} GB" + ) + sys.stdout.flush() diff --git a/tests/test_stage.py b/tests/test_stage.py index 6f1105d..c086a0b 100644 --- a/tests/test_stage.py +++ b/tests/test_stage.py @@ -607,6 +607,47 @@ def run(self): assert 'mike_stage.py", line 15' in outs.decode() +def test_combined_iterators(): + class Oscar(PipelineStage): + inputs = [("inp1", HDFFile), ("inp2", HDFFile)] + outputs = [] + config_options = {} + def run(self): + self.combined_iterators() + it = self.combined_iterators( + 10, + "inp1", "group1", ["x"], + "inp2", "group1", ["y", "z"]) + for (s, e, data) in it: + pass + + oo = Oscar.make_stage(inp1="tests/test.hdf5", inp2="tests/test.hdf5") + oo.run() + + +def test_memory_and_time_reports(capsys): + class November(PipelineStage): + name = f"November" + parallel = False + inputs = [] + outputs = [] + config_options = {} + + def run(self): + self.memory_report() + self.memory_report("TAGTAG") + self.time_stamp("Hello") + + + nn = November.make_stage() + nn.run() + captured = capsys.readouterr() + assert "Remaining memory on" in captured.out + assert "TAGTAG" in captured.out + assert "Process 0: Hello" in captured.out + + if __name__ == "__main__": test_construct() - test_wrong_mpi_flag() + test_combined_iterators() + test_memory_and_time_reports() 
From abd0cf4696d0411132e0a9ca94b1f94bb7e3139c Mon Sep 17 00:00:00 2001 From: Joe Zuntz Date: Tue, 28 Jan 2025 13:44:44 +0000 Subject: [PATCH 2/7] Fix up tests --- ceci/stage.py | 2 +- tests/test_stage.py | 1 - 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/ceci/stage.py b/ceci/stage.py index 584b1de..43697ca 100644 --- a/ceci/stage.py +++ b/ceci/stage.py @@ -1229,7 +1229,7 @@ def get_input_type(self, tag): The file type class """ tag = self.get_aliased_tag(tag) - for t, dt in self.inputs_().items(): + for t, dt in self.inputs_(): t = self.get_aliased_tag(t) if t == tag: return dt diff --git a/tests/test_stage.py b/tests/test_stage.py index c086a0b..1f2b193 100644 --- a/tests/test_stage.py +++ b/tests/test_stage.py @@ -613,7 +613,6 @@ class Oscar(PipelineStage): outputs = [] config_options = {} def run(self): - self.combined_iterators() it = self.combined_iterators( 10, "inp1", "group1", ["x"], From 579f14213e100a6641ba0a3a563a1346fc3be74d Mon Sep 17 00:00:00 2001 From: Joe Zuntz Date: Tue, 28 Jan 2025 13:45:18 +0000 Subject: [PATCH 3/7] Fix up tests --- tests/test_stage.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/tests/test_stage.py b/tests/test_stage.py index 1f2b193..7de6331 100644 --- a/tests/test_stage.py +++ b/tests/test_stage.py @@ -648,5 +648,4 @@ def run(self): if __name__ == "__main__": test_construct() - test_combined_iterators() - test_memory_and_time_reports() + test_wrong_mpi_flag() \ No newline at end of file From 038fd035c95cc24caf7b1faa76add153cdbb39b8 Mon Sep 17 00:00:00 2001 From: Joe Zuntz Date: Tue, 28 Jan 2025 13:45:25 +0000 Subject: [PATCH 4/7] Fix up tests --- tests/test_stage.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_stage.py b/tests/test_stage.py index 7de6331..9d9cb6e 100644 --- a/tests/test_stage.py +++ b/tests/test_stage.py @@ -648,4 +648,4 @@ def run(self): if __name__ == "__main__": test_construct() - test_wrong_mpi_flag() \ No newline at end of file + 
test_wrong_mpi_flag() From d8439bd4ea2bbe264c1d538b87806fc9d6c8498e Mon Sep 17 00:00:00 2001 From: Joe Zuntz Date: Tue, 28 Jan 2025 13:45:53 +0000 Subject: [PATCH 5/7] tidy spacing --- ceci/stage.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ceci/stage.py b/ceci/stage.py index 43697ca..df2ada0 100644 --- a/ceci/stage.py +++ b/ceci/stage.py @@ -149,7 +149,7 @@ def get_aliased_tag(self, tag): @abstractmethod def run(self): # pragma: no cover """Run the stage and return the execution status. - + Subclasses must implemented this method. """ raise NotImplementedError("run") From dd5eb5f77b1d48a9a48804c4db8da94619502ea3 Mon Sep 17 00:00:00 2001 From: Joe Zuntz Date: Tue, 28 Jan 2025 14:01:59 +0000 Subject: [PATCH 6/7] change docs inprogress syntax --- ceci/stage.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/ceci/stage.py b/ceci/stage.py index df2ada0..a5b1723 100644 --- a/ceci/stage.py +++ b/ceci/stage.py @@ -1033,7 +1033,7 @@ def get_output(self, tag, final_name=False): If final_name is False then use a temporary name - file will be moved to its final name at the end. The temporary name - is prefixed with "inprogress_". + is prefixed with `inprogress_`. Parameters ---------- @@ -1100,7 +1100,7 @@ def open_output( Find and open an output file with the given tag, in write mode. If final_name is True then they will be opened using their final - target output name. Otherwise we will prepend "inprogress_" to their + target output name. Otherwise we will prepend `inprogress_` to their file name. This means we know that if the final file exists then it is completed. 
From cade50803e2bbe17058ebd00f968a7b4c52baa7b Mon Sep 17 00:00:00 2001 From: Joe Zuntz Date: Tue, 28 Jan 2025 14:39:33 +0000 Subject: [PATCH 7/7] typo fix --- ceci/stage.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/ceci/stage.py b/ceci/stage.py index a5b1723..38ef6ad 100644 --- a/ceci/stage.py +++ b/ceci/stage.py @@ -1075,7 +1075,7 @@ def open_input(self, tag, wrapper=False, **kwargs): Tag as listed in self.inputs wrapper: bool - Whether to return an underlying file object (False) or a data type instange (True) + Whether to return an underlying file object (False) or a data type instance (True) **kwargs: dict Extra arguments to pass to the file class constructor @@ -1115,7 +1115,7 @@ def open_output( Tag as listed in self.outputs wrapper: bool - Whether to return an underlying file object (False) or a data type instange (True) + Whether to return an underlying file object (False) or a data type instance (True) final_name: bool Default=False. Whether to save to