Skip to content

Commit

Permalink
Merge branch 'eac-dev' of https://github.com/eacharles/ceci into eac-dev
Browse files Browse the repository at this point in the history
  • Loading branch information
eacharles committed Dec 15, 2021
2 parents 1a80321 + 3040716 commit b109376
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 37 deletions.
37 changes: 17 additions & 20 deletions ceci/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,44 +108,40 @@ def clear_stage_object(self):
self.stage_obj = None

def generate_full_command(self, inputs, outputs, config):
    """Build the complete command line used to run this stage.

    The core command comes from the stage class's ``generate_command``
    and is then passed through ``self.site.command``.

    Parameters
    ----------
    inputs :
        Input locations, forwarded to ``generate_command``.
    outputs :
        Output locations, forwarded to ``generate_command``.
    config :
        Configuration argument, forwarded to ``generate_command``.

    Returns
    -------
    Whatever ``self.site.command(core, self)`` returns.
    """
    # Aliases are only available once a stage object has been built.
    aliases = self.stage_obj.get_aliases() if self.stage_obj is not None else None

    # Resolve the stage class lazily, only if it has not been set yet.
    if self.stage_class is None:
        self.build_stage_class()

    # NOTE: generate_command takes (inputs, config, outputs, aliases),
    # which is a different order from this method's own signature.
    core = self.stage_class.generate_command(inputs, config, outputs, aliases)
    return self.site.command(core, self)


class FileManager(dict):
    """Dictionary of file tag -> file path with extra per-tag bookkeeping.

    The instance itself holds the tag -> path mapping.  Two auxiliary
    dictionaries are kept alongside it:

    * ``_tag_to_type`` : tag -> file type, filled in by :meth:`insert`
    * ``_path_to_tag`` : path -> tag, updated on every item assignment

    Note that ``dict`` methods which do not go through ``__setitem__``
    (for example ``update``) will not refresh ``_path_to_tag``.
    """

    def __init__(self):
        super().__init__()
        self._tag_to_type = {}
        self._path_to_tag = {}

    def __setitem__(self, tag, path):
        """Store ``path`` under ``tag`` and record the reverse lookup."""
        super().__setitem__(tag, path)
        self._path_to_tag[path] = tag

    def todict(self):
        """Return the tag -> path mapping as a plain ``dict`` (shallow copy).

        Kept so that callers of the earlier, non-dict interface keep working.
        """
        return dict(self)

    def insert(self, tag, path=None, ftype=None):
        """Register a path and/or a file type for ``tag``.

        Arguments left as ``None`` are ignored, so a tag may carry a type
        without (yet) having a path, and vice versa.
        """
        if path is not None:
            # Item assignment also updates the path -> tag mapping.
            self[tag] = path
        if ftype is not None:
            self._tag_to_type[tag] = ftype

    def get_type(self, tag):
        """Return the file type recorded for ``tag`` (``KeyError`` if none)."""
        return self._tag_to_type[tag]

    def get_path(self, tag):
        """Return the path stored for ``tag`` (``KeyError`` if none)."""
        return self[tag]

    def get_tag(self, path):
        """Return the tag that ``path`` was stored under (``KeyError`` if none)."""
        return self._path_to_tag[path]
Expand Down Expand Up @@ -601,6 +597,7 @@ def initiate_run(self, overall_inputs):
return [] # list of futures

def enqueue_job(self, stage, pipeline_files):

from parsl.data_provider.files import File

#log_dir = self.run_config["log_dir"]
Expand Down
51 changes: 34 additions & 17 deletions ceci/stage.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,15 @@ def __init__(self, args, comm=None):
if comm is not None:
self.setup_mpi(comm)

def get_aliases(self):
    """Return the ``'aliases'`` entry of this stage's config, or ``None`` if absent."""
    alias_map = self.config.get('aliases', None)
    return alias_map

def get_aliased_tag(self, tag):
    """Translate ``tag`` through the config's ``'aliases'`` mapping.

    The tag is returned unchanged when the config has no ``'aliases'``
    entry (or it is ``None``), or when the mapping has no entry for it.
    """
    alias_map = self.config.get('aliases', None)
    return tag if alias_map is None else alias_map.get(tag, tag)

