Skip to content

Commit

Permalink
slurm: adds 1st prototype
Browse files Browse the repository at this point in the history
  • Loading branch information
Rokas Maciulaitis committed Dec 3, 2019
1 parent 499c0cd commit 989f51d
Show file tree
Hide file tree
Showing 9 changed files with 402 additions and 44 deletions.
20 changes: 12 additions & 8 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -11,17 +11,20 @@ RUN apt-get update && \
apt-get install -y vim-tiny && \
pip install --upgrade pip

RUN export DEBIAN_FRONTEND=noninteractive ;\
apt-get -yq install krb5-user \
krb5-config \
libkrb5-dev \
libauthen-krb5-perl \
gcc;
ADD etc/krb5.conf /etc/krb5.conf


ARG COMPUTE_BACKENDS=kubernetes
# CERN HTCondor part taken from https://gitlab.cern.ch/batch-team/condorsubmit
RUN if echo "$COMPUTE_BACKENDS" | grep -q "htcondorcern"; then \
export DEBIAN_FRONTEND=noninteractive ;\
apt-get -yq install wget alien gnupg2 \
krb5-user \
krb5-config \
libkrb5-dev \
libauthen-krb5-perl \
--no-install-recommends; \
apt-get -yq install wget alien gnupg2 ;\
wget -O ngbauth-submit.rpm http://linuxsoft.cern.ch/internal/repos/batch7-stable/x86_64/os/Packages/ngbauth-submit-0.23-1.el7.noarch.rpm; \
wget -O cernbatchsubmit.rpm http://linuxsoft.cern.ch/internal/repos/batch7-stable/x86_64/os/Packages/cernbatchsubmit-0.1.0-1.el7.x86_64.rpm; \
yes | alien -i cernbatchsubmit.rpm; \
Expand All @@ -34,13 +37,14 @@ RUN if echo "$COMPUTE_BACKENDS" | grep -q "htcondorcern"; then \
fi

RUN if echo "$COMPUTE_BACKENDS" | grep -q "slurmcern"; then \
apt-get install -y openssh-client; \
export DEBIAN_FRONTEND=noninteractive ;\
apt-get -yq install openssh-client \
--no-install-recommends; \
fi

ADD etc/cernsubmit.yaml /etc/condor/
ADD etc/10_cernsubmit.config /etc/condor/config.d/

ADD etc/krb5.conf /etc/krb5.conf
ADD etc/ngbauth-submit /etc/sysconfig/
ADD etc/ngauth_batch_crypt_pub.pem /etc/
ADD etc/cerngridca.crt /usr/local/share/ca-certificates/cerngridca.crt
Expand Down
22 changes: 19 additions & 3 deletions reana_job_controller/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,21 +13,25 @@
from reana_job_controller.htcondorcern_job_manager import \
HTCondorJobManagerCERN
from reana_job_controller.job_monitor import (JobMonitorHTCondorCERN,
JobMonitorKubernetes)
JobMonitorKubernetes,
JobMonitorSlurmCERN)
from reana_job_controller.kubernetes_job_manager import KubernetesJobManager
from reana_job_controller.slurmcern_job_manager import SlurmJobManagerCERN

SHARED_VOLUME_PATH_ROOT = os.getenv('SHARED_VOLUME_PATH_ROOT', '/var/reana')
"""Root path of the shared volume ."""

COMPUTE_BACKENDS = {
'kubernetes': KubernetesJobManager,
'htcondorcern': HTCondorJobManagerCERN
'htcondorcern': HTCondorJobManagerCERN,
'slurmcern': SlurmJobManagerCERN
}
"""Supported job compute backends and corresponding management class."""

JOB_MONITORS = {
'kubernetes': JobMonitorKubernetes,
'htcondorcern': JobMonitorHTCondorCERN
'htcondorcern': JobMonitorHTCondorCERN,
'slurmcern': JobMonitorSlurmCERN,
}
"""Classes responsible for monitoring specific backend jobs"""

Expand Down Expand Up @@ -80,3 +84,15 @@
"""Kerberos configMap name. Must be the same as in
reana_cluster/backends/kubernetes/templates/configmaps/kerberos.yaml.
"""

SLURM_HEADNODE_HOSTNAME = os.getenv('SLURM_HOSTNAME', 'hpc-batch.cern.ch')
"""Hostname of SLURM head node used for job management via SSH connection."""

