diff --git a/Dockerfile b/Dockerfile
index 18149f68..24a29676 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -11,17 +11,20 @@ RUN apt-get update && \
     apt-get install -y vim-tiny && \
     pip install --upgrade pip
 
+RUN export DEBIAN_FRONTEND=noninteractive ;\
+    apt-get -yq install krb5-user \
+    krb5-config \
+    libkrb5-dev \
+    libauthen-krb5-perl \
+    gcc;
+ADD etc/krb5.conf /etc/krb5.conf
+
 ARG COMPUTE_BACKENDS=kubernetes
 
 # CERN HTCondor part taken from https://gitlab.cern.ch/batch-team/condorsubmit
 RUN if echo "$COMPUTE_BACKENDS" | grep -q "htcondorcern"; then \
       export DEBIAN_FRONTEND=noninteractive ;\
-      apt-get -yq install wget alien gnupg2 \
-      krb5-user \
-      krb5-config \
-      libkrb5-dev \
-      libauthen-krb5-perl \
-      --no-install-recommends; \
+      apt-get -yq install wget alien gnupg2 ;\
       wget -O ngbauth-submit.rpm http://linuxsoft.cern.ch/internal/repos/batch7-stable/x86_64/os/Packages/ngbauth-submit-0.23-1.el7.noarch.rpm; \
       wget -O cernbatchsubmit.rpm http://linuxsoft.cern.ch/internal/repos/batch7-stable/x86_64/os/Packages/cernbatchsubmit-0.1.0-1.el7.x86_64.rpm; \
       yes | alien -i cernbatchsubmit.rpm; \
@@ -34,13 +37,14 @@ RUN if echo "$COMPUTE_BACKENDS" | grep -q "htcondorcern"; then \
     fi
 
 RUN if echo "$COMPUTE_BACKENDS" | grep -q "slurmcern"; then \
-      apt-get install -y openssh-client; \
+      export DEBIAN_FRONTEND=noninteractive ;\
+      apt-get -yq install openssh-client \
+      --no-install-recommends; \
     fi
 
 ADD etc/cernsubmit.yaml /etc/condor/
 ADD etc/10_cernsubmit.config /etc/condor/config.d/
-ADD etc/krb5.conf /etc/krb5.conf
 ADD etc/ngbauth-submit /etc/sysconfig/
 ADD etc/ngauth_batch_crypt_pub.pem /etc/
 ADD etc/cerngridca.crt /usr/local/share/ca-certificates/cerngridca.crt
diff --git a/reana_job_controller/config.py b/reana_job_controller/config.py
index 1e5379b0..1ef158ea 100644
--- a/reana_job_controller/config.py
+++ b/reana_job_controller/config.py
@@ -13,21 +13,25 @@
 from reana_job_controller.htcondorcern_job_manager import \
     HTCondorJobManagerCERN
 from reana_job_controller.job_monitor import (JobMonitorHTCondorCERN,
-                                              JobMonitorKubernetes)
+                                              JobMonitorKubernetes,
+                                              JobMonitorSlurmCERN)
 from reana_job_controller.kubernetes_job_manager import KubernetesJobManager
+from reana_job_controller.slurmcern_job_manager import SlurmJobManagerCERN
 
 SHARED_VOLUME_PATH_ROOT = os.getenv('SHARED_VOLUME_PATH_ROOT', '/var/reana')
 """Root path of the shared volume ."""
 
 COMPUTE_BACKENDS = {
     'kubernetes': KubernetesJobManager,
-    'htcondorcern': HTCondorJobManagerCERN
+    'htcondorcern': HTCondorJobManagerCERN,
+    'slurmcern': SlurmJobManagerCERN
 }
 """Supported job compute backends and corresponding management class."""
 
 JOB_MONITORS = {
     'kubernetes': JobMonitorKubernetes,
-    'htcondorcern': JobMonitorHTCondorCERN
+    'htcondorcern': JobMonitorHTCondorCERN,
+    'slurmcern': JobMonitorSlurmCERN,
 }
 """Classes responsible for monitoring specific backend jobs"""
 
@@ -80,3 +84,15 @@
 """Kerberos configMap name. Must be the same as in
 reana_cluster/backends/kubernetes/templates/configmaps/kerberos.yaml.
""" + +SLURM_HEADNODE_HOSTNAME = os.getenv('SLURM_HOSTNAME', 'hpc-batch.cern.ch') +"""Hostname of SLURM head node used for job management via SSH connection.""" + +SLURM_HEADNODE_PORT = os.getenv('SLURM_CLUSTER_PORT', '22') +"""Port of SLURM head node.""" + + +SLURM_PARTITION = os.getenv('SLURM_PARTITION', 'batch-short') +"""Default slurm partition.""" + +SLURM_HOME_PATH = os.getenv('SLURM_HOME_PATH', '') diff --git a/reana_job_controller/htcondorcern_job_manager.py b/reana_job_controller/htcondorcern_job_manager.py index 5dc625f4..59c7ebb7 100644 --- a/reana_job_controller/htcondorcern_job_manager.py +++ b/reana_job_controller/htcondorcern_job_manager.py @@ -26,6 +26,7 @@ from retrying import retry from reana_job_controller.job_manager import JobManager +from reana_job_controller.utils import initialize_krb5_token thread_local = threading.local() @@ -202,10 +203,10 @@ def _submit(self, job_ad): logging.info('Submiting job - {}'.format(job_ad)) clusterid = schedd.submit(job_ad, 1, True, ads) HTCondorJobManagerCERN._spool_input(ads) + return clusterid except Exception as e: logging.error("Submission failed: {0}".format(e), exc_info=True) time.sleep(10) - return clusterid @retry(stop_max_attempt_number=MAX_NUM_RETRIES) def _spool_input(ads): diff --git a/reana_job_controller/job_monitor.py b/reana_job_controller/job_monitor.py index 0c5d4f04..2036d43e 100644 --- a/reana_job_controller/job_monitor.py +++ b/reana_job_controller/job_monitor.py @@ -9,6 +9,7 @@ """Job monitoring wrapper.""" import logging +import os import threading import time import traceback @@ -19,26 +20,42 @@ from reana_db.database import Session from reana_db.models import Job +from reana_job_controller import config from reana_job_controller.htcondorcern_job_manager import \ HTCondorJobManagerCERN from reana_job_controller.job_db import JOB_DB from reana_job_controller.kubernetes_job_manager import KubernetesJobManager -from reana_job_controller.utils import singleton +from reana_job_controller.slurmcern_job_manager import SlurmJobManagerCERN +from reana_job_controller.utils import SSHClient, singleton -@singleton -class JobMonitorKubernetes(): - """Kubernetes job monitor.""" +class JobMonitor(): + """Job monitor interface.""" - def __init__(self, app=None): - """Initialize Kubernetes job monitor thread.""" + def __init__(self, thread_name, app=None): + """Initialize REANA job monitors.""" self.job_event_reader_thread = threading.Thread( - name='kubernetes_job_monitor', + name=thread_name, target=self.watch_jobs, - args=(JOB_DB,)) + args=(JOB_DB, app)) self.job_event_reader_thread.daemon = True self.job_event_reader_thread.start() + def watch_jobs(self, job_db, app): + """Monitor running jobs.""" + raise NotImplementedError + + +@singleton +class JobMonitorKubernetes(JobMonitor): + """Kubernetes job monitor.""" + + def __init__(self, app=None): + """Initialize Kubernetes job monitor thread.""" + super(__class__, self).__init__( + thread_name='kubernetes_job_monitor' + ) + def get_container_logs(self, last_spawned_pod): """Get job pod's containers' logs.""" try: @@ -62,7 +79,7 @@ def get_container_logs(self, last_spawned_pod): logging.error(traceback.format_exc()) logging.error("Unexpected error: {}".format(e)) - def watch_jobs(self, job_db): + def watch_jobs(self, job_db, app=None): """Open stream connection to k8s apiserver to watch all jobs status. :param job_db: Dictionary which contains all current jobs. 
@@ -143,26 +160,29 @@ def watch_jobs(self, job_db):
 
 
 @singleton
-class JobMonitorHTCondorCERN():
+class JobMonitorHTCondorCERN(JobMonitor):
     """HTCondor jobs monitor CERN."""
 
     def __init__(self, app):
         """Initialize HTCondor job monitor thread."""
-        self.job_event_reader_thread = threading.Thread(
-            name='htcondorcern_job_monitor',
-            target=self.watch_jobs,
-            args=(JOB_DB, app))
-        self.job_event_reader_thread.daemon = True
-        self.job_event_reader_thread.start()
+        super(__class__, self).__init__(
+            thread_name='htcondor_job_monitor',
+            app=app
+        )
+
+    def format_condor_job_que_query(self, backend_job_ids):
+        """Format HTCondor job queue query."""
+        base_query = 'ClusterId == {} ||'
+        query = ''
+        for job_id in backend_job_ids:
+            query += base_query.format(job_id)
+        return query[:-2]
 
     def watch_jobs(self, job_db, app):
         """Watch currently running HTCondor jobs.
 
         :param job_db: Dictionary which contains all current jobs.
         """
-        ads = \
-            ['ClusterId', 'JobStatus', 'ExitCode', 'ExitStatus',
-             'HoldReasonCode']
         ignore_hold_codes = [35, 16]
         statuses_to_skip = ['succeeded', 'failed']
         while True:
@@ -241,6 +261,89 @@ def watch_jobs(self, job_db, app):
             time.sleep(120)
 
 
+slurmJobStatus = {
+    'failed': ['BOOT_FAIL', 'CANCELLED', 'DEADLINE', 'FAILED', 'NODE_FAIL',
+               'OUT_OF_MEMORY', 'PREEMPTED', 'TIMEOUT', 'SUSPENDED',
+               'STOPPED'],
+    'succeeded': ['COMPLETED'],
+    'running': ['CONFIGURING', 'COMPLETING', 'RUNNING', 'STAGE_OUT'],
+    'idle': ['PENDING', 'REQUEUE_FED', 'REQUEUE_HOLD', 'RESV_DEL_HOLD',
+             'REQUEUED', 'RESIZING']
+    # 'REVOKED',
+    # 'SIGNALING',
+    # 'SPECIAL_EXIT',
+}
+
+
+@singleton
+class JobMonitorSlurmCERN(JobMonitor):
+    """Slurm jobs monitor CERN."""
+
+    def __init__(self, app=None):
+        """Initialize Slurm job monitor thread."""
+        super(__class__, self).__init__(
+            thread_name='slurm_job_monitor'
+        )
+
+    def format_slurm_job_query(self, backend_job_ids):
+        """Format Slurm job query."""
+        cmd = 'sacct --jobs {} --noheader --allocations --parsable ' \
+              '--format State,JobID'.format(','.join(backend_job_ids))
+        return cmd
+
+    def watch_jobs(self, job_db, app=None):
+        """Use SSH connection to the Slurm submit node to monitor jobs.
+
+        :param job_db: Dictionary which contains all running jobs.
+ """ + slurm_connection = SSHClient( + hostname=config.SLURM_HEADNODE_HOSTNAME, + username=os.environ.get('SLURMCERN_USER'), + password=os.environ.get('SLURMCERN_SECRET'), + port=config.SLURM_HEADNODE_PORT, + ) + statuses_to_skip = ['succeeded', 'failed'] + while True: + logging.debug('Starting a new stream request to watch Jobs') + try: + slurm_jobs = {} + for id, job_dict in job_db.items(): + if (not job_db[id]['deleted'] and + job_db[id]['compute_backend'] == 'slurmcern' and + not job_db[id]['status'] in statuses_to_skip): + slurm_jobs[job_dict['backend_job_id']] = id + if not slurm_jobs.keys(): + logging.error('No slurm jobs') + pass + slurm_query_cmd = self.format_slurm_job_query( + slurm_jobs.keys()) + stdin, stdout, stderr = \ + slurm_connection.ssh_client.exec_command(slurm_query_cmd) + stdout = stdout.read().decode("utf-8").rstrip().split('\n') + for item in stdout: + slurm_job_status = item.split('|')[0] + slurm_job_id = item.split('|')[1] + job_id = slurm_jobs[slurm_job_id] + if slurm_job_status in slurmJobStatus['succeeded']: + SlurmJobManagerCERN.get_outputs() + job_db[job_id]['status'] = 'succeeded' + job_db[job_id]['deleted'] = True + if slurm_job_status in slurmJobStatus['failed']: + job_db[job_id]['status'] = 'failed' + job_db[job_id]['deleted'] = True + if slurm_job_status in slurmJobStatus['failed'] or \ + slurm_job_status in slurmJobStatus['succeeded']: + job_db[job_id]['log'] = \ + SlurmJobManagerCERN.get_logs( + backend_job_id=job_dict['backend_job_id'], + workspace=job_db[ + job_id]['obj'].workflow_workspace) + store_logs(logs=job_db[job_id]['log'], job_id=job_id) + except Exception as e: + logging.error("Unexpected error: {}".format(e), exc_info=True) + time.sleep(120) + + def store_logs(logs, job_id): """Write logs to DB.""" try: diff --git a/reana_job_controller/rest.py b/reana_job_controller/rest.py index 97eb0a6f..ffc6c319 100644 --- a/reana_job_controller/rest.py +++ b/reana_job_controller/rest.py @@ -227,7 +227,7 @@ def create_job(): # noqa job['compute_backend'] = compute_backend JOB_DB[str(job['job_id'])] = job current_app.config['JOB_MONITORS'][compute_backend]( - current_app._get_current_object()) + app=current_app._get_current_object()) return jsonify({'job_id': job['job_id']}), 201 else: return jsonify({'job': 'Could not be allocated'}), 500 diff --git a/reana_job_controller/slurmcern_job_manager.py b/reana_job_controller/slurmcern_job_manager.py new file mode 100644 index 00000000..466bcf43 --- /dev/null +++ b/reana_job_controller/slurmcern_job_manager.py @@ -0,0 +1,210 @@ +# -*- coding: utf-8 -*- +# +# This file is part of REANA. +# Copyright (C) 2019 CERN. +# +# REANA is free software; you can redistribute it and/or modify it +# under the terms of the MIT License; see LICENSE file for more details. + +"""CERN Slurm Job Manager.""" + +import base64 +import logging +import os +from stat import S_ISDIR + +from flask import current_app + +from reana_job_controller.job_manager import JobManager +from reana_job_controller.utils import SSHClient, initialize_krb5_token + + +class SlurmJobManagerCERN(JobManager): + """Slurm job management.""" + + SLURM_WORKSAPCE_PATH = '' + """Absolute path inside slurm head node used for submission.""" + REANA_WORKSPACE_PATH = '' + """Absolute REANA workspace path.""" + + def __init__(self, docker_img=None, cmd=None, env_vars=None, job_id=None, + workflow_uuid=None, workflow_workspace=None, + cvmfs_mounts='false', shared_file_system=False, + job_name=None, kerberos=False): + """Instanciate Slurm job manager. 
+
+        :param docker_img: Docker image.
+        :type docker_img: str
+        :param cmd: Command to execute.
+        :type cmd: list
+        :param env_vars: Environment variables.
+        :type env_vars: dict
+        :param job_id: Unique job id.
+        :type job_id: str
+        :param workflow_uuid: Unique workflow id.
+        :type workflow_uuid: str
+        :param workflow_workspace: Workflow workspace path.
+        :type workflow_workspace: str
+        :param cvmfs_mounts: list of CVMFS mounts as a string.
+        :type cvmfs_mounts: str
+        :param shared_file_system: if shared file system is available.
+        :type shared_file_system: bool
+        :param job_name: Name of the job
+        :type job_name: str
+        """
+        super(SlurmJobManagerCERN, self).__init__(
+            docker_img=docker_img, cmd=cmd,
+            env_vars=env_vars, job_id=job_id,
+            workflow_uuid=workflow_uuid,
+            job_name=job_name)
+        self.compute_backend = "Slurm"
+        self.workflow_workspace = workflow_workspace
+        self.cvmfs_mounts = cvmfs_mounts
+        self.shared_file_system = shared_file_system
+        self.job_file = 'job.sh'
+        self.job_description_file = 'job_description.sh'
+
+    def _transfer_inputs(self):
+        """Transfer inputs to SLURM submit node."""
+        stdin, stdout, stderr = \
+            self.slurm_connection.ssh_client.exec_command('pwd')
+        self.slurm_home_path = current_app.config['SLURM_HOME_PATH'] or \
+            stdout.read().decode("utf-8").rstrip()
+        SlurmJobManagerCERN.SLURM_WORKSAPCE_PATH = os.path.join(
+            self.slurm_home_path,
+            self.workflow_workspace[1:])
+        SlurmJobManagerCERN.REANA_WORKSPACE_PATH = self.workflow_workspace
+        self.slurm_connection.ssh_client.exec_command(
+            'mkdir -p {}'.format(
+                SlurmJobManagerCERN.SLURM_WORKSAPCE_PATH))
+        sftp = self.slurm_connection.ssh_client.open_sftp()
+        os.chdir(self.workflow_workspace)
+        for dirpath, dirnames, filenames in os.walk(self.workflow_workspace):
+            try:
+                sftp.mkdir(
+                    os.path.join(
+                        self.slurm_home_path,
+                        dirpath[1:])
+                )
+            except Exception:
+                pass
+            for file in filenames:
+                sftp.put(
+                    os.path.join(dirpath, file),
+                    os.path.join(
+                        self.slurm_home_path,
+                        dirpath[1:],
+                        file)
+                )
+
+    @JobManager.execution_hook
+    def execute(self):
+        """Execute / submit a job with Slurm."""
+        self.cmd = self._encode_cmd(' '.join(self.cmd))
+        initialize_krb5_token()
+        self.slurm_connection = SSHClient(
+            hostname=current_app.config['SLURM_HEADNODE_HOSTNAME'],
+            username=os.environ.get('SLURMCERN_USER'),
+            port=current_app.config['SLURM_HEADNODE_PORT'],
+        )
+        self._transfer_inputs()
+        self._dump_job_file()
+        self._dump_job_submission_file()
+        stdin, stdout, stderr = self.slurm_connection.ssh_client.exec_command(
+            'cd {} && sbatch --parsable {}'.format(
+                SlurmJobManagerCERN.SLURM_WORKSAPCE_PATH,
+                self.job_description_file))
+        backend_job_id = stdout.read().decode("utf-8").rstrip()
+        return backend_job_id
+
+    def _dump_job_submission_file(self):
+        """Dump job submission file to the Slurm submit node."""
+        job_template = ("#!/bin/bash \n"
+                        "#SBATCH --job-name={job_name} \n"
+                        "#SBATCH --output=reana_job.%j.out \n"
+                        "#SBATCH --error=reana_job.%j.err \n"
+                        "#SBATCH --partition {partition} \n"
+                        "#SBATCH --time 5 \n"
+                        "srun {command}").format(
+                            partition=current_app.config['SLURM_PARTITION'],
+                            job_name=self.job_name,
+                            command=self._wrap_singularity_cmd())
+        self.slurm_connection.ssh_client.exec_command(
+            'cd {} && job="{}" && echo "$job"> {}'.format(
+                SlurmJobManagerCERN.SLURM_WORKSAPCE_PATH,
+                job_template,
+                self.job_description_file,
+            ))
+
+    def _dump_job_file(self):
+        """Dump job file."""
+        job_template = '#!/bin/bash \n{}'.format(self.cmd)
+        self.slurm_connection.ssh_client.exec_command(
job="{}" && echo "$job" > {} && chmod +x {}'.format( + SlurmJobManagerCERN.SLURM_WORKSAPCE_PATH, + job_template, + self.job_file, + self.job_file + )) + + def _encode_cmd(self, cmd): + """Encode base64 cmd.""" + encoded_cmd = \ + base64.b64encode(cmd.encode('utf-8')).decode('utf-8') + return 'echo {}|base64 -d|bash'.format(encoded_cmd) + + def _wrap_singularity_cmd(self): + """Wrap command in singulrity.""" + base_singularity_cmd = \ + 'singularity exec -B {SLURM_WORKSAPCE}:{REANA_WORKSPACE}' \ + ' docker://{DOCKER_IMG} {CMD}'.format( + SLURM_WORKSAPCE=SlurmJobManagerCERN.SLURM_WORKSAPCE_PATH, + REANA_WORKSPACE=SlurmJobManagerCERN.REANA_WORKSPACE_PATH, + DOCKER_IMG=self.docker_img, + CMD='./' + self.job_file + ) + return base_singularity_cmd + + def get_outputs(): + """Transfer job outputs to REANA.""" + os.chdir(SlurmJobManagerCERN.REANA_WORKSPACE_PATH) + slurm_connection = SSHClient() + sftp = slurm_connection.ssh_client.open_sftp() + SlurmJobManagerCERN._download_dir( + sftp, + SlurmJobManagerCERN.SLURM_WORKSAPCE_PATH, + SlurmJobManagerCERN.REANA_WORKSPACE_PATH) + sftp.close() + + def _download_dir(sftp, remote_dir, local_dir): + """Download remote directory content.""" + os.path.exists(local_dir) or os.makedirs(local_dir) + dir_items = sftp.listdir_attr(remote_dir) + for item in dir_items: + remote_path = os.path.join(remote_dir, item.filename) + local_path = os.path.join(local_dir, item.filename) + if S_ISDIR(item.st_mode): + SlurmJobManagerCERN._download_dir( + sftp, remote_path, local_path) + else: + sftp.get(remote_path, local_path) + + def get_logs(backend_job_id, workspace): + """Return job logs if log files are present.""" + stderr_file = \ + os.path.join(workspace, + 'reana_job.' + str(backend_job_id) + '.err') + stdout_file = \ + os.path.join(workspace, + 'reana_job.' + str(backend_job_id) + '.out') + log_files = [stderr_file, stdout_file] + job_log = '' + try: + for file in log_files: + with open(file, "r") as log_file: + job_log += log_file.read() + return job_log + except Exception as e: + msg = 'Job logs of {} were not found. {}'.format(backend_job_id, e) + logging.error(msg, exc_info=True) + return msg diff --git a/reana_job_controller/utils.py b/reana_job_controller/utils.py index ac037607..303681d1 100644 --- a/reana_job_controller/utils.py +++ b/reana_job_controller/utils.py @@ -9,6 +9,8 @@ """Job controller utils.""" import logging +import os +import subprocess import paramiko from reana_db.database import Session @@ -19,9 +21,9 @@ def singleton(cls): """Singelton decorator.""" instances = {} - def getinstance(*args, **kwargs): + def getinstance(**kwargs): if cls not in instances: - instances[cls] = cls(*args, **kwargs) + instances[cls] = cls(**kwargs) return instances[cls] return getinstance @@ -39,17 +41,35 @@ def update_workflow_logs(workflow_uuid, log_message): exc_info=True) +def initialize_krb5_token(): + """Create kerberos ticket from mounted keytab_file.""" + cern_user = os.environ.get('CERN_USER') + keytab_file = os.environ.get('HTCONDORCERN_KEYTAB') + cmd = \ + 'kinit -kt /etc/reana/secrets/{} {}@CERN.CH'.format(keytab_file, + cern_user) + if cern_user: + try: + subprocess.check_output(cmd, shell=True) + except subprocess.CalledProcessError as err: + logging.error("Authentication failed: {}".format(err), + exc_info=True) + else: + msg = 'CERN_USER is not set.' 
+        logging.error(msg, exc_info=True)
+
+
 @singleton
 class SSHClient():
     """SSH Client."""
 
-    def __init__(self, hostname, username, password, port):
+    def __init__(self, hostname=None, username=None, port=None):
         """Initialize ssh client."""
         self.ssh_client = paramiko.SSHClient()
         self.ssh_client.set_missing_host_key_policy(
             paramiko.AutoAddPolicy())
         self.ssh_client.connect(
-            hostname,
-            username,
-            password,
-            port)
+            hostname=hostname,
+            username=username,
+            port=port,
+            gss_auth=True)
diff --git a/setup.py b/setup.py
index 70b71285..b5bd44e1 100644
--- a/setup.py
+++ b/setup.py
@@ -52,7 +52,7 @@
     'reana-db>=0.6.0.dev20190828,<0.7.0',
     'htcondor==8.9.2',
     'retrying>=1.3.3',
-    'paramiko>=2.6.0'
+    'paramiko[gssapi]>=2.6.0',
 ]
 
 packages = find_packages()
diff --git a/tests/test_job_monitor.py b/tests/test_job_monitor.py
index 1e38531e..46a9e83f 100644
--- a/tests/test_job_monitor.py
+++ b/tests/test_job_monitor.py
@@ -19,9 +19,9 @@ def test_if_singelton(app):
     """Test if job monitor classes are singelton."""
     with mock.patch("reana_job_controller.job_monitor."
                     "threading") as threading:
-        first_k8s_instance = JobMonitorKubernetes(app)
-        second_k8s_instance = JobMonitorKubernetes(app)
+        first_k8s_instance = JobMonitorKubernetes(app=app)
+        second_k8s_instance = JobMonitorKubernetes(app=app)
         assert first_k8s_instance is second_k8s_instance
-        first_htc_instance = JobMonitorHTCondorCERN(app)
-        second_htc_instance = JobMonitorHTCondorCERN(app)
+        first_htc_instance = JobMonitorHTCondorCERN(app=app)
+        second_htc_instance = JobMonitorHTCondorCERN(app=app)
         assert first_htc_instance is second_htc_instance
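
For reference, the SSH-plus-Kerberos flow that the new SlurmJobManagerCERN and JobMonitorSlurmCERN build on can be exercised on its own roughly as follows. This is a minimal sketch and not part of the patch; the hostname, username, workspace path and script name are illustrative placeholders, not REANA configuration.

# Minimal sketch (assumes a valid Kerberos ticket, e.g. obtained via kinit,
# and paramiko installed with the gssapi extra as pinned in setup.py).
import time

import paramiko


def run(client, cmd):
    """Run a remote command and return its stripped stdout."""
    _, stdout, _ = client.exec_command(cmd)
    return stdout.read().decode('utf-8').rstrip()


client = paramiko.SSHClient()
client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
# gss_auth=True is the authentication mode utils.SSHClient uses after this patch.
client.connect(hostname='hpc-batch.example.org', port=22,
               username='johndoe', gss_auth=True)

# `sbatch --parsable` prints only the job id, as in SlurmJobManagerCERN.execute().
job_id = run(client, 'cd ~/workspace && sbatch --parsable job_description.sh')

# Poll `sacct` until the allocation reaches a terminal state, much like
# JobMonitorSlurmCERN.watch_jobs() does on a 120-second cycle.
while True:
    state = run(client, 'sacct --jobs {} --noheader --allocations '
                        '--parsable --format State'.format(job_id)).split('|')[0]
    if state.startswith(('COMPLETED', 'FAILED', 'CANCELLED', 'TIMEOUT')):
        break
    time.sleep(60)

print('Job {} finished with state {}'.format(job_id, state))
client.close()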