@classmethod
def build(cls, **kwargs):
kwcopy = kwargs.copy()
Expand Down Expand Up @@ -951,29 +960,29 @@ def read_config(self, args):

#return my_config

def find_inputs(self, pipeline_files):
    """Look up this stage's inputs in ``pipeline_files``.

    Each tag in ``self.inputs`` is first translated with
    ``self.get_aliased_tag``; the aliased tag is used both to index
    ``pipeline_files`` and as the key of the result.

    Returns
    -------
    dict
        Aliased tag -> the entry ``pipeline_files`` holds for that tag.

    Lookup errors raised by ``pipeline_files`` propagate to the caller.
    """
    aliased_tags = (self.get_aliased_tag(tag) for tag, _ in self.inputs)
    return {aliased: pipeline_files[aliased] for aliased in aliased_tags}

def find_outputs(self, outdir):
    """Build the output path for each of this stage's outputs.

    Each tag in ``self.outputs`` is translated with
    ``self.get_aliased_tag``.  The file name comes from the output type's
    ``make_name(aliased_tag)`` and is placed under ``outdir``.

    Returns
    -------
    dict
        Aliased tag -> ``"{outdir}/{file name}"``.
    """
    ret_dict = {}
    for tag, ftype in self.outputs:
        aliased_tag = self.get_aliased_tag(tag)
        ret_dict[aliased_tag] = f"{outdir}/{ftype.make_name(aliased_tag)}"
    return ret_dict

def print_io(self, stream=sys.stdout):
    """Write a table of this stage's inputs and outputs to ``stream``.

    One line is written per tag, giving the tag, its aliased tag (from
    ``self.get_aliased_tag``), the file type, and the entry stored for
    the tag in ``self._inputs`` / ``self._outputs``.

    Parameters
    ----------
    stream :
        Object with a ``write`` method; defaults to ``sys.stdout``.
    """
    stream.write("Inputs--------\n")
    for tag, ftype in self.inputs:
        # Must be an assignment: the aliased tag is used in the line below.
        aliased_tag = self.get_aliased_tag(tag)
        stream.write(f"{tag:20} : {aliased_tag:20} :{str(ftype):20} : {self._inputs[tag]}\n")
    stream.write("Outputs--------\n")
    for tag, ftype in self.outputs:
        aliased_tag = self.get_aliased_tag(tag)
        stream.write(f"{tag:20} : {aliased_tag:20} :{str(ftype):20} : {self._outputs[tag]}\n")

def should_skip(self, run_config):
outputs = self.find_outputs(run_config["output_dir"]).values()
Expand Down Expand Up @@ -1081,7 +1090,7 @@ def iterate_hdf(
################################

@classmethod
def generate_command(cls, inputs, config, outputs): # FIXME PipelineFile
def generate_command(cls, inputs, config, outputs, aliases=None):
"""
Generate a command line that will run the stage
"""
Expand All @@ -1091,19 +1100,27 @@ def generate_command(cls, inputs, config, outputs): # FIXME PipelineFile
flags = [cls.name]

for tag, _ in cls.inputs:
if aliases is not None:
aliased_tag = aliases.get(tag, tag)
else:
aliased_tag = tag
try:
fpath = inputs[tag]
fpath = inputs[aliased_tag]
except KeyError as msg:
raise ValueError(f"Missing input location {tag}") from msg
raise ValueError(f"Missing input location {aliased_tag} {str(inputs)}") from msg
flags.append(f"--{tag}={fpath}")

flags.append(f"--config={config}")

for tag, _ in cls.outputs:
if aliases is not None:
aliased_tag = aliases.get(tag, tag)
else:
aliased_tag = tag
try:
fpath = outputs[tag]
fpath = outputs[aliased_tag]
except KeyError as msg:
raise ValueError(f"Missing output location {tag}") from msg
raise ValueError(f"Missing output location {aliased_tag} {str(outputs)}") from msg
flags.append(f"--{tag}={fpath}")

flags = " ".join(flags)
Expand Down

0 comments on commit b109376

Please sign in to comment.