diff --git a/alea/submitters/README.md b/alea/submitters/README.md
index 6ece1ca..0dd7b90 100644
--- a/alea/submitters/README.md
+++ b/alea/submitters/README.md
@@ -23,7 +23,6 @@ chmod 600 $HOME/.xenon_service_proxy
 
 export X509_USER_PROXY=$HOME/.xenon_service_proxy
 export PYTHONPATH=`pegasus-config --python`:$PYTHONPATH
-export PYTHONPATH="$HOME/.local/lib/python3.9/site-packages${PYTHONPATH:+:$PYTHONPATH}"
 ```
 
 ### Configuration
diff --git a/alea/submitters/htcondor.py b/alea/submitters/htcondor.py
index abc2a0c..e1e3174 100644
--- a/alea/submitters/htcondor.py
+++ b/alea/submitters/htcondor.py
@@ -2,13 +2,10 @@
 import getpass
 import tarfile
 import shlex
-import tempfile
-import time
-import threading
-import subprocess
 from datetime import datetime
 import logging
 from pathlib import Path
+from utilix.x509 import _validate_x509_proxy
 from Pegasus.api import (
     Arch,
     Operation,
@@ -78,10 +75,11 @@ def __init__(self, *args, **kwargs):
         # User can provide a name for the workflow, otherwise it will be the current time
         self._setup_workflow_id()
         # Pegasus workflow directory
-        self.generated_dir = os.path.join(self.work_dir, self.workflow_id, "generated")
-        self.runs_dir = os.path.join(self.work_dir, self.workflow_id, "runs")
-        self.outputs_dir = os.path.join(self.work_dir, self.workflow_id, "outputs")
-        self.scratch_dir = os.path.join(self.work_dir, self.workflow_id, "scratch")
+        self.workflow_dir = os.path.join(self.work_dir, self.workflow_id)
+        self.generated_dir = os.path.join(self.workflow_dir, "generated")
+        self.runs_dir = os.path.join(self.workflow_dir, "runs")
+        self.outputs_dir = os.path.join(self.workflow_dir, "outputs")
+        self.scratch_dir = os.path.join(self.workflow_dir, "scratch")
 
     @property
     def template_tarball(self):
@@ -125,25 +123,6 @@ def requirements(self):
 
         return _requirements
 
-    def _validate_x509_proxy(self, min_valid_hours=20):
-        """Ensure $X509_USER_PROXY exists and has enough time left.
-
-        This is necessary only if you are going to use Rucio.
-
-        """
-        self.x509_user_proxy = os.getenv("X509_USER_PROXY")
-        assert self.x509_user_proxy, "Please provide a valid X509_USER_PROXY environment variable."
-
-        logger.debug("Verifying that the X509_USER_PROXY proxy has enough lifetime")
-        shell = Shell(f"grid-proxy-info -timeleft -file {self.x509_user_proxy}")
-        shell.run()
-        valid_hours = int(shell.get_outerr()) / 60 / 60
-        if valid_hours < min_valid_hours:
-            raise RuntimeError(
-                f"User proxy is only valid for {valid_hours} hours. "
-                f"Minimum required is {min_valid_hours} hours."
-            )
-
     def _validate_template_path(self):
         """Validate the template path."""
         if self.template_path is None:
@@ -251,12 +230,12 @@ def _setup_workflow_id(self):
         """Set up the workflow ID."""
         # If you have named the workflow, use that name. Otherwise, use the current time as name.
         _workflow_id = self.htcondor_configurations.pop("workflow_id", None)
+        now = datetime.now().strftime("%Y%m%d%H%M")
         if _workflow_id:
-            self.workflow_id = "-".join(
-                (_workflow_id, self.computation, datetime.now().strftime("%Y%m%d%H%M"))
-            )
+            workflow_id = (_workflow_id, self.computation, now)
         else:
-            self.workflow_id = datetime.now().strftime("%Y-%m-%d-%H-%M-%S")
+            workflow_id = (self.computation, now)
+        self.workflow_id = "-".join(workflow_id)
 
     def _generate_sc(self):
         """Generates the SiteCatalog for the workflow."""
@@ -710,9 +689,11 @@ def _check_filename_unique(self):
 
     def submit(self, **kwargs):
        """Serve as the main function to submit the workflow."""
-        if os.path.exists(self.runs_dir):
-            raise RuntimeError(f"Workflow already exists at {self.runs_dir}.")
-        self._validate_x509_proxy()
+        if os.path.exists(self.workflow_dir):
+            raise RuntimeError(f"Workflow already exists at {self.workflow_dir}.")
+
+        # ensure we have a proxy with enough time left
+        _validate_x509_proxy()
 
         # 0o755 means read/write/execute for owner, read/execute for everyone else
         os.makedirs(self.generated_dir, 0o755, exist_ok=True)
@@ -732,92 +713,8 @@ def submit(self, **kwargs):
         self._plan_and_submit()
         if self.debug:
             self.wf.graph(
-                output=os.path.join(self.outputs_dir, "workflow_graph.dot"), label="xform-id"
+                output=os.path.join(self.generated_dir, "workflow_graph.dot"), label="xform-id"
             )
             self.wf.graph(
-                output=os.path.join(self.outputs_dir, "workflow_graph.svg"), label="xform-id"
-            )
-
-
-class Shell(object):
-    """Provides a shell callout with buffered stdout/stderr, error handling and timeout."""
-
-    def __init__(self, cmd, timeout_secs=1 * 60 * 60, log_cmd=False, log_outerr=False):
-        self._cmd = cmd
-        self._timeout_secs = timeout_secs
-        self._log_cmd = log_cmd
-        self._log_outerr = log_outerr
-        self._process = None
-        self._out_file = None
-        self._outerr = ""
-        self._duration = 0.0
-
-    def run(self):
-        def target():
-            self._process = subprocess.Popen(
-                self._cmd,
-                shell=True,
-                stdout=self._out_file,
-                stderr=subprocess.STDOUT,
-                preexec_fn=os.setpgrp,
-            )
-            self._process.communicate()
-
-        if self._log_cmd:
-            print(self._cmd)
-
-        # temp file for the stdout/stderr
-        self._out_file = tempfile.TemporaryFile(prefix="outsource-", suffix=".out")
-
-        ts_start = time.time()
-
-        thread = threading.Thread(target=target)
-        thread.start()
-
-        thread.join(self._timeout_secs)
-        if thread.is_alive():
-            # do our best to kill the whole process group
-            try:
-                kill_cmd = f"kill -TERM -{os.getpgid(self._process.pid)}"
-                kp = subprocess.Popen(kill_cmd, shell=True)
-                kp.communicate()
-                self._process.terminate()
-            except Exception:
-                pass
-            thread.join()
-            # log the output
-            self._out_file.seek(0)
-            stdout = self._out_file.read().decode("utf-8").strip()
-            if self._log_outerr and len(stdout) > 0:
-                print(stdout)
-            self._out_file.close()
-            raise RuntimeError(
-                f"Command timed out after {int(self._timeout_secs):d} seconds: {self._cmd}."
+                output=os.path.join(self.generated_dir, "workflow_graph.svg"), label="xform-id"
             )
-
-        self._duration = time.time() - ts_start
-
-        # log the output
-        self._out_file.seek(0)
-        self._outerr = self._out_file.read().decode("utf-8").strip()
-        if self._log_outerr and len(self._outerr) > 0:
-            print(self._outerr)
-        self._out_file.close()
-
-        if self._process.returncode != 0:
-            raise RuntimeError(
-                f"Command exited with non-zero exit code ({int(self._process.returncode):d}): "
-                f"{self._cmd}\n{self._outerr}"
-            )
-
-    def get_outerr(self):
-        """Returns the combined stdout and stderr from the command."""
-        return self._outerr
-
-    def get_exit_code(self):
-        """Returns the exit code from the process."""
-        return self._process.returncode
-
-    def get_duration(self):
-        """Returns the timing of the command (seconds)"""
-        return self._duration
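For reference, a minimal standalone sketch of the workflow naming and directory layout that the refactored `_setup_workflow_id` and `__init__` in this patch produce. The `work_dir`, `computation`, and `_workflow_id` values below are placeholder inputs, not the submitter's actual configuration:

```python
import os
from datetime import datetime

# Placeholder inputs; in the submitter these come from the HTCondor configuration.
work_dir = "/scratch/user/workflows"
computation = "discovery_power"
_workflow_id = None  # or a user-supplied name, e.g. "my_run"

# Workflow ID: "<name>-<computation>-<timestamp>" if a name was given,
# otherwise "<computation>-<timestamp>".
now = datetime.now().strftime("%Y%m%d%H%M")
parts = (_workflow_id, computation, now) if _workflow_id else (computation, now)
workflow_id = "-".join(parts)

# All per-workflow directories are now rooted at a single workflow_dir.
workflow_dir = os.path.join(work_dir, workflow_id)
for sub in ("generated", "runs", "outputs", "scratch"):
    print(os.path.join(workflow_dir, sub))
```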