SLURM_HEADNODE_PORT = os.getenv('SLURM_CLUSTER_PORT', '22')
"""Port of SLURM head node."""


SLURM_PARTITION = os.getenv('SLURM_PARTITION', 'batch-short')
"""Default slurm partition."""

SLURM_HOME_PATH = os.getenv('SLURM_HOME_PATH', '')
3 changes: 2 additions & 1 deletion reana_job_controller/htcondorcern_job_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
from retrying import retry

from reana_job_controller.job_manager import JobManager
from reana_job_controller.utils import initialize_krb5_token

thread_local = threading.local()

Expand Down Expand Up @@ -202,10 +203,10 @@ def _submit(self, job_ad):
logging.info('Submiting job - {}'.format(job_ad))
clusterid = schedd.submit(job_ad, 1, True, ads)
HTCondorJobManagerCERN._spool_input(ads)
return clusterid
except Exception as e:
logging.error("Submission failed: {0}".format(e), exc_info=True)
time.sleep(10)
return clusterid

@retry(stop_max_attempt_number=MAX_NUM_RETRIES)
def _spool_input(ads):
Expand Down
141 changes: 122 additions & 19 deletions reana_job_controller/job_monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
"""Job monitoring wrapper."""

import logging
import os
import threading
import time
import traceback
Expand All @@ -19,26 +20,42 @@
from reana_db.database import Session
from reana_db.models import Job

from reana_job_controller import config
from reana_job_controller.htcondorcern_job_manager import \
HTCondorJobManagerCERN
from reana_job_controller.job_db import JOB_DB
from reana_job_controller.kubernetes_job_manager import KubernetesJobManager
from reana_job_controller.utils import singleton
from reana_job_controller.slurmcern_job_manager import SlurmJobManagerCERN
from reana_job_controller.utils import SSHClient, singleton


@singleton
class JobMonitorKubernetes():
"""Kubernetes job monitor."""
class JobMonitor():
"""Job monitor interface."""

def __init__(self, app=None):
"""Initialize Kubernetes job monitor thread."""
def __init__(self, thread_name, app=None):
"""Initialize REANA job monitors."""
self.job_event_reader_thread = threading.Thread(
name='kubernetes_job_monitor',
name=thread_name,
target=self.watch_jobs,
args=(JOB_DB,))
args=(JOB_DB, app))
self.job_event_reader_thread.daemon = True
self.job_event_reader_thread.start()

def watch_jobs(self, job_db, app):
"""Monitor running jobs."""
raise NotImplementedError


@singleton
class JobMonitorKubernetes(JobMonitor):
"""Kubernetes job monitor."""

def __init__(self, app=None):
"""Initialize Kubernetes job monitor thread."""
super(__class__, self).__init__(
thread_name='kubernetes_job_monitor'
)

def get_container_logs(self, last_spawned_pod):
"""Get job pod's containers' logs."""
try:
Expand All @@ -62,7 +79,7 @@ def get_container_logs(self, last_spawned_pod):
logging.error(traceback.format_exc())
logging.error("Unexpected error: {}".format(e))

