diff --git a/maestrowf/abstracts/__init__.py b/maestrowf/abstracts/__init__.py index 5cacc25c6..0f6d0c861 100644 --- a/maestrowf/abstracts/__init__.py +++ b/maestrowf/abstracts/__init__.py @@ -33,23 +33,56 @@ This module contains all of the abstract classes and APIs for defining objects. Abstracts include abstract data stuctures (like a graph), APIs for concepts such as queueing adapters and environment APIs, as well as fundamental data -structures like a SimObject. +structures. """ # NOTE: Some of these abstracts will be moved in the future. The Graph abstract # class does not belong here, and should be moved to something more general. -# NOTE: The SimObject base class may not be required, since it basically -# just requires objects to be dictionaries. +import dill +import logging from maestrowf.abstracts.abstractclassmethod import abstractclassmethod from maestrowf.abstracts.envobject import Dependency, Source, Substitution from maestrowf.abstracts.graph import Graph -from maestrowf.abstracts.simobject import SimObject from maestrowf.abstracts.specification import Specification -__all__ = ("abstractclassmethod", "Dependency", "Graph", "SimObject", +__all__ = ("abstractclassmethod", "Dependency", "Graph", "PickleInterface", "Singleton", "Source", "Specification", "Substitution") +LOGGER = logging.getLogger(__name__) + + +class PickleInterface: + """A mixin class that implements a general pickle interface using dill.""" + + @classmethod + def unpickle(cls, path): + """ + Load a pickled instance from a pickle file. + + :param path: Path to a pickle file containing a class instance. + """ + with open(path, 'rb') as pkl: + obj = dill.load(pkl) + + if not isinstance(obj, cls): + msg = "Object loaded from {path} is of type {type}. Expected an" \ + " object of type '{cls}.'".format(path=path, type=type(obj), + cls=type(cls)) + LOGGER.error(msg) + raise TypeError(msg) + + return obj + + def pickle(self, path): + """ + Generate a pickle file of of a class instance. + + :param path: The path to write the pickle to. + """ + with open(path, 'wb') as pkl: + dill.dump(self, pkl) + class _Singleton(type): _instances = {} diff --git a/maestrowf/abstracts/envobject.py b/maestrowf/abstracts/envobject.py index d3d4f083a..32e1685c6 100644 --- a/maestrowf/abstracts/envobject.py +++ b/maestrowf/abstracts/envobject.py @@ -33,13 +33,11 @@ import logging import six -from maestrowf.abstracts.simobject import SimObject - -logger = logging.getLogger(__name__) +LOGGER = logging.getLogger(__name__) @six.add_metaclass(ABCMeta) -class EnvObject(SimObject): +class EnvObject: """ An abstract class representing objects that exist in a study's environment. @@ -64,7 +62,6 @@ def _verify(self): :returns: True if the EnvObject is verified, False otherwise. """ - pass def _verification(self, error): """ @@ -73,7 +70,7 @@ def _verification(self, error): :param error: String containing a custom error message. """ if not self._verify(): - logger.exception(error) + LOGGER.exception(error) raise ValueError(error) @@ -94,10 +91,8 @@ def substitute(self, data): :returns: A string equal to the original string data with substitutions made (if any were performed). """ - pass -@six.add_metaclass(ABCMeta) class Source(EnvObject): """ Abstract class representing classes that alter environment sourcing. @@ -126,7 +121,6 @@ def apply(self, data): # NOTE: This functionality has not been settled yet. The use of this # class or this design may not be the best for applying script sources # to an environment. 
- pass @six.add_metaclass(ABCMeta) @@ -158,4 +152,3 @@ def acquire(self, substitutions=None): :param substitutions: List of Substitution objects that can be applied. """ - pass diff --git a/maestrowf/abstracts/graph.py b/maestrowf/abstracts/graph.py index 656d644b5..c8e0dd83c 100644 --- a/maestrowf/abstracts/graph.py +++ b/maestrowf/abstracts/graph.py @@ -33,7 +33,7 @@ @six.add_metaclass(ABCMeta) -class Graph(object): +class Graph: """An abstract graph data structure.""" # NOTE: fdinatal -- 04/07/2017 @@ -50,7 +50,6 @@ def add_node(self, name, obj): :param name: String identifier of the node. :param obj: An object representing the value of the node. """ - pass @abstractmethod def add_edge(self, src, dest): @@ -60,7 +59,6 @@ def add_edge(self, src, dest): :param src: Source vertex name. :param dest: Destination vertex name. """ - pass @abstractmethod def remove_edge(self, src, dest): @@ -70,4 +68,3 @@ def remove_edge(self, src, dest): :param src: Source vertex name. :param dest: Destination vertex name. """ - pass diff --git a/maestrowf/abstracts/simobject.py b/maestrowf/abstracts/simobject.py deleted file mode 100644 index 652b4917e..000000000 --- a/maestrowf/abstracts/simobject.py +++ /dev/null @@ -1,64 +0,0 @@ -############################################################################### -# Copyright (c) 2017, Lawrence Livermore National Security, LLC. -# Produced at the Lawrence Livermore National Laboratory -# Written by Francesco Di Natale, dinatale3@llnl.gov. -# -# LLNL-CODE-734340 -# All rights reserved. -# This file is part of MaestroWF, Version: 1.0.0. -# -# For details, see https://github.com/LLNL/maestrowf. -# -# Permission is hereby granted, free of charge, to any person obtaining a copy -# of this software and associated documentation files (the "Software"), to deal -# in the Software without restriction, including without limitation the rights -# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -# copies of the Software, and to permit persons to whom the Software is -# furnished to do so, subject to the following conditions: -# The above copyright notice and this permission notice shall be included in -# all copies or substantial portions of the Software. -# -# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE -# SOFTWARE. -############################################################################### - -"""The core building block class for study related classes.""" - - -class SimObject(object): - """ - A base class for objects that provides a basic API. - - The SimObject is an object meant to capture the very basic functionality - that core classes and other closesly related classes should adhere to. - The eventual goal of having this base class is to allow a study to be - designed and written in Python code, which allows for those objects to - return a basic dictionary form which could be used to map from one study - specification, format, or otherwise to another. 
This basic functionality - also allows for a study to be easier to write using standard formats such - as YAML or JSON in order to keep record of how the studies were performed. - """ - - @classmethod - def from_dict(cls, dictionary): - """ - Method for populating a SimObject from a dictionary. - - :param cls: Class to be instantiated. - :param dict: Dictionary containing attribute data. - :return: Instance of cls. - """ - instance = cls() - for key, value in dictionary.items(): - instance.__dict__[key] = value - - return instance - - def to_dict(self): - """Return a dictionary version of the SimObject.""" - return self.__dict__ diff --git a/maestrowf/abstracts/specification.py b/maestrowf/abstracts/specification.py index a5b6ae43a..c6c9dbb98 100644 --- a/maestrowf/abstracts/specification.py +++ b/maestrowf/abstracts/specification.py @@ -30,11 +30,11 @@ from abc import ABCMeta, abstractmethod, abstractproperty import six -from maestrowf.abstracts import abstractclassmethod +from . import abstractclassmethod @six.add_metaclass(ABCMeta) -class Specification(object): +class Specification: """ Abstract class for loading and verifying a Study Specification """ @@ -48,7 +48,6 @@ def load_specification(cls, path): :returns: A specification object containing the information loaded from path. """ - pass @abstractclassmethod def load_specification_from_stream(cls, stream): @@ -58,14 +57,12 @@ def load_specification_from_stream(cls, stream): :param stream: Raw text stream containing specification data. :returns: A specification object containing the information in string. """ - pass @abstractmethod def verify(self): """ Verify the whole specification. """ - pass @abstractmethod def get_study_environment(self): @@ -74,7 +71,6 @@ def get_study_environment(self): :returns: A StudyEnvironment object with the data in the specification. """ - pass @abstractmethod def get_parameters(self): @@ -83,7 +79,6 @@ def get_parameters(self): :returns: A ParameterGenerator with data from the specification. """ - pass @abstractmethod def get_study_steps(self): @@ -92,7 +87,6 @@ def get_study_steps(self): :returns: A list of StudyStep objects. """ - pass @abstractproperty def output_path(self): @@ -101,7 +95,6 @@ def output_path(self): :returns: Returns OUTPUT_PATH if it exists, empty string otherwise. """ - pass @abstractproperty def name(self): @@ -110,7 +103,6 @@ def name(self): :returns: The name of the study described by the specification. """ - pass @name.setter def name(self, value): @@ -119,7 +111,6 @@ def name(self, value): :param value: String value representing the new name. """ - pass @abstractproperty def desc(self): @@ -129,7 +120,6 @@ def desc(self): :returns: A string containing the description of the study specification. """ - pass @desc.setter def desc(self, value): @@ -138,4 +128,3 @@ def desc(self, value): :param value: String value representing the new description. 
""" - pass diff --git a/maestrowf/conductor.py b/maestrowf/conductor.py index 26aebac93..658dd41e1 100644 --- a/maestrowf/conductor.py +++ b/maestrowf/conductor.py @@ -36,23 +36,74 @@ import os import sys from time import sleep +import dill +import yaml from maestrowf.abstracts.enums import StudyStatus -from maestrowf.datastructures.core import ExecutionGraph -from maestrowf.utils import create_parentdir +from maestrowf.datastructures.core import Study +from maestrowf.utils import create_parentdir, csvtable_to_dict, make_safe_path # Logger instantiation -rootlogger = logging.getLogger(inspect.getmodule(__name__)) -logger = logging.getLogger(__name__) +ROOTLOGGER = logging.getLogger(inspect.getmodule(__name__)) +LOGGER = logging.getLogger(__name__) # Formatting of logger. LFORMAT = "%(asctime)s - %(name)s:%(funcName)s:%(lineno)s - " \ "%(levelname)s - %(message)s" -def setup_argparser(): - """Set up the program's argument parser.""" - parser = ArgumentParser(prog="ExecutionManager", +def setup_logging(name, output_path, log_lvl=2, log_path=None, + log_stdout=False, log_format=None): + """ + Set up logging in the Main class. + :param args: A Namespace object created by a parsed ArgumentParser. + :param name: The name of the log file. + """ + # Check if the user has specified a custom log path. + if log_path: + LOGGER.info( + "Log path overwritten by command line -- %s", log_path) + else: + log_path = os.path.join(output_path, "logs") + + if not log_format: + log_format = LFORMAT + + loglevel = log_lvl * 10 + + # Attempt to create the logging directory. + create_parentdir(log_path) + formatter = logging.Formatter(LFORMAT) + ROOTLOGGER.setLevel(loglevel) + + # Set up handlers + if log_stdout: + handler = logging.StreamHandler() + handler.setFormatter(formatter) + ROOTLOGGER.addHandler(handler) + + log_file = os.path.join(log_path, "{}.log".format(name)) + handler = logging.FileHandler(log_file) + handler.setFormatter(formatter) + ROOTLOGGER.addHandler(handler) + ROOTLOGGER.setLevel(loglevel) + + # Print the level of logging. + LOGGER.info("INFO Logging Level -- Enabled") + LOGGER.warning("WARNING Logging Level -- Enabled") + LOGGER.critical("CRITICAL Logging Level -- Enabled") + LOGGER.debug("DEBUG Logging Level -- Enabled") + + +def setup_parser(): + """ + Set up the Conductors's argument parser. + + :returns: A ArgumentParser that's initialized with the conductor's CLI. + """ + + # Set up the parser for our conductor here. + parser = ArgumentParser(prog="Conductor", description="An application for checking and " "managing an ExecutionDAG within an executing" "study.", @@ -75,137 +126,272 @@ def setup_argparser(): "2 - Info (Default)\n" "1 - Debug") parser.add_argument("-c", "--logstdout", action="store_true", - help="Output logging to stdout in addition to a file.") + help="Output logging to stdout in addition to a " + "file.") parser.add_argument("-t", "--sleeptime", type=int, default=60, - help="Amount of time (in seconds) for the manager to " - "wait between job status checks.") + help="Amount of time (in seconds) for the manager" + " to wait between job status checks.") return parser -def setup_logging(args, name): - """ - Set up logging in the Main class. +class Conductor: + """A class that provides an API for interacting with the Conductor.""" - :param args: A Namespace object created by a parsed ArgumentParser. - :param name: The name of the log file. - """ - # Check if the user has specified a custom log path. 
- if args.logpath: - logger.info("Log path overwritten by command line -- %s", - args.logpath) - log_path = args.logpath - else: - log_path = os.path.join(args.directory, "logs") + _pkl_extension = ".study.pkl" + _cancel_lock = ".cancel.lock" + _batch_info = "batch.info" - loglevel = args.debug_lvl * 10 + def __init__(self, study): + """ + Create a new instance of a Conductor class. - # Attempt to create the logging directory. - create_parentdir(log_path) - formatter = logging.Formatter(LFORMAT) - rootlogger.setLevel(loglevel) + :param study: An instance of a populated Maestro study. + """ + self._study = study + self._setup = False - # Set up handlers - if args.logstdout: - handler = logging.StreamHandler() - handler.setFormatter(formatter) - rootlogger.addHandler(handler) + @property + def output_path(self): + """ + Return the path representing the root of the study workspace. - log_file = os.path.join(log_path, "{}.log".format(name)) - handler = logging.FileHandler(log_file) - handler.setFormatter(formatter) - rootlogger.addHandler(handler) - rootlogger.setLevel(loglevel) + :returns: A string containing the path to the study's root. + """ + return self._study.output_path - # Print the level of logging. - logger.info("INFO Logging Level -- Enabled") - logger.warning("WARNING Logging Level -- Enabled") - logger.critical("CRITICAL Logging Level -- Enabled") - logger.debug("DEBUG Logging Level -- Enabled") - - -def monitor_study(dag, pickle_path, cancel_lock_path, sleep_time): - """Monitor a running study.""" - logger.debug("\n -------- Calling monitor study -------\n" - "pkl path = %s" - "cancel path = %s" - "sleep time = %s", - pickle_path, cancel_lock_path, sleep_time) - - completion_status = StudyStatus.RUNNING - while completion_status == StudyStatus.RUNNING: - if os.path.exists(cancel_lock_path): - # cancel the study if a cancel lock file is found - cancel_lock = FileLock(cancel_lock_path) + @property + def study_name(self): + """ + Return the name of the study this Conductor instance is managing. + + :returns: A string containing the name of the study. + """ + return self._study.name + + @classmethod + def store_study(cls, study): + """ + Store a Maestro study instance in a way the Conductor can read it. + """ + # Pickle up the Study + pkl_name = "{}{}".format(study.name, cls._pkl_extension) + pkl_path = make_safe_path(study.output_path, pkl_name) + study.pickle(pkl_path) + + @classmethod + def load_batch(cls, out_path): + """ + Load the batch information for the study rooted in 'out_path'. + + :param out_path: A string containing the path to a study root. + :returns: A dict containing the batch information for the study. + """ + batch_path = os.path.join(out_path, cls._batch_info) + + if not os.path.exists(batch_path): + msg = "Batch info files is missing. Please re-run Maestro." + LOGGER.error(msg) + raise Exception(msg) + + with open(batch_path, 'r') as data: try: - with cancel_lock.acquire(timeout=10): - # we have the lock - dag.cancel_study() - os.remove(cancel_lock_path) - logger.info("Study '%s' has been cancelled.", dag.name) - except Timeout: - logger.error("Failed to acquire cancellation lock.") - pass + batch_info = yaml.load(data, yaml.FullLoader) + except AttributeError: + LOGGER.warning( + "*** PyYAML is using an unsafe version with a known " + "load vulnerability. Please upgrade your installation " + "to a more recent version! 
***") + batch_info = yaml.load(data) - logger.info("Checking DAG status at %s", str(datetime.now())) - # Execute steps that are ready - # Recieves StudyStatus enum - completion_status = dag.execute_ready_steps() - # Re-pickle the ExecutionGraph. - dag.pickle(pickle_path) - # Write out the state - dag.write_status(os.path.split(pickle_path)[0]) - # Sleep for SLEEPTIME in args if study not complete. - if completion_status == StudyStatus.RUNNING: - sleep(sleep_time) + return batch_info - return completion_status + @classmethod + def store_batch(cls, out_path, batch): + """ + Store the specified batch information to the study in 'out_path'. + :param out_path: A string containing the patht to a study root. + """ + path = os.path.join(out_path, cls._batch_info) + with open(path, "wb") as batch_info: + batch_info.write(yaml.dump(batch).encode("utf-8")) -def main(): - """Run the main segment of the conductor.""" - try: - # Set up and parse the ArgumentParser - parser = setup_argparser() - args = parser.parse_args() + @classmethod + def load_study(cls, out_path): + """ + Load the Study instance in the study root specified by 'out_path'. - # Unpickle the ExecutionGraph - study_pkl = glob.glob(os.path.join(args.directory, "*.pkl")) - # We expect only a single pickle file. - if len(study_pkl) == 1: - dag = ExecutionGraph.unpickle(study_pkl[0]) + :param out_path: A string containing the patht to a study root. + :returns: A string containing the path to the study's root. + """ + study_glob = \ + glob.glob(os.path.join(out_path, "*{}".format(cls._pkl_extension))) + + print(study_glob) + if len(study_glob) == 1: + # We only expect one result.If we only get one, let's assume and + # check after. + path = study_glob[0] + + with open(path, 'rb') as pkl: + obj = dill.load(pkl) + + if not isinstance(obj, Study): + msg = \ + "Object loaded from {path} is of type {type}. Expected " \ + "an object of type '{cls}.'" \ + .format(path=path, type=type(obj), cls=type(Study)) + LOGGER.error(msg) + raise TypeError(msg) else: - if len(study_pkl) > 1: + if len(study_glob) > 1: msg = "More than one pickle found. Expected one. Aborting." - status = 2 else: msg = "No pickle found. Aborting." - status = 1 - sys.stderr.write(msg) - sys.exit(status) + msg = "Corrupted study directory found. {}".format(msg) + raise Exception(msg) + + # Return the Study object + return obj + + @classmethod + def get_status(cls, output_path): + """ + Retrieve the status of the study rooted at 'out_path'. - # Set up logging - setup_logging(args, dag.name) - # Use ExecutionGraph API to determine next jobs to be launched. - logger.info("Checking the ExecutionGraph for study '%s' located in " - "%s...", dag.name, study_pkl[0]) - logger.info("Study Description: %s", dag.description) + :param out_path: A string containing the patht to a study root. + :returns: A dictionary containing the status of the study. + """ + stat_path = os.path.join(output_path, "status.csv") + lock_path = os.path.join(output_path, ".status.lock") + _ = {} + if os.path.exists(stat_path): + lock = FileLock(lock_path) + try: + with lock.acquire(timeout=10): + with open(stat_path, "r") as stat_file: + _ = csvtable_to_dict(stat_file) + except Timeout: + pass + + return _ + + @classmethod + def mark_cancelled(cls, output_path): + """ + Mark the study rooted at 'out_path'. + + :param out_path: A string containing the patht to a study root. + :returns: A dictionary containing the status of the study. 
+ """ + lock_path = make_safe_path(output_path, cls._cancel_lock) + with open(lock_path, 'a'): + os.utime(lock_path, None) + + def initialize(self, batch_info, sleeptime=60): + """ + Initializes the Conductor instance based on the stored study. + + :param batch_info: A dict containing batch information. + :param sleeptime: The amount of sleep time between polling loops + [Default: 60s]. + """ + # Set our conductor's sleep time. + self.sleep_time = sleeptime + # Stage the study. + self._pkl_path, self._exec_dag = self._study.stage() + # Write metadata + self._exec_dag.set_adapter(batch_info) + self._study.store_metadata() + self._setup = True + + def monitor_study(self): + """Monitor a running study.""" + if not self._setup: + msg = \ + "Study '{}' located in '{}' not initialized. Initialize " \ + "study before calling launching. Aborting." \ + .format(self.study_name, self.output_path) + LOGGER.error(msg) + raise Exception(msg) + + # Set some fixed variables that monitor will use. + cancel_lock_path = make_safe_path(self.output_path, self._cancel_lock) + dag = self._exec_dag + pkl_path = \ + os.path.join(self._pkl_path, "{}.pkl".format(self._study.name)) + sleep_time = self.sleep_time + + LOGGER.debug( + "\n -------- Calling monitor study -------\n" + "pkl path = %s\n" + "cancel path = %s\n" + "sleep time = %s\n" + "------------------------------------------\n", + pkl_path, cancel_lock_path, sleep_time) - cancel_lock_path = os.path.join(args.directory, ".cancel.lock") - logger.info("Starting to monitor '%s'", dag.name) - completion_status = monitor_study(dag, study_pkl[0], - cancel_lock_path, args.sleeptime) + completion_status = StudyStatus.RUNNING + while completion_status == StudyStatus.RUNNING: + if os.path.exists(cancel_lock_path): + # cancel the study if a cancel lock file is found + cancel_lock = FileLock(cancel_lock_path) + try: + with cancel_lock.acquire(timeout=10): + # we have the lock + dag.cancel_study() + os.remove(cancel_lock_path) + LOGGER.info("Study '%s' has been cancelled.", dag.name) + except Timeout: + LOGGER.error("Failed to acquire cancellation lock.") + pass + + LOGGER.info("Checking DAG status at %s", str(datetime.now())) + # Execute steps that are ready + # Recieves StudyStatus enum + completion_status = dag.execute_ready_steps() + # Re-pickle the ExecutionGraph. + dag.pickle(pkl_path) + # Write out the state + dag.write_status(os.path.split(pkl_path)[0]) + # Sleep for SLEEPTIME in args if study not complete. + if completion_status == StudyStatus.RUNNING: + sleep(sleep_time) + + return completion_status + + def cleanup(self): + self._exec_dag.cleanup() + + +def main(): + """Run the main segment of the conductor.""" + conductor = None + + try: + # Parse the command line args and load the study. + parser = setup_parser() + args = parser.parse_args() + study = Conductor.load_study(args.directory) + setup_logging(study.name, args.directory, args.debug_lvl, + args.logpath, args.logstdout) + batch_info = Conductor.load_batch(args.directory) - logger.info("Cleaning up...") - dag.cleanup() - logger.info("Squeaky clean!") + conductor = Conductor(study) + conductor.initialize(batch_info, args.sleeptime) + completion_status = conductor.monitor_study() - # Explicitly return a 0 status. 
+ LOGGER.info("Study completed with state '%s'.", completion_status) sys.exit(completion_status.value) except Exception as e: - logger.error(e.args, exc_info=True) + LOGGER.error(e.args, exc_info=True) raise e + finally: + if conductor: + LOGGER.info("Study exiting, cleaning up...") + conductor.cleanup() + LOGGER.info("Squeaky clean!") if __name__ == "__main__": diff --git a/maestrowf/datastructures/__init__.py b/maestrowf/datastructures/__init__.py index 2978b4abd..d9bdc1e24 100644 --- a/maestrowf/datastructures/__init__.py +++ b/maestrowf/datastructures/__init__.py @@ -40,9 +40,9 @@ import logging -from maestrowf.datastructures.dag import DAG -from maestrowf.datastructures.yamlspecification import YAMLSpecification +from .dag import DAG +from .yamlspecification import YAMLSpecification __all__ = ("DAG", "YAMLSpecification") -logger = logging.getLogger(__name__) +LOGGER = logging.getLogger(__name__) diff --git a/maestrowf/datastructures/core/executiongraph.py b/maestrowf/datastructures/core/executiongraph.py index 632f031f2..9e7ad4b7a 100644 --- a/maestrowf/datastructures/core/executiongraph.py +++ b/maestrowf/datastructures/core/executiongraph.py @@ -1,14 +1,14 @@ """Module for the execution of DAG workflows.""" from collections import deque, OrderedDict from datetime import datetime -import dill -from filelock import FileLock, Timeout import getpass import logging import os import shutil import tempfile +from filelock import FileLock, Timeout +from maestrowf.abstracts import PickleInterface from maestrowf.abstracts.enums import JobStatusCode, State, SubmissionCode, \ CancelCode, StudyStatus from maestrowf.datastructures.dag import DAG @@ -16,11 +16,11 @@ from maestrowf.interfaces import ScriptAdapterFactory from maestrowf.utils import create_parentdir, get_duration -logger = logging.getLogger(__name__) +LOGGER = logging.getLogger(__name__) SOURCE = "_source" -class _StepRecord(object): +class _StepRecord: """ A simple container object representing a workflow step record. @@ -83,10 +83,10 @@ def generate_script(self, adapter, tmp_dir=""): self.step.run["cmd"] = self.workspace.substitute(self.step.run["cmd"]) - logger.info("Generating script for %s into %s", self.name, scr_dir) + LOGGER.info("Generating script for %s into %s", self.name, scr_dir) self.to_be_scheduled, self.script, self.restart_script = \ adapter.write_script(scr_dir, self.step) - logger.info("Script: %s\nRestart: %s\nScheduled?: %s", + LOGGER.info("Script: %s\nRestart: %s\nScheduled?: %s", self.script, self.restart_script, self.to_be_scheduled) def execute(self, adapter): @@ -134,7 +134,7 @@ def _execute(self, adapter, script): def mark_submitted(self): """Mark the submission time of the record.""" - logger.debug( + LOGGER.debug( "Marking %s as submitted (PENDING) -- previously %s", self.name, self.status) @@ -142,14 +142,14 @@ def mark_submitted(self): if not self._submit_time: self._submit_time = datetime.now() else: - logger.warning( + LOGGER.warning( "Cannot set the submission time of '%s' because it has " "already been set.", self.name ) def mark_running(self): """Mark the start time of the record.""" - logger.debug( + LOGGER.debug( "Marking %s as running (RUNNING) -- previously %s", self.name, self.status) @@ -163,7 +163,7 @@ def mark_end(self, state): :param state: State enum corresponding to termination state. 
""" - logger.debug( + LOGGER.debug( "Marking %s as finished (%s) -- previously %s", self.name, state, @@ -174,7 +174,7 @@ def mark_end(self, state): def mark_restart(self): """Mark the end time of the record.""" - logger.debug( + LOGGER.debug( "Marking %s as restarting (TIMEOUT) -- previously %s", self.name, self.status) @@ -286,7 +286,7 @@ def restarts(self): return self._num_restarts -class ExecutionGraph(DAG): +class ExecutionGraph(DAG, PickleInterface): """ Datastructure that tracks, executes, and reports on study execution. @@ -343,7 +343,7 @@ def __init__(self, submission_attempts=1, submission_throttle=0, # tree or something of that nature to guarantee worst case performance. self._dependencies = {} - logger.info( + LOGGER.info( "\n------------------------------------------\n" "Submission attempts = %d\n" "Submission throttle limit = %d\n" @@ -357,14 +357,14 @@ def __init__(self, submission_attempts=1, submission_throttle=0, if self._submission_attempts < 1: _msg = "Submission attempts should always be greater than 0. " \ "Received a value of {}.".format(self._submission_attempts) - logger.error(_msg) + LOGGER.error(_msg) raise ValueError(_msg) if self._submission_throttle < 0: _msg = "Throttling should be 0 for unthrottled or a positive " \ "integer for the number of allowed inflight jobs. " \ "Received a value of {}.".format(self._submission_throttle) - logger.error(_msg) + LOGGER.error(_msg) raise ValueError(_msg) def _check_tmp_dir(self): @@ -416,14 +416,14 @@ def set_adapter(self, adapter): if not isinstance(adapter, dict): msg = "Adapter settings must be contained in a dictionary." - logger.error(msg) + LOGGER.error(msg) raise TypeError(msg) # Check to see that the adapter type is something the if adapter["type"] not in ScriptAdapterFactory.get_valid_adapters(): msg = "'{}' adapter must be specfied in ScriptAdapterFactory." \ .format(adapter) - logger.error(msg) + LOGGER.error(msg) raise TypeError(msg) self._adapter = adapter @@ -439,41 +439,6 @@ def add_description(self, name, description, **kwargs): self._description["description"] = description self._description.update(kwargs) - @classmethod - def unpickle(cls, path): - """ - Load an ExecutionGraph instance from a pickle file. - - :param path: Path to a ExecutionGraph pickle file. - """ - with open(path, 'rb') as pkl: - dag = dill.load(pkl) - - if not isinstance(dag, cls): - msg = "Object loaded from {path} is of type {type}. Expected an" \ - " object of type '{cls}.'".format(path=path, type=type(dag), - cls=type(cls)) - logger.error(msg) - raise TypeError(msg) - - return dag - - def pickle(self, path): - """ - Generate a pickle file of the graph instance. - - :param path: The path to write the pickle to. - """ - if not self._adapter: - msg = "A script adapter must be set before an ExecutionGraph is " \ - "pickled. Use the 'set_adapter' method to set a specific" \ - " script interface." - logger.error(msg) - raise Exception(msg) - - with open(path, 'wb') as pkl: - dill.dump(self, pkl) - @property def name(self): """ @@ -515,7 +480,7 @@ def log_description(self): desc = ["{}: {}".format(key, value) for key, value in self._description.items()] desc = "\n".join(desc) - logger.info( + LOGGER.info( "\n==================================================\n" "%s\n" "==================================================\n", @@ -535,11 +500,11 @@ def generate_scripts(self): if not self._adapter: msg = "Adapter not found. Specify a ScriptAdapter using " \ "set_adapter." 
- logger.error(msg) + LOGGER.error(msg) raise ValueError(msg) # Set up the adapter. - logger.info("Generating scripts...") + LOGGER.info("Generating scripts...") adapter = ScriptAdapterFactory.get_adapter(self._adapter["type"]) adapter = adapter(**self._adapter) @@ -562,7 +527,7 @@ def _execute_record(self, record, adapter, restart=False): :param restart: True if the record needs restarting, False otherwise. """ # Logging for debugging. - logger.info("Calling execute for StepRecord '%s'", record.name) + LOGGER.info("Calling execute for StepRecord '%s'", record.name) num_restarts = 0 # Times this step has temporally restarted. retcode = None # Execution return code. @@ -573,13 +538,13 @@ def _execute_record(self, record, adapter, restart=False): self._check_tmp_dir() while retcode != SubmissionCode.OK and \ num_restarts < self._submission_attempts: - logger.info("Attempting submission of '%s' (attempt %d of %d)...", + LOGGER.info("Attempting submission of '%s' (attempt %d of %d)...", record.name, num_restarts + 1, self._submission_attempts) # We're not restarting -- submit as usual. if not restart: - logger.debug("Calling 'execute' on '%s' at %s", + LOGGER.debug("Calling 'execute' on '%s' at %s", record.name, str(datetime.now())) # Generate the script for execution on the fly. record.setup_workspace() # Generate the workspace. @@ -588,21 +553,21 @@ def _execute_record(self, record, adapter, restart=False): # Otherwise, it's a restart. else: # If the restart is specified, use the record restart script. - logger.debug("Calling 'restart' on '%s' at %s", + LOGGER.debug("Calling 'restart' on '%s' at %s", record.name, str(datetime.now())) # Generate the script for execution on the fly. record.generate_script(adapter, self._tmp_dir) retcode = record.restart(adapter) # Increment the number of restarts we've attempted. - logger.debug("Completed submission attempt %d", num_restarts) + LOGGER.debug("Completed submission attempt %d", num_restarts) num_restarts += 1 if retcode == SubmissionCode.OK: self.in_progress.add(record.name) if record.is_local_step: - logger.info("Local step %s executed with status OK. Complete.", + LOGGER.info("Local step %s executed with status OK. Complete.", record.name) record.mark_end(State.FINISHED) self.completed_steps.add(record.name) @@ -610,7 +575,7 @@ def _execute_record(self, record, adapter, restart=False): else: # Find the subtree, because anything dependent on this step now # failed. - logger.warning("'%s' failed to submit properly. " + LOGGER.warning("'%s' failed to submit properly. " "Step failed.", record.name) path, parent = self.bfs_subtree(record.name) for node in path: @@ -618,7 +583,7 @@ def _execute_record(self, record, adapter, restart=False): self.values[node].mark_end(State.FAILED) # After execution state debug logging. - logger.debug("After execution of '%s' -- New state is %s.", + LOGGER.debug("After execution of '%s' -- New state is %s.", record.name, record.status) def write_status(self, path): @@ -651,7 +616,7 @@ def write_status(self, path): def _check_study_completion(self): # We cancelled, return True marking study as complete. 
if self.is_canceled: - logger.info("Cancelled -- completing study.") + LOGGER.info("Cancelled -- completing study.") return StudyStatus.CANCELLED # check for completion of all steps @@ -695,13 +660,13 @@ def execute_ready_steps(self): adapter = adapter(**self._adapter) retcode, job_status = self.check_study_status() - logger.debug("Checked status (retcode %s)-- %s", retcode, job_status) + LOGGER.debug("Checked status (retcode %s)-- %s", retcode, job_status) # For now, if we can't check the status something is wrong. # Don't modify the DAG. if retcode == JobStatusCode.ERROR: msg = "Job status check failed -- Aborting." - logger.error(msg) + LOGGER.error(msg) raise RuntimeError(msg) elif retcode == JobStatusCode.OK: # For the status of each currently in progress job, check its @@ -709,20 +674,20 @@ def execute_ready_steps(self): cleanup_steps = set() # Steps that are in progress showing failed. for name, status in job_status.items(): - logger.debug("Checking job '%s' with status %s.", name, status) + LOGGER.debug("Checking job '%s' with status %s.", name, status) record = self.values[name] if status == State.FINISHED: # Mark the step complete and notate its end time. record.mark_end(State.FINISHED) - logger.info("Step '%s' marked as finished. Adding to " + LOGGER.info("Step '%s' marked as finished. Adding to " "complete set.", name) self.completed_steps.add(name) self.in_progress.remove(name) elif status == State.RUNNING: # When detect that a step is running, mark it. - logger.info("Step '%s' found to be running.") + LOGGER.info("Step '%s' found to be running.") record.mark_running() elif status == State.TIMEDOUT: @@ -731,13 +696,13 @@ def execute_ready_steps(self): # If we're under the restart limit, attempt a restart. if record.can_restart: if record.mark_restart(): - logger.info( + LOGGER.info( "Step '%s' timed out. Restarting (%s of %s).", name, record.restarts, record.restart_limit ) self._execute_record(record, adapter, restart=True) else: - logger.info("'%s' has been restarted %s of %s " + LOGGER.info("'%s' has been restarted %s of %s " "times. Marking step and all " "descendents as failed.", name, @@ -747,7 +712,7 @@ def execute_ready_steps(self): cleanup_steps.update(self.bfs_subtree(name)[0]) # Otherwise, we can't restart so mark the step timed out. else: - logger.info("'%s' timed out, but cannot be restarted." + LOGGER.info("'%s' timed out, but cannot be restarted." " Marked as TIMEDOUT.", name) # Mark that the step ended due to TIMEOUT. record.mark_end(State.TIMEDOUT) @@ -765,14 +730,14 @@ def execute_ready_steps(self): # TODO: Need to make sure that we do this a finite number # of times. # Resubmit the cmd. - logger.warning("Hardware failure detected. Attempting to " + LOGGER.warning("Hardware failure detected. Attempting to " "resubmit step '%s'.", name) # We can just let the logic below handle submission with # everything else. self.ready_steps.append(name) elif status == State.FAILED: - logger.warning( + LOGGER.warning( "Job failure reported. Aborting %s -- flagging all " "dependent jobs as failed.", name @@ -782,7 +747,7 @@ def execute_ready_steps(self): cleanup_steps.update(self.bfs_subtree(name)[0]) elif status == State.CANCELLED: - logger.info("Step '%s' was cancelled.", name) + LOGGER.info("Step '%s' was cancelled.", name) self.in_progress.remove(name) record.mark_end(State.CANCELLED) @@ -802,17 +767,17 @@ def execute_ready_steps(self): # A completed step by definition has had its dependencies met. # Skip it. 
if key in self.completed_steps: - logger.debug("'%s' in completed set, skipping.", key) + LOGGER.debug("'%s' in completed set, skipping.", key) continue - logger.debug("Checking %s -- %s", key, record.jobid) + LOGGER.debug("Checking %s -- %s", key, record.jobid) # If the record is only INITIALIZED, we have encountered a step # that needs consideration. if record.status == State.INITIALIZED: - logger.debug("'%s' found to be initialized. Checking " + LOGGER.debug("'%s' found to be initialized. Checking " "dependencies. ", key) - logger.debug( + LOGGER.debug( "Unfulfilled dependencies: %s", self._dependencies[key]) @@ -821,7 +786,7 @@ def execute_ready_steps(self): self._dependencies[key]) self._dependencies[key] = \ self._dependencies[key] - set(s_completed) - logger.debug( + LOGGER.debug( "Completed dependencies: %s\n" "Remaining dependencies: %s", s_completed, self._dependencies[key]) @@ -829,16 +794,16 @@ def execute_ready_steps(self): # If the gating dependencies set is empty, we can execute. if not self._dependencies[key]: if key not in self.ready_steps: - logger.debug("All dependencies completed. Staging.") + LOGGER.debug("All dependencies completed. Staging.") self.ready_steps.append(key) else: - logger.debug("Already staged. Passing.") + LOGGER.debug("Already staged. Passing.") continue # We now have a collection of ready steps. Execute. # If we don't have a submission limit, go ahead and submit all. if self._submission_throttle == 0: - logger.info("Launching all ready steps...") + LOGGER.info("Launching all ready steps...") _available = len(self.ready_steps) # Else, we have a limit -- adhere to it. else: @@ -851,7 +816,7 @@ def execute_ready_steps(self): # computed number of slots. We could have free slots, but have less # in the queue. _available = min(_available, len(self.ready_steps)) - logger.info("Found %d available slots...", _available) + LOGGER.info("Found %d available slots...", _available) for i in range(0, _available): # Pop the record and execute using the helper method. @@ -859,12 +824,12 @@ def execute_ready_steps(self): # If we get to this point and we've cancelled, cancel the record. if self.is_canceled: - logger.info("Cancelling '%s' -- continuing.", _record.name) + LOGGER.info("Cancelling '%s' -- continuing.", _record.name) _record.mark_end(State.CANCELLED) self.cancelled_steps.add(_record.name) continue - logger.debug("Launching job %d -- %s", i, _record.name) + LOGGER.debug("Launching job %d -- %s", i, _record.name) self._execute_record(_record, adapter) # check the status of the study upon finishing this round of execution @@ -898,14 +863,14 @@ def check_study_status(self): # Based on return code, log something different. if retcode == JobStatusCode.OK: - logger.info("Jobs found for user '%s'.", getpass.getuser()) + LOGGER.info("Jobs found for user '%s'.", getpass.getuser()) return retcode, step_status elif retcode == JobStatusCode.NOJOBS: - logger.info("No jobs found.") + LOGGER.info("No jobs found.") return retcode, step_status else: msg = "Unknown Error (Code = {})".format(retcode) - logger.error(msg) + LOGGER.error(msg) return retcode, step_status def cancel_study(self): @@ -924,12 +889,12 @@ def cancel_study(self): self.is_canceled = True if crecord.cancel_status == CancelCode.OK: - logger.info("Successfully requested to cancel all jobs.") + LOGGER.info("Successfully requested to cancel all jobs.") elif crecord.cancel_status == CancelCode.ERROR: - logger.error( + LOGGER.error( "Failed to cancel jobs. 
(Code = %s)", crecord.return_code) else: - logger.error("Unknown Error (Code = %s)", crecord.return_code) + LOGGER.error("Unknown Error (Code = %s)", crecord.return_code) return crecord.cancel_status diff --git a/maestrowf/datastructures/core/parameters.py b/maestrowf/datastructures/core/parameters.py index b566cb656..b0be5c85e 100644 --- a/maestrowf/datastructures/core/parameters.py +++ b/maestrowf/datastructures/core/parameters.py @@ -39,8 +39,6 @@ import logging import re -from maestrowf.abstracts import SimObject - logger = logging.getLogger(__name__) @@ -161,7 +159,7 @@ def apply(self, item): return item -class ParameterGenerator(SimObject): +class ParameterGenerator: """ Class for containing parameters and generating combinations. diff --git a/maestrowf/datastructures/core/study.py b/maestrowf/datastructures/core/study.py index c9602ec26..cfe98c3b6 100644 --- a/maestrowf/datastructures/core/study.py +++ b/maestrowf/datastructures/core/study.py @@ -37,12 +37,12 @@ from types import MethodType import yaml -from maestrowf.abstracts import SimObject -from maestrowf.datastructures.core import ExecutionGraph +from maestrowf.abstracts import PickleInterface from maestrowf.datastructures.dag import DAG from maestrowf.utils import apply_function, create_parentdir, make_safe_path +from .executiongraph import ExecutionGraph -logger = logging.getLogger(__name__) +LOGGER = logging.getLogger(__name__) SOURCE = "_source" WSREGEX = re.compile( r"\$\(([-!\$%\^&\*\(\)_\+\|~=`{}\[\]:;<>\?,\.\/\w]+)\.workspace\)" @@ -52,7 +52,7 @@ ) -class StudyStep(SimObject): +class StudyStep: """ Class that represents the data and API for a single study step. @@ -122,7 +122,7 @@ def __ne__(self, other): return not self.__eq__(other) -class Study(DAG): +class Study(DAG, PickleInterface): """ Collection of high level objects to perform study construction. @@ -196,7 +196,7 @@ def __init__(self, name, description, self._out_path = out_path self._meta_path = os.path.join(out_path, "meta") - logger.info("OUTPUT_PATH = %s", out_path) + LOGGER.info("OUTPUT_PATH = %s", out_path) # Flag the study as not having been set up and add the source node. self._issetup = False self.add_node(SOURCE, None) @@ -309,7 +309,7 @@ def load_metadata(self): msg = "Object loaded from {path} is of type {type}. Expected an" \ " object of type '{cls}.'".format(path=path, type=type(env), cls=type(self)) - logger.error(msg) + LOGGER.error(msg) raise TypeError(msg) metapath = os.path.join(self._meta_path, "metadata.yaml") @@ -336,7 +336,7 @@ def add_step(self, step): """ # Add the node to the DAG. self.add_node(step.name, step) - logger.info( + LOGGER.info( "Adding step '%s' to study '%s'...", step.name, self.name) # Apply the environment to the incoming step. step.__dict__ = \ @@ -345,7 +345,7 @@ def add_step(self, step): # If the step depends on a prior step, create an edge. if "depends" in step.run and step.run["depends"]: for dependency in step.run["depends"]: - logger.info("{0} is dependent on {1}. Creating edge (" + LOGGER.info("{0} is dependent on {1}. 
Creating edge (" "{1}, {0})...".format(step.name, dependency)) if "*" not in dependency: self.add_edge(dependency, step.name) @@ -376,17 +376,17 @@ def walk_study(self, src=SOURCE): def setup_workspace(self): """Set up the study's main workspace directory.""" try: - logger.info("Setting up study workspace in '%s'", self._out_path) + LOGGER.info("Setting up study workspace in '%s'", self._out_path) create_parentdir(self._out_path) except Exception as e: - logger.error(e.args) + LOGGER.error(e.args) return False def setup_environment(self): """Set up the environment by acquiring outside dependencies.""" # Set up the environment if it hasn't been already. if not self.environment.is_set_up: - logger.info("Environment is setting up.") + LOGGER.info("Environment is setting up.") self.environment.acquire_environment() def configure_study(self, submission_attempts=1, restart_limit=1, @@ -414,7 +414,7 @@ def configure_study(self, submission_attempts=1, restart_limit=1, self._use_tmp = use_tmp self._hash_ws = hash_ws - logger.info( + LOGGER.info( "\n------------------------------------------\n" "Output path = %s\n" "Submission attempts = %d\n" @@ -437,7 +437,7 @@ def _stage(self, dag): steps. """ # Items to store that should be reset. - logger.info( + LOGGER.info( "\n==================================================\n" "Constructing parameter study '%s'\n" "==================================================\n", @@ -463,7 +463,7 @@ def _stage(self, dag): # used parameters of the step, and then adding all parameterized # combinations of funneled steps. for step in t_sorted: - logger.info( + LOGGER.info( "\n==================================================\n" "Processing step '%s'\n" "==================================================\n", @@ -471,7 +471,7 @@ def _stage(self, dag): ) # If we encounter SOURCE, just add it and continue. if step == SOURCE: - logger.info("Encountered '%s'. Adding and continuing.", SOURCE) + LOGGER.info("Encountered '%s'. Adding and continuing.", SOURCE) dag.add_node(SOURCE, None) continue @@ -485,15 +485,15 @@ def _stage(self, dag): s_params = self.parameters.get_used_parameters(node) p_params = set() # Used parameters excluding the current step. # Iterate through dependencies to update the p_params - logger.debug("\n*** Processing dependencies ***") + LOGGER.debug("\n*** Processing dependencies ***") for parent in node.run["depends"]: # If we have a dependency that is parameter independent, add # it to the hub dependency set. if "*" in parent: - logger.debug("Found funnel dependency -- %s", parent) + LOGGER.debug("Found funnel dependency -- %s", parent) self.hub_depends[step].add(re.sub(ALL_COMBOS, "", parent)) else: - logger.debug("Found dependency -- %s", parent) + LOGGER.debug("Found dependency -- %s", parent) # Otherwise, just note the parameters used by the step. self.depends[step].add(parent) p_params |= self.used_params[parent] @@ -507,19 +507,19 @@ def _stage(self, dag): if ws not in self.used_params: msg = "Workspace for '{}' is being used before it would" \ " be generated.".format(ws) - logger.error(msg) + LOGGER.error(msg) raise Exception(msg) # We have the case that if we're using a workspace of a step # that is a parameter independent dependency, we can skip it. # The parameters don't affect the combinations. if ws in self.hub_depends[step]: - logger.info( + LOGGER.info( "'%s' parameter independent association found. 
" "Skipping.", ws) continue - logger.debug( + LOGGER.debug( "Found workspace '%s' using parameters %s", ws, self.used_params[ws]) p_params |= self.used_params[ws] @@ -536,7 +536,7 @@ def _stage(self, dag): # 1. The step and all its preceding parents use no parameters. if not self.used_params[step]: - logger.info( + LOGGER.info( "\n-------------------------------------------------\n" "Adding step '%s' (No parameters used)\n" "-------------------------------------------------\n", @@ -548,23 +548,23 @@ def _stage(self, dag): workspace = make_safe_path(self._out_path, *[step]) self.workspaces[step] = workspace - logger.debug("Workspace: %s", workspace) + LOGGER.debug("Workspace: %s", workspace) # NOTE: I don't think it's valid to have a specific workspace # since a step with no parameters operates at the global level. # NOTE: Opting to save the old command for provenence reasons. cmd = node.run["cmd"] r_cmd = node.run["restart"] - logger.info("Searching for workspaces...\ncmd = %s", cmd) + LOGGER.info("Searching for workspaces...\ncmd = %s", cmd) for match in used_spaces: - logger.info("Workspace found -- %s", match) + LOGGER.info("Workspace found -- %s", match) workspace_var = "$({}.workspace)".format(match) if match in self.hub_depends[step]: # If we're looking at a parameter independent match # the workspace is the folder that contains all of # the outputs of all combinations for the step. ws = make_safe_path(self._out_path, *[match]) - logger.info("Found funnel workspace -- %s", ws) + LOGGER.info("Found funnel workspace -- %s", ws) else: ws = self.workspaces[match] cmd = cmd.replace(workspace_var, ws) @@ -574,35 +574,35 @@ def _stage(self, dag): node = copy.deepcopy(node) node.run["cmd"] = cmd node.run["restart"] = r_cmd - logger.debug("New cmd = %s", cmd) - logger.debug("New restart = %s", r_cmd) + LOGGER.debug("New cmd = %s", cmd) + LOGGER.debug("New restart = %s", r_cmd) dag.add_step(step, node, workspace, rlimit) if self.depends[step] or self.hub_depends[step]: # So, because we don't have used parameters, we can just # loop over the dependencies and add them. - logger.debug("Processing regular dependencies.") + LOGGER.debug("Processing regular dependencies.") for parent in self.depends[step]: - logger.info("Adding edge (%s, %s)...", parent, step) + LOGGER.info("Adding edge (%s, %s)...", parent, step) dag.add_connection(parent, step) # We can still have a case where we have steps that do # funnel into this one even though this particular step # is not parameterized. - logger.debug("Processing hub dependencies.") + LOGGER.debug("Processing hub dependencies.") for parent in self.hub_depends[step]: for item in self.step_combos[parent]: - logger.info("Adding edge (%s, %s)...", item, step) + LOGGER.info("Adding edge (%s, %s)...", item, step) dag.add_connection(item, step) else: # Otherwise, just add source since we're not dependent. - logger.debug("Adding edge (%s, %s)...", SOURCE, step) + LOGGER.debug("Adding edge (%s, %s)...", SOURCE, step) dag.add_connection(SOURCE, step) # 2. The step has used parameters. else: - logger.info( + LOGGER.info( "\n==================================================\n" "Expanding step '%s'\n" "==================================================\n" @@ -613,7 +613,7 @@ def _stage(self, dag): ) # Now we iterate over the combinations and expand the step. 
for combo in self.parameters: - logger.info("\n**********************************\n" + LOGGER.info("\n**********************************\n" "Combo [%s]\n" "**********************************", str(combo)) @@ -626,7 +626,7 @@ def _stage(self, dag): else: workspace = \ make_safe_path(self._out_path, *[step, combo_str]) - logger.debug("Workspace: %s", workspace) + LOGGER.debug("Workspace: %s", workspace) combo_str = "{}_{}".format(step, combo_str) self.workspaces[combo_str] = workspace @@ -642,23 +642,23 @@ def _stage(self, dag): # Substitute workspaces into the combination. cmd = step_exp.run["cmd"] r_cmd = step_exp.run["restart"] - logger.info("Searching for workspaces...\ncmd = %s", cmd) + LOGGER.info("Searching for workspaces...\ncmd = %s", cmd) for match in used_spaces: # Construct the workspace variable. - logger.info("Workspace found -- %s", ws) + LOGGER.info("Workspace found -- %s", ws) workspace_var = "$({}.workspace)".format(match) if match in self.hub_depends[step]: # If we're looking at a parameter independent match # the workspace is the folder that contains all of # the outputs of all combinations for the step. ws = make_safe_path(self._out_path, *[match]) - logger.info("Found funnel workspace -- %s", ws) + LOGGER.info("Found funnel workspace -- %s", ws) elif not self.used_params[match]: # If it's not a funneled dependency and the match # is not parameterized, then the workspace is just # the unparameterized match. ws = self.workspaces[match] - logger.info( + LOGGER.info( "Found unparameterized workspace -- %s", match) else: # Otherwise, we're dealing with a combination. @@ -666,14 +666,14 @@ def _stage(self, dag): match, combo.get_param_string(self.used_params[match]) ) - logger.info( + LOGGER.info( "Found parameterized workspace -- %s", ws) ws = self.workspaces[ws] # Replace in both the command and restart command. cmd = cmd.replace(workspace_var, ws) r_cmd = r_cmd.replace(workspace_var, ws) - logger.info("New cmd = %s", cmd) + LOGGER.info("New cmd = %s", cmd) step_exp.run["cmd"] = cmd step_exp.run["restart"] = r_cmd @@ -683,14 +683,14 @@ def _stage(self, dag): if self.depends[step] or self.hub_depends[step]: # So, because we don't have used parameters, we can # just loop over the dependencies and add them. - logger.info("Processing regular dependencies.") + LOGGER.info("Processing regular dependencies.") for p in self.depends[step]: if self.used_params[p]: p = "{}_{}".format( p, combo.get_param_string(self.used_params[p]) ) - logger.info( + LOGGER.info( "Adding edge (%s, %s)...", p, combo_str ) dag.add_connection(p, combo_str) @@ -698,16 +698,16 @@ def _stage(self, dag): # We can still have a case where we have steps that do # funnel into this one even though this particular step # is not parameterized. - logger.debug("Processing hub dependencies.") + LOGGER.debug("Processing hub dependencies.") for parent in self.hub_depends[step]: for item in self.step_combos[parent]: - logger.info( + LOGGER.info( "Adding edge (%s, %s)...", item, combo_str ) dag.add_connection(item, combo_str) else: # Otherwise, just add source since we're not dependent. - logger.debug( + LOGGER.debug( "Adding edge (%s, %s)...", SOURCE, combo_str ) dag.add_connection(SOURCE, combo_str) @@ -728,7 +728,7 @@ def _stage_linear(self, dag): for step in t_sorted: # If we find the source node, we can just add it and continue. 
if step == SOURCE: - logger.debug("Source node found.") + LOGGER.debug("Source node found.") dag.add_node(SOURCE, None) continue @@ -751,12 +751,12 @@ def _stage_linear(self, dag): cmd = node.run["cmd"] r_cmd = node.run["restart"] - logger.info("Searching for workspaces...\ncmd = %s", cmd) + LOGGER.info("Searching for workspaces...\ncmd = %s", cmd) used_spaces = re.findall(WSREGEX, cmd) for match in used_spaces: # In this case we don't need to look for any parameters, or # combination depdendent ("funnel") steps. It's a simple sub. - logger.info("Workspace found -- %s", match) + LOGGER.info("Workspace found -- %s", match) workspace_var = "$({}.workspace)".format(match) ws = self.workspaces[match] cmd = cmd.replace(workspace_var, ws) @@ -795,14 +795,14 @@ def stage(self): if not os.path.exists(self._out_path): msg = "Study {} is not set up for staging. Workspace does not " \ "exists (Output Dir = {}).".format(self.name, self._out_path) - logger.error(msg) + LOGGER.error(msg) raise Exception(msg) # If the environment isn't set up, raise an exception. if not self.environment.is_set_up: msg = "Study {} is not set up for staging. Environment is not " \ "set up. Aborting.".format(self.name) - logger.error(msg) + LOGGER.error(msg) raise Exception(msg) # After substituting, we should start getting combinations and diff --git a/maestrowf/datastructures/core/studyenvironment.py b/maestrowf/datastructures/core/studyenvironment.py index 4d1de699a..194b35963 100644 --- a/maestrowf/datastructures/core/studyenvironment.py +++ b/maestrowf/datastructures/core/studyenvironment.py @@ -31,12 +31,12 @@ import logging -from maestrowf.abstracts import SimObject, Dependency, Source, Substitution +from maestrowf.abstracts import Dependency, Source, Substitution -logger = logging.getLogger(__name__) +LOGGER = logging.getLogger(__name__) -class StudyEnvironment(SimObject): +class StudyEnvironment: """ StudyEnvironment for managing a study environment. @@ -58,7 +58,7 @@ def __init__(self): # Boolean that tracks if dependencies have been acquired. self._is_set_up = False - logger.debug("Initialized an empty StudyEnvironment.") + LOGGER.debug("Initialized an empty StudyEnvironment.") def __bool__(self): """ @@ -88,44 +88,44 @@ def add(self, item): # because the necessary variable could have not been added yet # and there's too much of a need to process a dependency first. name = None - logger.debug("Calling add with %s", str(item)) + LOGGER.debug("Calling add with %s", str(item)) if isinstance(item, Dependency): - logger.debug("Adding %s of type %s.", item.name, type(item)) - logger.debug("Value: %s.", item.__dict__) + LOGGER.debug("Adding %s of type %s.", item.name, type(item)) + LOGGER.debug("Value: %s.", item.__dict__) self.dependencies[item.name] = item name = item.name self._is_set_up = False elif isinstance(item, Substitution): - logger.debug("Value: %s", item.value) - logger.debug("Tokens: %s", self._tokens) + LOGGER.debug("Value: %s", item.value) + LOGGER.debug("Tokens: %s", self._tokens) name = item.name - logger.debug("Adding %s of type %s.", item.name, type(item)) + LOGGER.debug("Adding %s of type %s.", item.name, type(item)) if ( isinstance(item.value, str) and any(token in item.value for token in self._tokens)): - logger.debug("Label detected. Adding %s to labels", item.name) + LOGGER.debug("Label detected. 
Adding %s to labels", item.name) self.labels[item.name] = item else: self._tokens.add(item.token) self.substitutions[item.name] = item elif isinstance(item, Source): - logger.debug("Adding source %s", item.source) - logger.debug("Item source: %s", item.source) + LOGGER.debug("Adding source %s", item.source) + LOGGER.debug("Item source: %s", item.source) self.sources.append(item) else: error = "Received an item of type {}. Expected an item of base " \ "type Substitution, Source, or Dependency." \ .format(type(item)) - logger.exception(error) + LOGGER.exception(error) raise TypeError(error) if name and name in self._names: error = "A duplicate name '{}' has been detected. All names " \ "must be unique. Aborting.".format(name) - logger.exception(error) + LOGGER.exception(error) raise ValueError(error) else: - logger.debug("{} added to set of names.".format(name)) + LOGGER.debug("{} added to set of names.".format(name)) self._names.add(name) def find(self, key): @@ -136,20 +136,20 @@ def find(self, key): :returns: The environment object labeled by key, None if key is not found. """ - logger.debug("Looking for '%s'...", key) + LOGGER.debug("Looking for '%s'...", key) if key in self.dependencies: - logger.debug("Found '%s' in environment dependencies.", key) + LOGGER.debug("Found '%s' in environment dependencies.", key) return self.dependencies[key] if key in self.substitutions: - logger.debug("Found '%s' in environment substitutions.", key) + LOGGER.debug("Found '%s' in environment substitutions.", key) return self.substitutions[key] if key in self.labels: - logger.debug("Found '%s' in environment labels.", key) + LOGGER.debug("Found '%s' in environment labels.", key) return self.labels[key] - logger.debug("'%s' not found -- \n%s", key, self) + LOGGER.debug("'%s' not found -- \n%s", key, self) return None def remove(self, key): @@ -159,7 +159,7 @@ def remove(self, key): :param key: Name of the environment object to remove. :returns: The environment object labeled by key. """ - logger.debug("Looking to remove '%s'...", key) + LOGGER.debug("Looking to remove '%s'...", key) if key not in self._names: return None @@ -179,18 +179,18 @@ def remove(self, key): self._names.remove(key) return _ - logger.debug("'%s' not found -- \n%s", key, self) + LOGGER.debug("'%s' not found -- \n%s", key, self) return None def acquire_environment(self): """Acquire any environment items that may be stored remotely.""" if self._is_set_up: - logger.info("Environment already set up. Returning.") + LOGGER.info("Environment already set up. 
Returning.") return - logger.info("Acquiring dependencies") + LOGGER.info("Acquiring dependencies") for dependency, value in self.dependencies.items(): - logger.info("Acquiring -- %s", dependency) + LOGGER.info("Acquiring -- %s", dependency) value.acquire(substitutions=self.substitutions.values()) self._is_set_up = True @@ -205,24 +205,24 @@ def apply_environment(self, item): if not item: return item - logger.debug("Applying environment to %s", item) - logger.debug("Processing labels...") + LOGGER.debug("Applying environment to %s", item) + LOGGER.debug("Processing labels...") for label, value in self.labels.items(): - logger.debug("Looking for %s in %s", label, item) + LOGGER.debug("Looking for %s in %s", label, item) item = value.substitute(item) - logger.debug("After substitution: %s", item) + LOGGER.debug("After substitution: %s", item) - logger.debug("Processing dependencies...") + LOGGER.debug("Processing dependencies...") for label, dependency in self.dependencies.items(): - logger.debug("Looking for %s in %s", label, item) + LOGGER.debug("Looking for %s in %s", label, item) item = dependency.substitute(item) - logger.debug("After substitution: %s", item) - logger.debug("Acquiring %s.", label) + LOGGER.debug("After substitution: %s", item) + LOGGER.debug("Acquiring %s.", label) - logger.debug("Processing substitutions...") + LOGGER.debug("Processing substitutions...") for substitution, value in self.substitutions.items(): - logger.debug("Looking for %s in %s", substitution, item) + LOGGER.debug("Looking for %s in %s", substitution, item) item = value.substitute(item) - logger.debug("After substitution: %s", item) + LOGGER.debug("After substitution: %s", item) return item diff --git a/maestrowf/datastructures/yamlspecification.py b/maestrowf/datastructures/yamlspecification.py index c66d2e6c3..2528d0ff1 100644 --- a/maestrowf/datastructures/yamlspecification.py +++ b/maestrowf/datastructures/yamlspecification.py @@ -99,7 +99,7 @@ def load_specification(cls, path): except Exception as e: logger.exception(e.args) - raise + raise e # Populate the path to the specification that populated this instance. 
diff --git a/maestrowf/datastructures/yamlspecification.py b/maestrowf/datastructures/yamlspecification.py
index c66d2e6c3..2528d0ff1 100644
--- a/maestrowf/datastructures/yamlspecification.py
+++ b/maestrowf/datastructures/yamlspecification.py
@@ -99,7 +99,7 @@ def load_specification(cls, path):
         except Exception as e:
             logger.exception(e.args)
-            raise
+            raise e
 
         # Populate the path to the specification that populated this instance.
         specification.path = path
diff --git a/maestrowf/maestro.py b/maestrowf/maestro.py
index 0e5878d2d..901ef450c 100644
--- a/maestrowf/maestro.py
+++ b/maestrowf/maestro.py
@@ -29,7 +29,6 @@
 """A script for launching a YAML study specification."""
 from argparse import ArgumentParser, ArgumentError, RawTextHelpFormatter
-from filelock import FileLock, Timeout
 import inspect
 import logging
 import os
@@ -40,12 +39,12 @@ import time
 
 from maestrowf import __version__
-from maestrowf.conductor import monitor_study
+from maestrowf.conductor import Conductor
 from maestrowf.datastructures import YAMLSpecification
 from maestrowf.datastructures.core import Study
 from maestrowf.datastructures.environment import Variable
 from maestrowf.utils import \
-    create_parentdir, create_dictionary, csvtable_to_dict, make_safe_path, \
+    create_parentdir, create_dictionary, make_safe_path, \
     start_process
@@ -61,31 +60,25 @@ def status_study(args):
     """Check and print the status of an executing study."""
-    study_path = args.directory
-    stat_path = os.path.join(study_path, "status.csv")
-    lock_path = os.path.join(study_path, ".status.lock")
-    if os.path.exists(stat_path):
-        lock = FileLock(lock_path)
-        try:
-            with lock.acquire(timeout=10):
-                with open(stat_path, "r") as stat_file:
-                    _ = csvtable_to_dict(stat_file)
-                    print(tabulate.tabulate(_, headers="keys"))
-        except Timeout:
-            pass
+    status = Conductor.get_status(args.directory)
 
-    return 0
+    if status:
+        print(tabulate.tabulate(status, headers="keys"))
+        return 0
+    else:
+        print(
+            "Status check failed. If the issue persists, please verify that "
+            "you are passing in a path to a study.")
+        return 1
 
 
 def cancel_study(args):
     """Flag a study to be cancelled."""
     if not os.path.isdir(args.directory):
+        print("Attempted to cancel a path that was not a directory.")
         return 1
 
-    lock_path = os.path.join(args.directory, ".cancel.lock")
-
-    with open(lock_path, 'a'):
-        os.utime(lock_path, None)
+    Conductor.mark_cancelled(args.directory)
 
     return 0
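Editor's note: with the hunks above, `status_study()` and `cancel_study()` no longer read `status.csv` or touch lock files themselves; they delegate to `Conductor` classmethods. A short sketch of the same calls outside of argparse follows; the study directory is a made-up example and only the calls shown in the diff are used.

# Hypothetical direct use of the Conductor classmethods introduced above.
import tabulate
from maestrowf.conductor import Conductor

study_dir = "./sample_output/lulesh_sample1_20200101-000000"  # made-up path

status = Conductor.get_status(study_dir)     # falsey when nothing can be read
if status:
    print(tabulate.tabulate(status, headers="keys"))

Conductor.mark_cancelled(study_dir)          # flags the study for cancellation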
LOGGER.info("Running Maestro Conductor in the foreground.") - cancel_path = os.path.join(path, ".cancel.lock") - # capture the StudyStatus enum to return - completion_status = monitor_study(exec_dag, pkl_path, - cancel_path, args.sleeptime) + conductor = Conductor(study) + conductor.initialize(batch, args.sleeptime) + completion_status = conductor.monitor_study() + conductor.cleanup() return completion_status.value else: # Launch manager with nohup log_path = make_safe_path( study.output_path, - *["{}.txt".format(exec_dag.name)]) + *["{}.txt".format(study.name)]) cmd = ["nohup", "conductor", "-t", str(args.sleeptime), "-d", str(args.debug_lvl), - path, - "&>", log_path] + study.output_path, + ">", log_path, "2>&1"] LOGGER.debug(" ".join(cmd)) start_process(" ".join(cmd))