Skip to content

Commit

Permalink
Use utilix to validate X509 proxy
Browse files Browse the repository at this point in the history
  • Loading branch information
dachengx committed Sep 19, 2024
1 parent b7dba09 commit bf85cfa
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 121 deletions.
1 change: 0 additions & 1 deletion alea/submitters/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ chmod 600 $HOME/.xenon_service_proxy
export X509_USER_PROXY=$HOME/.xenon_service_proxy
export PYTHONPATH=`pegasus-config --python`:$PYTHONPATH
export PYTHONPATH="$HOME/.local/lib/python3.9/site-packages${PYTHONPATH:+:$PYTHONPATH}"
```

### Configuration
Expand Down
137 changes: 17 additions & 120 deletions alea/submitters/htcondor.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,10 @@
import getpass
import tarfile
import shlex
import tempfile
import time
import threading
import subprocess
from datetime import datetime
import logging
from pathlib import Path
from utilix.x509 import _validate_x509_proxy
from Pegasus.api import (
Arch,
Operation,
Expand Down Expand Up @@ -78,10 +75,11 @@ def __init__(self, *args, **kwargs):
# User can provide a name for the workflow, otherwise it will be the current time
self._setup_workflow_id()
# Pegasus workflow directory
self.generated_dir = os.path.join(self.work_dir, self.workflow_id, "generated")
self.runs_dir = os.path.join(self.work_dir, self.workflow_id, "runs")
self.outputs_dir = os.path.join(self.work_dir, self.workflow_id, "outputs")
self.scratch_dir = os.path.join(self.work_dir, self.workflow_id, "scratch")
self.workflow_dir = os.path.join(self.work_dir, self.workflow_id)
self.generated_dir = os.path.join(self.workflow_dir, "generated")
self.runs_dir = os.path.join(self.workflow_dir, "runs")
self.outputs_dir = os.path.join(self.workflow_dir, "outputs")
self.scratch_dir = os.path.join(self.workflow_dir, "scratch")

@property
def template_tarball(self):
Expand Down Expand Up @@ -125,25 +123,6 @@ def requirements(self):

return _requirements

def _validate_x509_proxy(self, min_valid_hours=20):
"""Ensure $X509_USER_PROXY exists and has enough time left.
This is necessary only if you are going to use Rucio.
"""
self.x509_user_proxy = os.getenv("X509_USER_PROXY")
assert self.x509_user_proxy, "Please provide a valid X509_USER_PROXY environment variable."

logger.debug("Verifying that the X509_USER_PROXY proxy has enough lifetime")
shell = Shell(f"grid-proxy-info -timeleft -file {self.x509_user_proxy}")
shell.run()
valid_hours = int(shell.get_outerr()) / 60 / 60
if valid_hours < min_valid_hours:
raise RuntimeError(
f"User proxy is only valid for {valid_hours} hours. "
f"Minimum required is {min_valid_hours} hours."
)

def _validate_template_path(self):
"""Validate the template path."""
if self.template_path is None:
Expand Down Expand Up @@ -251,12 +230,12 @@ def _setup_workflow_id(self):
"""Set up the workflow ID."""
# If you have named the workflow, use that name. Otherwise, use the current time as name.
_workflow_id = self.htcondor_configurations.pop("workflow_id", None)
now = datetime.now().strftime("%Y%m%d%H%M")
if _workflow_id:
self.workflow_id = "-".join(
(_workflow_id, self.computation, datetime.now().strftime("%Y%m%d%H%M"))
)
workflow_id = (_workflow_id, self.computation, now)
else:
self.workflow_id = datetime.now().strftime("%Y-%m-%d-%H-%M-%S")
workflow_id = (self.computation, now)
self.workflow_id = "-".join(workflow_id)

def _generate_sc(self):
"""Generates the SiteCatalog for the workflow."""
Expand Down Expand Up @@ -710,9 +689,11 @@ def _check_filename_unique(self):

def submit(self, **kwargs):
"""Serve as the main function to submit the workflow."""
if os.path.exists(self.runs_dir):
raise RuntimeError(f"Workflow already exists at {self.runs_dir}.")
self._validate_x509_proxy()
if os.path.exists(self.workflow_dir):
raise RuntimeError(f"Workflow already exists at {self.workflow_dir}.")

# ensure we have a proxy with enough time left
_validate_x509_proxy()

# 0o755 means read/write/execute for owner, read/execute for everyone else
os.makedirs(self.generated_dir, 0o755, exist_ok=True)
Expand All @@ -732,92 +713,8 @@ def submit(self, **kwargs):
self._plan_and_submit()
if self.debug:
self.wf.graph(
output=os.path.join(self.outputs_dir, "workflow_graph.dot"), label="xform-id"
output=os.path.join(self.generated_dir, "workflow_graph.dot"), label="xform-id"
)
self.wf.graph(
output=os.path.join(self.outputs_dir, "workflow_graph.svg"), label="xform-id"
)


class Shell(object):
"""Provides a shell callout with buffered stdout/stderr, error handling and timeout."""

def __init__(self, cmd, timeout_secs=1 * 60 * 60, log_cmd=False, log_outerr=False):
self._cmd = cmd
self._timeout_secs = timeout_secs
self._log_cmd = log_cmd
self._log_outerr = log_outerr
self._process = None
self._out_file = None
self._outerr = ""
self._duration = 0.0

def run(self):
def target():
self._process = subprocess.Popen(
self._cmd,
shell=True,
stdout=self._out_file,
stderr=subprocess.STDOUT,
preexec_fn=os.setpgrp,
)
self._process.communicate()

if self._log_cmd:
print(self._cmd)

# temp file for the stdout/stderr
self._out_file = tempfile.TemporaryFile(prefix="outsource-", suffix=".out")

ts_start = time.time()

thread = threading.Thread(target=target)
thread.start()

thread.join(self._timeout_secs)
if thread.is_alive():
# do our best to kill the whole process group
try:
kill_cmd = f"kill -TERM -{os.getpgid(self._process.pid)}"
kp = subprocess.Popen(kill_cmd, shell=True)
kp.communicate()
self._process.terminate()
except Exception:
pass
thread.join()
# log the output
self._out_file.seek(0)
stdout = self._out_file.read().decode("utf-8").strip()
if self._log_outerr and len(stdout) > 0:
print(stdout)
self._out_file.close()
raise RuntimeError(
f"Command timed out after {int(self._timeout_secs):d} seconds: {self._cmd}."
output=os.path.join(self.generated_dir, "workflow_graph.svg"), label="xform-id"
)

self._duration = time.time() - ts_start

# log the output
self._out_file.seek(0)
self._outerr = self._out_file.read().decode("utf-8").strip()
if self._log_outerr and len(self._outerr) > 0:
print(self._outerr)
self._out_file.close()

if self._process.returncode != 0:
raise RuntimeError(
f"Command exited with non-zero exit code ({int(self._process.returncode):d}): "
f"{self._cmd}\n{self._outerr}"
)

def get_outerr(self):
"""Returns the combined stdout and stderr from the command."""
return self._outerr

def get_exit_code(self):
"""Returns the exit code from the process."""
return self._process.returncode

def get_duration(self):
"""Returns the timing of the command (seconds)"""
return self._duration

0 comments on commit bf85cfa

Please sign in to comment.