def watch_jobs(self, job_db):
def watch_jobs(self, job_db, app=None):
"""Open stream connection to k8s apiserver to watch all jobs status.
:param job_db: Dictionary which contains all current jobs.
Expand Down Expand Up @@ -143,26 +160,29 @@ def watch_jobs(self, job_db):


@singleton
class JobMonitorHTCondorCERN():
class JobMonitorHTCondorCERN(JobMonitor):
"""HTCondor jobs monitor CERN."""

def __init__(self, app):
"""Initialize HTCondor job monitor thread."""
self.job_event_reader_thread = threading.Thread(
name='htcondorcern_job_monitor',
target=self.watch_jobs,
args=(JOB_DB, app))
self.job_event_reader_thread.daemon = True
self.job_event_reader_thread.start()
super(__class__, self).__init__(
thread_name='htcondor_job_monitor',
app=app
)

def format_condor_job_que_query(self, backend_job_ids):
"""Format HTCondor job que query."""
base_query = 'ClusterId == {} ||'
query = ''
for job_id in backend_job_ids:
query += base_query.format(job_id)
return query[:-2]

def watch_jobs(self, job_db, app):
"""Watch currently running HTCondor jobs.
:param job_db: Dictionary which contains all current jobs.
"""
ads = \
['ClusterId', 'JobStatus', 'ExitCode', 'ExitStatus',
'HoldReasonCode']
ignore_hold_codes = [35, 16]
statuses_to_skip = ['succeeded', 'failed']
while True:
Expand Down Expand Up @@ -241,6 +261,89 @@ def watch_jobs(self, job_db, app):
time.sleep(120)


slurmJobStatus = {
'failed': ['BOOT_FAIL', 'CANCELLED', 'DEADLINE', 'FAILED', 'NODE_FAIL',
'OUT_OF_MEMORY', 'PREEMPTED', 'TIMEOUT', 'SUSPENDED',
'STOPPED'],
'succeeded': ['COMPLETED'],
'running': ['CONFIGURING', 'COMPLETING', 'RUNNING', 'STAGE_OUT'],
'idle': ['PENDING', 'REQUEUE_FED', 'REQUEUE_HOLD', 'RESV_DEL_HOLD',
'REQUEUED', 'RESIZING']
# 'REVOKED',
# 'SIGNALING',
# 'SPECIAL_EXIT',
}


@singleton
class JobMonitorSlurmCERN(JobMonitor):
"""Slurm jobs monitor CERN."""

def __init__(self, app=None):
"""Initialize Slurm job monitor thread."""
super(__class__, self).__init__(
thread_name='slurm_job_monitor'
)

def format_slurm_job_query(self, backend_job_ids):
"""Format Slurm job query."""
cmd = 'sacct --jobs {} --noheader --allocations --parsable ' \
'--format State,JobID'.format(','.join(backend_job_ids))
return cmd

def watch_jobs(self, job_db, app=None):
"""Use SSH connection to slurm submitnode to monitor jobs.
:param job_db: Dictionary which contains all running jobs.
"""
slurm_connection = SSHClient(
hostname=config.SLURM_HEADNODE_HOSTNAME,
username=os.environ.get('SLURMCERN_USER'),
password=os.environ.get('SLURMCERN_SECRET'),
port=config.SLURM_HEADNODE_PORT,
)
statuses_to_skip = ['succeeded', 'failed']
while True:
logging.debug('Starting a new stream request to watch Jobs')
try:
slurm_jobs = {}
for id, job_dict in job_db.items():
if (not job_db[id]['deleted'] and
job_db[id]['compute_backend'] == 'slurmcern' and
not job_db[id]['status'] in statuses_to_skip):
slurm_jobs[job_dict['backend_job_id']] = id
if not slurm_jobs.keys():
logging.error('No slurm jobs')
pass
slurm_query_cmd = self.format_slurm_job_query(
slurm_jobs.keys())
stdin, stdout, stderr = \
slurm_connection.ssh_client.exec_command(slurm_query_cmd)
stdout = stdout.read().decode("utf-8").rstrip().split('\n')
for item in stdout:
slurm_job_status = item.split('|')[0]
slurm_job_id = item.split('|')[1]
job_id = slurm_jobs[slurm_job_id]
if slurm_job_status in slurmJobStatus['succeeded']:
SlurmJobManagerCERN.get_outputs()
job_db[job_id]['status'] = 'succeeded'
job_db[job_id]['deleted'] = True
if slurm_job_status in slurmJobStatus['failed']:
job_db[job_id]['status'] = 'failed'
job_db[job_id]['deleted'] = True
if slurm_job_status in slurmJobStatus['failed'] or \
slurm_job_status in slurmJobStatus['succeeded']:
job_db[job_id]['log'] = \
SlurmJobManagerCERN.get_logs(
backend_job_id=job_dict['backend_job_id'],
workspace=job_db[
job_id]['obj'].workflow_workspace)
store_logs(logs=job_db[job_id]['log'], job_id=job_id)
except Exception as e:
logging.error("Unexpected error: {}".format(e), exc_info=True)
time.sleep(120)


def store_logs(logs, job_id):
"""Write logs to DB."""
try:
Expand Down
2 changes: 1 addition & 1 deletion reana_job_controller/rest.py
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ def create_job(): # noqa
job['compute_backend'] = compute_backend
JOB_DB[str(job['job_id'])] = job
current_app.config['JOB_MONITORS'][compute_backend](
current_app._get_current_object())
app=current_app._get_current_object())
return jsonify({'job_id': job['job_id']}), 201
else:
return jsonify({'job': 'Could not be allocated'}), 500
Expand Down
Loading

0 comments on commit 989f51d

Please sign in to comment.