diff --git a/.coveragerc b/.coveragerc new file mode 100644 index 0000000..3fc94d0 --- /dev/null +++ b/.coveragerc @@ -0,0 +1,10 @@ +# .coveragerc to control coverage.py +[run] +omit= + *ceci/sites/ccin2p3.py + *ceci/sites/cori.py + + +[html] +directory = cover + diff --git a/.pylintrc b/.pylintrc index 2f2186d..dd4a211 100644 --- a/.pylintrc +++ b/.pylintrc @@ -60,7 +60,7 @@ confidence= # --enable=similarities". If you want to run only the classes checker, but have # no Warning level messages displayed, use"--disable=all --enable=classes # --disable=W" -disable=import-star-module-level,old-octal-literal,oct-method,print-statement,unpacking-in-except,parameter-unpacking,backtick,old-raise-syntax,old-ne-operator,long-suffix,dict-view-method,dict-iter-method,metaclass-assignment,next-method-called,raising-string,indexing-exception,raw_input-builtin,long-builtin,file-builtin,execfile-builtin,coerce-builtin,cmp-builtin,buffer-builtin,basestring-builtin,apply-builtin,filter-builtin-not-iterating,using-cmp-argument,useless-suppression,range-builtin-not-iterating,suppressed-message,no-absolute-import,old-division,cmp-method,reload-builtin,zip-builtin-not-iterating,intern-builtin,unichr-builtin,reduce-builtin,standarderror-builtin,unicode-builtin,xrange-builtin,coerce-method,delslice-method,getslice-method,setslice-method,input-builtin,round-builtin,hex-method,nonzero-method,map-builtin-not-iterating,abstract-method,broad-except,invalid-name,line-too-long,wrong-import-order,wrong-import-position,too-many-statements,super-with-arguments,missing-function-docstring,missing-class-docstring,empty-docstring,missing-module-docstring,import-outside-toplevel,f-string-without-interpolation,too-many-arguments,unspecified-encoding +disable=import-star-module-level,old-octal-literal,oct-method,print-statement,unpacking-in-except,parameter-unpacking,backtick,old-raise-syntax,old-ne-operator,long-suffix,dict-view-method,dict-iter-method,metaclass-assignment,next-method-called,raising-string,indexing-exception,raw_input-builtin,long-builtin,file-builtin,execfile-builtin,coerce-builtin,cmp-builtin,buffer-builtin,basestring-builtin,apply-builtin,filter-builtin-not-iterating,using-cmp-argument,useless-suppression,range-builtin-not-iterating,suppressed-message,no-absolute-import,old-division,cmp-method,reload-builtin,zip-builtin-not-iterating,intern-builtin,unichr-builtin,reduce-builtin,standarderror-builtin,unicode-builtin,xrange-builtin,coerce-method,delslice-method,getslice-method,setslice-method,input-builtin,round-builtin,hex-method,nonzero-method,map-builtin-not-iterating,abstract-method,broad-except,invalid-name,line-too-long,wrong-import-order,wrong-import-position,too-many-statements,super-with-arguments,import-outside-toplevel,too-many-arguments,too-many-instance-attributes,unspecified-encoding,missing-class-docstring,missing-function-docstring,missing-module-docstring [REPORTS] diff --git a/ceci/__init__.py b/ceci/__init__.py index b629033..d85040d 100644 --- a/ceci/__init__.py +++ b/ceci/__init__.py @@ -5,6 +5,6 @@ try: __version__ = get_distribution(__name__).version -except DistributionNotFound: +except DistributionNotFound: #pragma: no cover # package is not installed pass diff --git a/ceci/config.py b/ceci/config.py index 431de42..292a73f 100644 --- a/ceci/config.py +++ b/ceci/config.py @@ -1,4 +1,5 @@ -import os +""" A small module with functionality to handle configuration in a way that works +for both interactive and ballistic applications """ from collections.abc import Mapping @@ -32,7 +33,7 @@ 
def cast_value(dtype, value): #pylint: disable=too-many-return-statements 5. It will try to pass value to the constructor of dtype, i.e., return dtype(value) 6. If all of these fail, it will raise a TypeError """ - # dtype is None means all values are legal + # dtype is None means all values are legal if dtype is None: return value # value is None is always allowed @@ -48,49 +49,82 @@ def cast_value(dtype, value): #pylint: disable=too-many-return-statements except (TypeError, ValueError): pass - msg = f"Value of type {type(value)}, when {str(dtype)} was expected." + msg = f"Value of type {type(value)}, when {str(dtype)} was expected." raise TypeError(msg) class StageParameter: + """ A small class to manage a single parameter with basic type checking """ - """ + def __init__(self, **kwargs): + """ Build from keywords + + Keywords + -------- + dtype : `type` or `None` + The data type for this parameter + default : `dtype` or `None` + The default value + format : `str` + A formatting string for printout and representation + help : `str` + A help or docstring + """ kwcopy = kwargs.copy() self._help = kwcopy.pop('help', 'A Parameter') self._format = kwcopy.pop('format', '%s') self._dtype = kwcopy.pop('dtype', None) - self._default = kwcopy.pop('default', None) - if kwcopy: + self._default = kwcopy.pop('default', None) + if kwcopy: # pragma: no cover raise ValueError(f"Unknown arguments to StageParameter {str(list(kwcopy.keys()))}") self._value = cast_value(self._dtype, self._default) - + @property def value(self): + """ Return the value """ return self._value + @property + def dtype(self): + """ Return the data type """ + return self._dtype + + @property + def default(self): + """ Return the default value """ + return self._default + def set(self, value): + """ Set the value, raising a TypeError if the value is the wrong type """ self._value = cast_value(self._dtype, value) return self._value - - def set_default(self, value): + + def set_to_default(self): + """ Set the value to the default """ self._value = cast_value(self._dtype, self._default) return self._value - - def __set__(self, obj, value): - self._value = cast_value(self._dtype, value) - return self._value - - def __get__(self, obj, obj_class): - return self._value - -class StageConfig: - """ + +class StageConfig(dict): + """ A small class to manage a dictionary of configuration parameters with basic type checking """ + def __init__(self, **kwargs): - self._param_dict = {} + """ Build from keywords + + Note + ---- + The keywords are used as keys for the configuration parameters + + The values are used to define the allowed data type and default values + + For each key-value pair: + If the value is a type then it will define the data type and the default will be `None` + If the value is a value then it will set the default value and define the data type as type(value) + """ + dict.__init__(self) for key, val in kwargs.items(): if val is None: dtype = None @@ -100,30 +134,14 @@ def __init__(self, **kwargs): default = None else: dtype = type(val) - default = val + default = val param = StageParameter(dtype=dtype, default=default) - self._param_dict[key] = param - - def keys(self): - return self._param_dict.keys() - - def values(self): - return self._param_dict.values() - - def items(self): - return self._param_dict.items() - - def __len__(self): - return len(self._param_dict) - - def get(self, key, def_value=None): - if key in self._param_dict: - return self.__getattr__(key) - return def_value + self[key] = param def __str__(self): + """
Override __str__ casting to deal with `StageParameter` object in the map """ s = "{" - for key, attr in self._param_dict.items(): + for key, attr in self.items(): if isinstance(attr, StageParameter): val = attr.value else: @@ -131,60 +149,79 @@ def __str__(self): s += f"{key}:{val}," s += "}" return s - + def __repr__(self): + """ A custom representation """ s = "StageConfig" s += self.__str__() return s - - def __getitem__(self, key): - return self.__getattr__(key) - - def __setitem__(self, key, value): - return self.__setattr__(key, value) - def __delitem__(self, key): - return self.__delattr__(key) - - def __getattr__(self, key): - attr = self._param_dict[key] + def __getitem__(self, key): + """ Override the __getitem__ to work with `StageParameter` """ + attr = dict.__getitem__(self, key) if isinstance(attr, StageParameter): return attr.value return attr - def __setattr__(self, key, value): - if key == '_param_dict': - self.__dict__[key] = value - return - if key in self._param_dict: - attr = self._param_dict[key] + def __setitem__(self, key, value): + """ Override the __setitem__ to work with `StageParameter` """ + if key in self: + attr = dict.__getitem__(self, key) if isinstance(attr, StageParameter): return attr.set(value) - self._param_dict[key] = value + dict.__setitem__(self, key, value) return value - - def __delattr__(self, key, value): - if key in self._param_dict: - attr = self._param_dict[key] - if isinstance(attr, StageParameter): - return attr.set_default() - return self._param_dict.pop(key) + + def __getattr__(self, key): + """ Allow attribute-like parameter access """ + return self.__getitem__(key) + + def __setattr__(self, key, value): + """ Allow attribute-like parameter setting """ + return self.__setitem__(key, value) def set_config(self, input_config, args): - for key in self._param_dict.keys(): + """ Utility function to load configuration + + Parameters + ---------- + input_config : `dict, (str, value)` + `dict` with key-value pairs for all the parameters + args : `dict, (str, value)` + `dict` with key-value pairs for all the parameters that can serve as overrides + """ + for key in self.keys(): val = None if key in input_config: val = input_config[key] if args.get(key) is not None: val = args[key] if val is None: - raise ValueError(f"Missing configuration option {key}") + attr = self.get(key) + if attr.default is None: + raise ValueError(f"Missing configuration option {key}") + val = attr.default self.__setattr__(key, val) for key, val in input_config.items(): - if key in self._param_dict: + if key in self: continue - self._param_dict[key] = val - + self[key] = val - + for key, val in args.items(): + if key in self: + continue + self[key] = val + + + def reset(self): + """ Reset values to their defaults """ + for _, val in self.items(): + if isinstance(val, StageParameter): + val.set_to_default() + + def get_type(self, key): + attr = dict.__getitem__(self, key) + if isinstance(attr, StageParameter): + return attr.dtype + return type(attr) diff --git a/ceci/main.py b/ceci/main.py index fdf6bd0..8a89967 100644 --- a/ceci/main.py +++ b/ceci/main.py @@ -27,12 +27,37 @@ ) -def run_prescript(pre_script=None, dry_run=False, script_args=[]): +def run_prescript(pre_script=None, dry_run=False, script_args=None): + """ Run a script before running the pipeline + + Parameters + ---------- + pre_script : `str` + The script to run + dry_run : `bool` + If true do not run the script + script_args : `list`, (`str`) + Arguments to the script + """ + if script_args is None: #pragma: 
no cover + script_args = [] if pre_script and not dry_run: subprocess.check_call(pre_script.split() + script_args, shell=True) -def run_pipeline(pipe_config): +def run_pipeline(pipe_config): + """ Run a pipeline as defined by a particular configuration + + Parameters + ---------- + pipe_config : `dict` + The configuration dictionary + + Returns + ------- + status : `int` + Usual unix convention of 0 -> success, non-zero is an error code + """ default_site = get_default_site() try: p = Pipeline.create(pipe_config) @@ -46,10 +71,28 @@ def run_pipeline(pipe_config): return status -def run_postscript(post_script=None, dry_run=False, script_args=[]): +def run_postscript(post_script=None, dry_run=False, script_args=None): + """ Run a script after the pipeline finishes + + Parameters + ---------- + post_script : `str` + The script to run + dry_run : `bool` + If true do not run the script + script_args : `list`, (`str`) + Arguments to the script + + Returns + ------- + return_code : `int` + Usual unix convention of 0 -> success, non-zero is an error code + """ + if script_args is None: #pragma: no cover + script_args = [] if post_script and not dry_run: return_code = subprocess.call(post_script.split() + script_args, shell=True) - if return_code: + if return_code: #pragma: no cover sys.stderr.write( f"\nWARNING: The post-script command {post_script} " "returned error status {return_code}\n\n" ) @@ -60,13 +103,31 @@ def run(pipe_config, pipeline_config_filename, extra_config=None, dry_run=False): + """ Run a pipeline and associated scripts + + Parameters + ---------- + pipe_config : `dict` + The configuration dictionary + pipeline_config_filename : `str` + The yaml file with the pipeline configuration + extra_config : `dict` + Extra parameters to override configuration + dry_run : `bool` + Flag to not actually run jobs + + Returns + ------- + status : `int` + Usual unix convention of 0 -> success, non-zero is an error code + """ # Later we will add these paths to sys.path for running here, # but we will also need to pass them to the sites below so that # they can be added within any containers or other launchers # that we use paths = pipe_config.get("python_paths", []) - if isinstance(paths, str): + if isinstance(paths, str): #pragma: no cover paths = paths.split() launcher_config = pipe_config.setdefault("launcher", {"name": "mini"}) @@ -101,7 +162,7 @@ def run(pipe_config, pipeline_config_filename, extra_config=None, dry_run=False) return status -def main(): +def main(): #pragma: no cover args = parser.parse_args() pipe_config = Pipeline.build_config(args.pipeline_config_filename, args.extra_config, args.dry_run) status = run(pipe_config, args.pipeline_config_filename, args.extra_config, args.dry_run) @@ -113,5 +174,5 @@ def main(): return status -if __name__ == "__main__": +if __name__ == "__main__": #pragma: no cover sys.exit(main()) diff --git a/ceci/minirunner.py b/ceci/minirunner.py index f5423ff..0c1a0c5 100644 --- a/ceci/minirunner.py +++ b/ceci/minirunner.py @@ -79,7 +79,7 @@ def __str__(self): __repr__ = __str__ - def __hash__(self): + def __hash__(self): #pragma: no cover return hash(self.id) def assign(self): @@ -294,7 +294,7 @@ def _check_impossible(self): f"Job {job} cannot be run - it needs {job.nodes}" f" nodes but only {n_node} is/are available" ) - if job.cores > n_core: + if job.cores > n_core: #pragma: no cover raise CannotRun( f"Job {job} cannot be run - it needs {job.cores}" f" cores but only {n_core} is/are
available" @@ -335,7 +335,7 @@ def _check_completed(self): # check status status = process.poll() # None indicates job is still running - if status is None: + if status is None: #pragma: no cover continuing_jobs.append((process, job, alloc)) # status !=0 indicates error in job. # kill everything diff --git a/ceci/pipeline.py b/ceci/pipeline.py index 4a7190c..a559008 100644 --- a/ceci/pipeline.py +++ b/ceci/pipeline.py @@ -63,6 +63,7 @@ def __init__(self, info): # Core attributes - mandatory self.name = info["name"] + self.class_name = info.get('classname', self.name) self.site = info.get("site", get_default_site()) # Parallelism attributes - optional @@ -83,44 +84,41 @@ def __init__(self, info): @classmethod def create(cls, stage, **kwargs): info = kwargs.copy() - info['name'] = stage.name + info['name'] = stage.instance_name + info['classname'] = stage.name sec = cls(info) sec.set_stage_obj(stage) return sec - + def set_stage_obj(self, stage_obj): - self.stage_class = PipelineStage.get_stage(self.name) - if not isinstance(stage_obj, self.stage_class): + self.stage_class = PipelineStage.get_stage(self.class_name) + if not isinstance(stage_obj, self.stage_class): #pragma: no cover raise TypeError(f"{str(stage_obj)} is not a {str(self.stage_class)}") self.stage_obj = stage_obj - + def build_stage_class(self): - self.stage_class = PipelineStage.get_stage(self.name) + self.stage_class = PipelineStage.get_stage(self.class_name) return self.stage_class def build_stage_object(self, args): - if self.stage_class is None: - self.stage_class = PipelineStage.get_stage(self.name) + if self.stage_class is None: #pragma: no cover + self.stage_class = PipelineStage.get_stage(self.class_name) self.stage_obj = self.stage_class(args) return self.stage_obj - def clear_stage_object(self): - self.stage_obj = None - def generate_full_command(self, inputs, outputs, config): if self.stage_obj is not None: aliases = self.stage_obj.get_aliases() else: - aliases = None + aliases = None #pragma: no cover if self.stage_class is None: - self.build_stage_class() + self.build_stage_class() #pragma: no cover core = self.stage_class.generate_command(inputs, config, outputs, aliases) return self.site.command(core, self) class FileManager(dict): - """ - """ + def __init__(self): self._tag_to_type = {} self._path_to_tag = {} @@ -129,14 +127,14 @@ def __init__(self): def __setitem__(self, tag, path): dict.__setitem__(self, tag, path) self._path_to_tag[path] = tag - + def insert(self, tag, path=None, ftype=None): if path is not None: self[tag] = path self._path_to_tag[path] = tag - if ftype is not None: + if tag not in self._tag_to_type: self._tag_to_type[tag] = ftype - + def get_type(self, tag): return self._tag_to_type[tag] @@ -144,12 +142,20 @@ def get_path(self, tag): return self[tag] def get_tag(self, path): - return self._path_to_tag + return self._path_to_tag[path] def insert_paths(self, path_dict): for key, val in path_dict.items(): self.insert(key, path=val) + def insert_outputs(self, stage, outdir): + stage_outputs = stage.find_outputs(outdir) + for tag, ftype in stage.outputs: + aliased_tag = stage.get_aliased_tag(tag) + path = stage_outputs[aliased_tag] + self.insert(aliased_tag, path=path, ftype=ftype) + return stage_outputs + class Pipeline: """ @@ -213,7 +219,7 @@ def create(pipe_config): pipeline_class = DryRunPipeline try: pipeline_class = launcher_dict[launcher_name] - except KeyError as msg: + except KeyError as msg: #pragma: no cover raise KeyError(f"Unknown pipeline launcher {launcher_name}, options are 
{list(launcher_dict.keys())}") from msg p = pipeline_class(stages, launcher_config) @@ -225,7 +231,7 @@ def interactive(): launcher_config = dict(name="mini") return MiniPipeline([], launcher_config) - + @staticmethod def build_config(pipeline_config_filename, extra_config=None, dry_run=False): # YAML input file. @@ -244,24 +250,25 @@ def build_config(pipeline_config_filename, extra_config=None, dry_run=False): # Launchers may need to know if this is a dry-run launcher_config["dry_run"] = dry_run + pipe_config["dry_run"] = dry_run return pipe_config def __getattr__(self, name): try: return self.stage_execution_config[name].stage_obj - except Exception as msg: + except Exception as msg: #pragma: no cover raise AttributeError(f"Pipeline does not have stage {name}") from msg def print_stages(self, stream=sys.stdout): for stage in self.stages: - stream.write(f"{stage.name:20}: {str(stage)}") + stream.write(f"{stage.instance_name:20}: {str(stage)}") stream.write("\n") @staticmethod def read(pipeline_config_filename, extra_config=None, dry_run=False): pipe_config = Pipeline.build_config(pipeline_config_filename, extra_config, dry_run) paths = pipe_config.get("python_paths", []) - if isinstance(paths, str): + if isinstance(paths, str): #pragma: no cover paths = paths.split() modules = pipe_config["modules"].split() @@ -319,14 +326,36 @@ def add_stage(self, stage_info): sec = StageExecutionConfig(stage_info) self.stage_execution_config[sec.name] = sec self.stage_names.append(sec.name) - if sec.stage_obj: - return sec.stage_obj.find_outputs('.') - return {} + if sec.stage_obj is None: + return {} + return self.pipeline_files.insert_outputs(sec.stage_obj, '.') + + def build_stage(self, stage_class, **kwargs): + """Build a stage and add it to the pipeline + + Parameters + ---------- + stage_class: type + A subtype of `PipelineStage`, the class of the stage being build + + Returns + ------- + stage_outputs: `dict` + The names of the output files + - def build_stage(self, stage_class, args): - stage = stage_class(**args, **self.pipeline_files.todict()) + Notes + ----- + The keyword arguemets will be based to the `stage_class` constructor + + The output files produced by this stage will be added to the + `Pipeline.pipeline_files` data member, so that they are available to later stages + """ + kwcopy = kwargs.copy() + kwcopy.update(**self.pipeline_files) + stage = stage_class(kwcopy) return self.add_stage(stage) - + def remove_stage(self, name): """Delete a stage from the pipeline @@ -386,7 +415,7 @@ def ordered_stages(self, overall_inputs, stages_config=None): for tag in stage.output_tags(): if tag in overall_inputs: raise ValueError( - f"Pipeline stage {stage.name} " + f"Pipeline stage {stage.instance_name} " f"generates output {tag}, but " "it is already an overall input" ) @@ -429,7 +458,7 @@ def ordered_stages(self, overall_inputs, stages_config=None): stage = sec.build_stage_object(stage_config) else: stage = sec.stage_obj - + # for file that stage produces, for tag in stage.output_tags(): # find all the next_stages that depend on that file @@ -457,7 +486,7 @@ def ordered_stages(self, overall_inputs, stages_config=None): tag for tag in stage.input_tags() if tag not in found_inputs ] missing_inputs = ", ".join(missing_inputs) - msg1.append(f"Stage {stage.name} is missing input(s): {missing_inputs}") + msg1.append(f"Stage {stage.instance_name} is missing input(s): {missing_inputs}") msg1 = "\n".join(msg1) raise ValueError( @@ -471,9 +500,6 @@ def ordered_stages(self, overall_inputs, 
stages_config=None): return ordered_stages - def load_inputs(self, overall_inputs): - return self.load_configs(overall_inputs, {'output_dir':'.', 'log_dir':'.'}, None) - def load_configs(self, overall_inputs, run_config, stages_config): # Make a copy, since we'll be modifying this. self.pipeline_files.insert_paths(overall_inputs) @@ -483,7 +509,7 @@ def load_configs(self, overall_inputs, run_config, stages_config): if self.stages_config is not None: with open(self.stages_config) as stage_config_file: self.stage_config_data = yaml.safe_load(stage_config_file) - else: + else: #pragma: no cover self.stage_config_data = {} global_config = self.stage_config_data.pop('global', {}) for v in self.stage_config_data.values(): @@ -510,8 +536,7 @@ def load_configs(self, overall_inputs, run_config, stages_config): if self.should_skip_stage(stage): stage.already_finished() - output_paths = stage.find_outputs(run_config['output_dir']) - self.pipeline_files.insert_paths(output_paths) + self.pipeline_files.insert_outputs(stage, run_config['output_dir']) # Otherwise, run the pipeline and register any outputs from the # pipe element. @@ -535,15 +560,15 @@ def find_all_outputs(self): return outputs @abstractmethod - def initiate_run(self, overall_inputs): + def initiate_run(self, overall_inputs): #pragma: no cover raise NotImplementedError() @abstractmethod - def enqueue_job(self, stage, pipeline_files): + def enqueue_job(self, stage, pipeline_files): #pragma: no cover raise NotImplementedError() @abstractmethod - def run_jobs(self): + def run_jobs(self): #pragma: no cover raise NotImplementedError() def should_skip_stage(self, stage): @@ -567,7 +592,7 @@ def should_skip_stage(self, stage): def enqueue_job(self, stage, pipeline_files): outputs = stage.find_outputs(self.run_config['output_dir']) - sec = self.stage_execution_config[stage.name] + sec = self.stage_execution_config[stage.instance_name] cmd = sec.generate_full_command(pipeline_files, outputs, self.stages_config) @@ -575,7 +600,7 @@ def enqueue_job(self, stage, pipeline_files): # text, but only if we are printing to screen. This helps the # eye pick out the stage you want to run. if sys.stdout.isatty(): - cmd = cmd.replace(stage.name, embolden(stage.name), 1) + cmd = cmd.replace(stage.instance_name, embolden(stage.instance_name), 1) self.run_info.append(cmd) return outputs @@ -619,7 +644,7 @@ def enqueue_job(self, stage, pipeline_files): # have parsl queue the app future = app(inputs=inputs, outputs=outputs) - self.run_info.append((stage.name, future)) + self.run_info.append((stage.instance_name, future)) return {tag: future.outputs[i] for i, tag in enumerate(stage.output_tags())} def run_jobs(self): @@ -652,11 +677,11 @@ def run_jobs(self): if os.path.exists(stdout_file): with open(stdout_file) as _stdout: sys.stderr.write(_stdout.read()) - else: + else: #pragma: no cover sys.stderr.write("STDOUT MISSING!\n\n") sys.stderr.write( - f""" + """ ************************************************* Standard error: @@ -668,7 +693,7 @@ def run_jobs(self): if os.path.exists(stderr_file): with open(stderr_file) as _stderr: sys.stderr.write(_stderr.read()) - else: + else: #pragma: no cover sys.stderr.write("STDERR MISSING!\n\n") return 1 return 0 @@ -720,7 +745,7 @@ def generate_app(self, stage, run_config): config = f"{{inputs[{config_index}]}}" # This includes all the "mpirun" stuff. 
- sec = self.stage_execution_config[stage.name] + sec = self.stage_execution_config[stage.instance_name] executor = sec.site.info["executor"] # Construct the command line call @@ -733,10 +758,10 @@ def generate_app(self, stage, run_config): # we build and exec a string. template = f""" @parsl.app.app.bash_app(executors=[executor]) -def {stage.name}(inputs, outputs, stdout='{log_dir}/{stage.name}.out', stderr='{log_dir}/{stage.name}.err'): +def {stage.instance_name}(inputs, outputs, stdout='{log_dir}/{stage.instance_name}.out', stderr='{log_dir}/{stage.instance_name}.err'): cmd = '{cmd1}'.format(inputs=inputs, outputs=outputs) print("Launching command:") - print(cmd, " 2> {log_dir}/{stage.name}.err 1> {log_dir}/{stage.name}.out") + print(cmd, " 2> {log_dir}/{stage.instance_name}.err 1> {log_dir}/{stage.instance_name}.out") return cmd """ print(template) @@ -746,7 +771,7 @@ def {stage.name}(inputs, outputs, stdout='{log_dir}/{stage.name}.out', stderr='{ exec(template, {"parsl": parsl}, d) #pylint: disable=exec-used # Return the function itself. - return d[stage.name] + return d[stage.instance_name] class MiniPipeline(Pipeline): @@ -805,10 +830,10 @@ def build_dag(self, jobs): # for each stage in our pipeline ... for stage in self.stages[:]: - if stage.name not in jobs: + if stage.instance_name not in jobs: continue - depend[jobs[stage.name]] = [] - job = jobs[stage.name] + depend[jobs[stage.instance_name]] = [] + job = jobs[stage.instance_name] # check for each of the inputs for that stage ... for tag in stage.input_tags(): for potential_parent in self.stages[:]: @@ -823,16 +848,16 @@ def initiate_run(self, overall_inputs): return jobs, stages def enqueue_job(self, stage, pipeline_files): - sec = self.stage_execution_config[stage.name] + sec = self.stage_execution_config[stage.instance_name] outputs = stage.find_outputs(self.run_config['output_dir']) cmd = sec.generate_full_command(pipeline_files, outputs, self.stages_config) job = minirunner.Job( - stage.name, + stage.instance_name, cmd, cores=sec.threads_per_process * sec.nprocess, nodes=sec.nodes, ) - self.run_info[0][stage.name] = job + self.run_info[0][stage.instance_name] = job self.run_info[1].append(stage) return outputs @@ -907,13 +932,13 @@ def make_inputs_file(stages, overall_inputs, stages_config, inputs_file): # is set in the config file for stage in stages: # There might be nothing if no options are needed. - this_stage_config = stage_config_data.get(stage.name, {}) + this_stage_config = stage_config_data.get(stage.instance_name, {}) # Record only keys that have been set. If any are missing # it is an error that will be noticed later. 
for key in stage.config_options: val = this_stage_config.get(key, global_config.get(key)) if val is not None: - inputs[f"{key}@{stage.name}"] = val + inputs[f"{key}@{stage.instance_name}"] = val inputs["config"] = { "class": "File", @@ -961,10 +986,10 @@ def enqueue_job(self, stage, pipeline_files): # Create a CWL representation of this step cwl_tool = stage.generate_cwl(log_dir) - cwl_tool.export(f"{cwl_dir}/{stage.name}.cwl") + cwl_tool.export(f"{cwl_dir}/{stage.instance_name}.cwl") # Load that representation again and add it to the pipeline - step = WorkflowStep(stage.name, run=f"{cwl_tool.id}.cwl") + step = WorkflowStep(stage.instance_name, run=f"{cwl_tool.id}.cwl") # For CWL these inputs are a mix of file and config inputs, # so not he same as the pipeline_files we usually see diff --git a/ceci/sites/__init__.py b/ceci/sites/__init__.py index 6577d92..b113b58 100644 --- a/ceci/sites/__init__.py +++ b/ceci/sites/__init__.py @@ -73,7 +73,7 @@ def load(launcher_config, site_configs): try: cls = site_classes[site_name] - except KeyError as msg: + except KeyError as msg: #pragma: no cover raise ValueError(f"Unknown site {site_name}") from msg site = cls(site_config) @@ -117,7 +117,7 @@ def setup_parsl(launcher_config, sites): # Optional logging of pipeline infrastructure to file. log_file = launcher_config.get("log") - if log_file: + if log_file: #pragma: no cover log_file_dir = os.path.split(os.path.abspath(log_file))[0] os.makedirs(log_file_dir, exist_ok=True) set_file_logger(log_file) diff --git a/ceci/sites/ccin2p3.py b/ceci/sites/ccin2p3.py index ac3d5f1..0cd18af 100644 --- a/ceci/sites/ccin2p3.py +++ b/ceci/sites/ccin2p3.py @@ -27,7 +27,7 @@ def command(self, cmd, sec): """ mpi1 = f"{self.mpi_command} {sec.nprocess} " - mpi2 = f"--mpi" if sec.nprocess > 1 else "" + mpi2 = "--mpi" if sec.nprocess > 1 else "" volume_flag = f"--bind {sec.volume} " if sec.volume else "" paths = self.config.get("python_paths", []) diff --git a/ceci/sites/cori.py b/ceci/sites/cori.py index a17db92..f20262d 100644 --- a/ceci/sites/cori.py +++ b/ceci/sites/cori.py @@ -29,7 +29,7 @@ def command(self, cmd, sec): # on cori we always use srun, even if the command is a single process mpi1 = f"{self.mpi_command} {sec.nprocess} --cpus-per-task={sec.threads_per_process}" - mpi2 = f"--mpi" if sec.nprocess > 1 else "" + mpi2 = "--mpi" if sec.nprocess > 1 else "" volume_flag = f"-V {sec.volume} " if sec.volume else "" paths = self.config.get("python_paths", []) @@ -61,7 +61,7 @@ def command(self, cmd, sec): paths_end = "'" if paths else "" return ( f"{mpi1} " - f"shifter " + "shifter " f"--env OMP_NUM_THREADS={sec.threads_per_process} " f"{volume_flag} " f"--image {sec.image} " diff --git a/ceci/sites/local.py b/ceci/sites/local.py index 686357d..33d0ab1 100644 --- a/ceci/sites/local.py +++ b/ceci/sites/local.py @@ -27,7 +27,7 @@ def command(self, cmd, sec): """ mpi1 = f"{self.mpi_command} {sec.nprocess}" if sec.nprocess > 1 else "" - mpi2 = f"--mpi" if sec.nprocess > 1 else "" + mpi2 = "--mpi" if sec.nprocess > 1 else "" volume_flag = f"-v {sec.volume} " if sec.volume else "" paths = self.config.get("python_paths", []) diff --git a/ceci/sites/site.py b/ceci/sites/site.py index af541cf..95e5ad8 100644 --- a/ceci/sites/site.py +++ b/ceci/sites/site.py @@ -2,7 +2,6 @@ class Site: - """""" default_mpi_command = "mpirun -n" @@ -17,7 +16,7 @@ def check_import(self, launcher): #pylint: disable=no-self-use "cwl": ["cwlgen", "cwltool"], "mini": ["psutil"], } - if launcher not in requirements: + if launcher not in requirements: 
#pragma: no cover raise ValueError(f"Unknown launcher '{launcher}'") missing = [] libs = requirements[launcher] @@ -26,9 +25,9 @@ def check_import(self, launcher): #pylint: disable=no-self-use with warnings.catch_warnings(): warnings.filterwarnings("ignore", category=DeprecationWarning) __import__(lib) - except ImportError: + except ImportError: #pragma: no cover missing.append(lib) - if missing: + if missing: #pragma: no cover missing = ", ".join(missing) raise ImportError( f"You must install these libraries " @@ -38,7 +37,7 @@ def check_import(self, launcher): #pylint: disable=no-self-use def configure_for_launcher(self, launcher): self.check_import(launcher) configure = getattr(self, f"configure_for_{launcher}", None) - if configure is None: + if configure is None: #pragma: no cover raise ValueError( f"Site {self} does not know how to configure for launcher {launcher}" ) diff --git a/ceci/stage.py b/ceci/stage.py index a579c72..e45809e 100644 --- a/ceci/stage.py +++ b/ceci/stage.py @@ -20,7 +20,7 @@ class PipelineStage: """A PipelineStage implements a single calculation step within a wider pipeline. - Each different type of analysis stge is represented by a subclass of this + Each different type of analysis stage is represented by a subclass of this base class. The base class handles the connection between different pipeline stages, and the execution of the stages within a workflow system (parsl), potentially in parallel (MPI). @@ -81,7 +81,6 @@ def __init__(self, args, comm=None): args: dict or namespace Specification of input and output paths and any missing config options """ - self._name = self.name self._configs = StageConfig(**self.config_options) self._inputs = None self._outputs = None @@ -96,36 +95,32 @@ def __init__(self, args, comm=None): self.setup_mpi(comm) def get_aliases(self): + """ Returns the dictionary of aliases used to remap inputs and outputs + in the case that we want to have multiple instances of this class in the pipeline """ return self.config.get('aliases', None) def get_aliased_tag(self, tag): - aliases = self.config.get('aliases', None) + """ Returns the possibly remapped value for an input or output tag + + Parameters + ---------- + tag : `str` + The input or output tag we are checking + + Returns + ------- + aliased_tag : `str` + The aliased version of the tag + """ + aliases = self.get_aliases() if aliases is None: return tag return aliases.get(tag, tag) - @classmethod - def build(cls, **kwargs): - kwcopy = kwargs.copy() - return cls(kwcopy) - - @classmethod - def clone(cls, orig, cloneName, **kwargs): - args = orig.config.copy() - args.update(**kwargs) - args['name'] = cloneName - return cls(args) - @abstractmethod - def run(self): + def run(self): #pragma: no cover raise NotImplementedError('run') - def get_aliased_tag(self, tag): - aliases = self.config.get('aliases', None) - if aliases is None: - return tag - return aliases.get(tag, tag) - def load_configs(self, args): """ Load the configuraiton @@ -138,16 +133,24 @@ def load_configs(self, args): if not isinstance(args, dict): args = vars(args) + + # First, we extract configuration information from a combination of + # command line arguments and optional 'config' file + self._inputs = dict(config=args["config"]) + self.read_config(args) + # We first check for missing input files, that's a show stopper missing_inputs = [] for x in self.input_tags(): - try: - val = args[x] - except KeyError as msg: - raise ValueError(f"{x} missing from {list(args.keys())}") from msg + val = args.get(x) + aliased_tag =
self.get_aliased_tag(x) if val is None: + val = args.get(aliased_tag) + if val is None: #pragma: no cover missing_inputs.append(f"--{x}") - if missing_inputs: + else: + self._inputs[aliased_tag] = val + if missing_inputs: #pragma: no cover missing_inputs = " ".join(missing_inputs) raise ValueError( f""" @@ -156,13 +159,10 @@ def load_configs(self, args): Input names: {missing_inputs}""" ) - self._inputs = {self.get_aliased_tag(x): args[x] for x in self.input_tags()} # We alwys assume the config arg exists, whether it is in input_tags or not - if 'config' not in args: + if 'config' not in args: #pragma: no cover raise ValueError("The argument --config was missing on the command line.") - self._inputs["config"] = args["config"] - # We prefer to receive explicit filenames for the outputs but will # tolerate missing output filenames and will default to tag name in # current folder (this is for CWL compliance) @@ -174,9 +174,6 @@ def load_configs(self, args): else: self._outputs[self.get_aliased_tag(x)] = args[x] - # Finally, we extract configuration information from a combination of - # command line arguments and optional 'config' file - self.read_config(args) def setup_mpi(self, comm=None): """ @@ -189,7 +186,7 @@ def setup_mpi(self, comm=None): """ use_mpi = self.config.get('use_mpi', False) - if use_mpi: + if use_mpi: #pragma: no cover try: # This isn't a ceci dependency, so give a sensible error message if not installed. import mpi4py.MPI @@ -205,7 +202,7 @@ def setup_mpi(self, comm=None): self._comm = comm self._size = self._comm.Get_size() self._rank = self._comm.Get_rank() - elif use_mpi: + elif use_mpi: #pragma: no cover self._parallel = MPI_PARALLEL self._comm = mpi4py.MPI.COMM_WORLD self._size = self._comm.Get_size() @@ -246,7 +243,7 @@ def __init_subclass__(cls, **kwargs): # If there isn't an explicit name already then set it here. # by default use the class name. - if not hasattr(cls, "name"): + if not hasattr(cls, "name"): #pragma: no cover cls.name = cls.__name__ if cls.name is None: cls.name = cls.__name__ @@ -291,10 +288,6 @@ def __init_subclass__(cls, **kwargs): else: cls.incomplete_pipeline_stages[cls.__name__] = (cls, path) - def config_and_run(self, **kwargs): - self.load_configs(kwargs) - self.run() - ############################################# # Life cycle-related methods and properties. ############################################# @@ -348,7 +341,7 @@ def get_module(cls): return cls.pipeline_stages[cls.name][0].__module__ @classmethod - def usage(cls): + def usage(cls): #pragma: no cover """ Print a usage message. 
""" @@ -377,10 +370,10 @@ def main(cls): """ try: stage_name = sys.argv[1] - except IndexError: + except IndexError: #pragma: no cover cls.usage() return 1 - if stage_name in ["--help", "-h"] and len(sys.argv) == 2: + if stage_name in ["--help", "-h"] and len(sys.argv) == 2: #pragma: no cover cls.usage() return 1 stage = cls.get_stage(stage_name) @@ -402,11 +395,11 @@ def parse_command_line(cls, cmd=None): parser.add_argument(f"--no-{conf}", dest=conf, action="store_const", const=False) elif opt_type == list: out_type = def_val[0] if isinstance(def_val[0], type) else type(def_val[0]) - if out_type is str: + if out_type is str: #pragma: no cover parser.add_argument( f"--{conf}", type=lambda string: string.split(",") ) - elif out_type is int: + elif out_type is int: #pragma: no cover parser.add_argument( f"--{conf}", type=lambda string: [int(i) for i in string.split(",")], @@ -416,11 +409,11 @@ def parse_command_line(cls, cmd=None): f"--{conf}", type=lambda string: [float(i) for i in string.split(",")], ) - else: + else: #pragma: no cover raise NotImplementedError( "Only handles str, int and float list arguments" ) - else: + else: #pragma: no cover parser.add_argument(f"--{conf}", type=opt_type) for inp in cls.input_tags(): parser.add_argument(f"--{inp}") @@ -488,16 +481,16 @@ def execute(cls, args, comm=None): if not is_client: return - if args.cprofile: + if args.cprofile: #pragma: no cover profile = cProfile.Profile() profile.enable() - if args.memmon: + if args.memmon: #pragma: no cover monitor = MemoryMonitor.start_in_thread(interval=args.memmon) try: stage.run() - except Exception as error: + except Exception as error: #pragma: no cover if args.pdb: print( "There was an exception - starting python debugger because you ran with --pdb" @@ -507,7 +500,7 @@ def execute(cls, args, comm=None): else: raise finally: - if args.memmon: + if args.memmon: #pragma: no cover monitor.stop() if stage.is_dask(): stage.stop_dask() @@ -516,7 +509,7 @@ def execute(cls, args, comm=None): # final location, but subclasses can override to do other things too try: stage.finalize() - except Exception as error: + except Exception as error: #pragma: no cover if args.pdb: print( "There was an exception in the finalization - starting python debugger because you ran with --pdb" @@ -525,7 +518,7 @@ def execute(cls, args, comm=None): pdb.post_mortem() else: raise - if args.cprofile: + if args.cprofile: #pragma: no cover profile.disable() profile.dump_stats(args.cprofile) profile.print_stats("cumtime") @@ -540,7 +533,7 @@ def execute(cls, args, comm=None): def finalize(self): """Finalize the stage, moving all its outputs to their final locations.""" # Synchronize files so that everything is closed - if self.is_mpi(): + if self.is_mpi(): #pragma: no cover self.comm.Barrier() # Move files to their final path @@ -557,10 +550,10 @@ def finalize(self): # because that will be handled later. 
if pathlib.Path(temp_name).exists(): # replace directories, rather than nesting more results - if pathlib.Path(final_name).is_dir(): + if pathlib.Path(final_name).is_dir(): #pragma: no cover shutil.rmtree(final_name) shutil.move(temp_name, final_name) - else: + else: #pragma: no cover sys.stderr.write( f"NOTE/WARNING: Expected output file {final_name} was not generated.\n" ) @@ -621,14 +614,14 @@ def start_dask(self): import dask import dask_mpi import dask.distributed - except ImportError: + except ImportError: #pragma: no cover print( "ERROR: Using --mpi option on stages that use dask requires " "dask[distributed] and dask_mpi to be installed." ) raise - if self.size < 3: + if self.size < 3: #pragma: no cover raise ValueError( "Dask requires at least three processes. One becomes a scheduler " "process, one is a client that runs the code, and more are required " @@ -694,7 +687,7 @@ def data_ranges_by_rank(self, n_rows, chunk_rows, parallel=True): Default=True """ n_chunks = n_rows // chunk_rows - if n_chunks * chunk_rows < n_rows: + if n_chunks * chunk_rows < n_rows: #pragma: no cover n_chunks += 1 if parallel: it = self.split_tasks_by_rank(range(n_chunks)) @@ -743,11 +736,11 @@ def open_input(self, tag, wrapper=False, **kwargs): input_class = self.get_input_type(tag) obj = input_class(path, "r", **kwargs) - if wrapper: + if wrapper: #pragma: no cover return obj return obj.file - def open_output(self, tag, wrapper=False, final_name=False, **kwargs): + def open_output(self, tag, wrapper=False, final_name=False, **kwargs): #pragma: no cover """ Find and open an output file with the given tag, in write mode. @@ -836,14 +829,14 @@ def get_input_type(self, tag): for t, dt in self.inputs: if t == tag: return dt - raise ValueError(f"Tag {tag} is not a known input") + raise ValueError(f"Tag {tag} is not a known input") #pragma: no cover def get_output_type(self, tag): """Return the file type class of an output file with the given tag.""" for t, dt in self.outputs: if t == tag: return dt - raise ValueError(f"Tag {tag} is not a known output") + raise ValueError(f"Tag {tag} is not a known output") #pragma: no cover ################################################## # Configuration-related methods and properties. @@ -851,7 +844,7 @@ def get_output_type(self, tag): @property def instance_name(self): - return self._name + return self._configs.get('name', self.name) @property def config(self): @@ -896,69 +889,10 @@ def read_config(self, args): # This is just the config info in the file for this stage. # It may be incomplete - there may be things specified on the # command line instead, or just using their default values - stage_config = overall_config.get(self.name, {}) + stage_config = overall_config.get(self.instance_name, {}) input_config.update(stage_config) self._configs.set_config(input_config, args) - - # Here we build up the actual configuration we use on this - # run from all these sources - my_config = {} - - # Loop over the options of the pipeline stage - #for x, opt_val in self.config_options.items(): - # opt = None - # opt_type = None - - # First look for a default value, - # if a type (like int) is provided as the default it indicates that - # this option doesn't have a default (i.e. 
is mandatory) and should - # be explicitly provided with the specified type - #if isinstance(opt_val, type): - # opt_type = opt_val - - #elif isinstance(opt_val, list): - # v = opt_val[0] - # if isinstance(v, type): - # opt_type = v - # else: - # opt = opt_val - # opt_type = type(v) - #else: - # opt = opt_val - # opt_type = type(opt) - - # Second, look for the option in the configuration file and override - # default if provided TODO: Check types - #if x in input_config: - # opt = input_config[x] - # _ = opt_type # This is just to get pylint to shut up - - # Finally check for command line option that would override the value - # in the configuration file. Note that the argument parser should - # already have taken care of type - #if args.get(x) is not None: - # opt = args[x] - - # Finally, check that we got at least some value for this option - #if opt is None: - # raise ValueError( - # f"Missing configuration option {x} for stage {self.name}" - # ) - - #my_config[x] = opt - - # Unspecified parameters can also be copied over. - # This will be needed for parameters that are more complicated, such - # as dictionaries or other more structured parameter information. - #for x, val in input_config.items(): - # Omit things we've already dealt with above - # if x in self.config_options: - # continue - # copy over everything else - # my_config[x] = val - - #return my_config def find_inputs(self, pipeline_files): ret_dict = {} @@ -977,12 +911,12 @@ def find_outputs(self, outdir): def print_io(self, stream=sys.stdout): stream.write("Inputs--------\n") for tag, ftype in self.inputs: - aliased_tag - self.get_aliased_tag(tag) + aliased_tag = self.get_aliased_tag(tag) stream.write(f"{tag:20} : {aliased_tag:20} :{str(ftype):20} : {self._inputs[tag]}\n") stream.write("Outputs--------\n") for tag, ftype in self.outputs: - aliased_tag - self.get_aliased_tag(tag) - stream.write(f"{tag:20} : {aliased_tag:20} :{str(ftype):20} : {self._outputs[tag]}\n") + aliased_tag = self.get_aliased_tag(tag) + stream.write(f"{tag:20} : {aliased_tag:20} :{str(ftype):20} : {self._outputs[aliased_tag]}\n") def should_skip(self, run_config): outputs = self.find_outputs(run_config["output_dir"]).values() @@ -990,9 +924,9 @@ def should_skip(self, run_config): return already_run_stage and run_config["resume"] def already_finished(self): - print(f"Skipping stage {self._name} because its outputs exist already") + print(f"Skipping stage {self.instance_name} because its outputs exist already") - def iterate_fits(self, tag, hdunum, cols, chunk_rows, parallel=True): + def iterate_fits(self, tag, hdunum, cols, chunk_rows, parallel=True): #pragma: no cover """ Loop through chunks of the input data from a FITS file with the given tag @@ -1106,7 +1040,7 @@ def generate_command(cls, inputs, config, outputs, aliases=None): aliased_tag = tag try: fpath = inputs[aliased_tag] - except KeyError as msg: + except KeyError as msg: #pragma: no cover raise ValueError(f"Missing input location {aliased_tag} {str(inputs)}") from msg flags.append(f"--{tag}={fpath}") @@ -1119,7 +1053,7 @@ def generate_command(cls, inputs, config, outputs, aliases=None): aliased_tag = tag try: fpath = outputs[aliased_tag] - except KeyError as msg: + except KeyError as msg: #pragma: no cover raise ValueError(f"Missing output location {aliased_tag} {str(outputs)}") from msg flags.append(f"--{tag}={fpath}") diff --git a/do_cover.sh b/do_cover.sh new file mode 100755 index 0000000..acf85fa --- /dev/null +++ b/do_cover.sh @@ -0,0 +1 @@ +python -m pytest --cov=./ceci 
--cov-report=html tests diff --git a/nb/ceci_interactive_example.ipynb b/nb/ceci_interactive_example.ipynb index e79aaa2..3de8df7 100644 --- a/nb/ceci_interactive_example.ipynb +++ b/nb/ceci_interactive_example.ipynb @@ -111,12 +111,12 @@ "metadata": {}, "outputs": [], "source": [ - "status = pipeline.run()" + "#status = pipeline.run()" ] }, { "cell_type": "markdown", - "id": "90bd832e", + "id": "a3ace45a", "metadata": {}, "source": [ "#### Now let's do the same thing by building the pipeline with the python interface" @@ -125,7 +125,7 @@ { "cell_type": "code", "execution_count": null, - "id": "74f69a12", + "id": "2790e6cb", "metadata": {}, "outputs": [], "source": [ @@ -135,7 +135,32 @@ { "cell_type": "code", "execution_count": null, - "id": "4485aba5", + "id": "d63492d8", + "metadata": {}, + "outputs": [], + "source": [ + "#pipe2 = Pipeline.interactive()\n", + "#overall_inputs = {'DM':'./tests/inputs/dm.txt',\n", + "# 'fiducial_cosmology':'./tests/inputs/fiducial_cosmology.txt'}\n", + "#inputs = overall_inputs.copy()\n", + "#inputs['metacalibration'] = True\n", + "#inputs['config'] = None\n", + "\n", + "#inputs.update(pipe2.add_stage(PZEstimationPipe.build(**inputs)))\n", + "#inputs.update(pipe2.add_stage(shearMeasurementPipe.build(**inputs, apply_flag=False)))\n", + "#inputs.update(pipe2.add_stage(WLGCSelector.build(**inputs, zbin_edges=[0.2, 0.3, 0.5], ra_range=[-5, 5])))\n", + "#inputs.update(pipe2.add_stage(SysMapMaker.build(**inputs)))\n", + "#inputs.update(pipe2.add_stage(SourceSummarizer.build(**inputs)))\n", + "#inputs.update(pipe2.add_stage(WLGCCov.build(**inputs)))\n", + "#inputs.update(pipe2.add_stage(WLGCRandoms.build(**inputs)))\n", + "#inputs.update(pipe2.add_stage(WLGCTwoPoint.build(**inputs)))\n", + "#inputs.update(pipe2.add_stage(WLGCSummaryStatistic.build(**inputs)))\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "13669ff1", "metadata": {}, "outputs": [], "source": [ @@ -146,21 +171,22 @@ "inputs['metacalibration'] = True\n", "inputs['config'] = None\n", "\n", - "inputs.update(pipe2.add_stage(PZEstimationPipe.build(**inputs)))\n", - "inputs.update(pipe2.add_stage(shearMeasurementPipe.build(**inputs, apply_flag=False)))\n", - "inputs.update(pipe2.add_stage(WLGCSelector.build(**inputs, zbin_edges=[0.2, 0.3, 0.5], ra_range=[-5, 5])))\n", - "inputs.update(pipe2.add_stage(SysMapMaker.build(**inputs)))\n", - "inputs.update(pipe2.add_stage(SourceSummarizer.build(**inputs)))\n", - "inputs.update(pipe2.add_stage(WLGCCov.build(**inputs)))\n", - "inputs.update(pipe2.add_stage(WLGCRandoms.build(**inputs)))\n", - "inputs.update(pipe2.add_stage(WLGCTwoPoint.build(**inputs)))\n", - "inputs.update(pipe2.add_stage(WLGCSummaryStatistic.build(**inputs)))\n" + "pipe2.pipeline_files.update(**inputs)\n", + "pipe2.build_stage(PZEstimationPipe)\n", + "pipe2.build_stage(shearMeasurementPipe, apply_flag=False)\n", + "pipe2.build_stage(WLGCSelector, zbin_edges=[0.2, 0.3, 0.5], ra_range=[-5, 5])\n", + "pipe2.build_stage(SysMapMaker)\n", + "pipe2.build_stage(SourceSummarizer)\n", + "pipe2.build_stage(WLGCCov)\n", + "pipe2.build_stage(WLGCRandoms)\n", + "pipe2.build_stage(WLGCTwoPoint)\n", + "pipe2.build_stage(WLGCSummaryStatistic)\n" ] }, { "cell_type": "code", "execution_count": null, - "id": "e0411df5", + "id": "c65f96ae", "metadata": {}, "outputs": [], "source": [ @@ -170,7 +196,7 @@ { "cell_type": "code", "execution_count": null, - "id": "aa335c0e", + "id": "52595250", "metadata": {}, "outputs": [], "source": [ @@ -180,7 +206,7 @@ { "cell_type": "code", 
"execution_count": null, - "id": "111f0ad1", + "id": "b7258c1f", "metadata": {}, "outputs": [], "source": [ @@ -191,7 +217,7 @@ { "cell_type": "code", "execution_count": null, - "id": "366d9fce", + "id": "687115f5", "metadata": {}, "outputs": [], "source": [ @@ -201,7 +227,7 @@ { "cell_type": "code", "execution_count": null, - "id": "f80ee2f1", + "id": "a22aa4a3", "metadata": {}, "outputs": [], "source": [ @@ -211,7 +237,7 @@ { "cell_type": "code", "execution_count": null, - "id": "aa1ab5cd", + "id": "df60b9ee", "metadata": {}, "outputs": [], "source": []