Skip to content

Commit

Permalink
K8S: porting Kubernetes backend to use JobManager class
Browse files Browse the repository at this point in the history
* Closes reanahub#118
  • Loading branch information
Rokas Maciulaitis authored and roksys committed Apr 1, 2019
1 parent 20940c5 commit 2e33a76
Show file tree
Hide file tree
Showing 7 changed files with 304 additions and 190 deletions.
3 changes: 3 additions & 0 deletions reana_job_controller/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,6 @@

SHARED_VOLUME_PATH_ROOT = os.getenv('SHARED_VOLUME_PATH_ROOT', '/var/reana')
"""Root path of the shared volume ."""

JOB_BACKENDS = ['Kubernetes']
"""Supported job backends."""
25 changes: 12 additions & 13 deletions reana_job_controller/job_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,9 @@
import shlex

from reana_commons.utils import calculate_file_access_time

from reana_db.database import Session
from reana_db.models import JobStatus, Workflow, JobCache
from reana_db.models import Job as JobTable
from reana_db.models import JobCache, JobStatus, Workflow

from reana_job_controller.config import MAX_JOB_RESTARTS

Expand Down Expand Up @@ -48,14 +47,13 @@ def __init__(self, docker_img='', cmd=[], env_vars={}, job_id=None,
self.workflow_uuid = workflow_uuid

def execution_hook(fn):
"""Add before and after execution hooks."""
"""Add before execution hooks and DB operations."""
def wrapper(inst, *args, **kwargs):
inst.before_execution()
result = fn(inst, *args, **kwargs)
inst.after_execution()
inst.create_job_in_db()
backend_job_id = fn(inst, *args, **kwargs)
inst.create_job_in_db(backend_job_id)
inst.cache_job()
return result
return backend_job_id
return wrapper

def before_execution(self):
Expand Down Expand Up @@ -95,25 +93,26 @@ def stop(self):
"""Stop a job."""
raise NotImplementedError

def create_job_in_db(self):
def create_job_in_db(self, backend_job_id):
"""Create job in db."""
job_db_entry = JobTable(
id_=self.job_id,
backend_job_id=backend_job_id,
workflow_uuid=self.workflow_uuid,
status=JobStatus.created,
status=JobStatus.created.name,
backend=self.backend,
# cvmfs_mounts=TODO,
# shared_file_system=TODO,
cvmfs_mounts=self.cvmfs_mounts or '',
shared_file_system=self.shared_file_system or False,
docker_img=self.docker_img,
cmd=self.cmd,
env_vars=json.dump(self.env_vars),
env_vars=json.dumps(self.env_vars),
restart_count=0,
max_restart_count=MAX_JOB_RESTARTS,
deleted=False,
name=self.job_id,
prettified_cmd=self.cmd)
Session.add(job_db_entry)
Session.commit()
self.job_id = str(job_db_entry.id_)

def cache_job(self):
"""Cache a job."""
Expand Down
166 changes: 32 additions & 134 deletions reana_job_controller/k8s.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,134 +8,23 @@

"""Kubernetes wrapper."""

import ast
import logging
import os
import threading
import traceback

from flask import current_app as app
from kubernetes import client, watch
from kubernetes.client.models.v1_delete_options import V1DeleteOptions
from kubernetes.client.rest import ApiException
from reana_commons.config import CVMFS_REPOSITORIES
from reana_commons.k8s.api_client import (current_k8s_batchv1_api_client,
current_k8s_corev1_api_client)
from reana_commons.k8s.volumes import get_k8s_cvmfs_volume, get_shared_volume
from reana_db.database import Session
from reana_db.models import Job

from reana_job_controller import config
from reana_job_controller.config import SHARED_VOLUME_PATH_ROOT
from reana_job_controller.errors import ComputingBackendSubmissionError


def add_shared_volume(job, workflow_workspace):
    """Attach the shared CephFS volume to a Kubernetes job spec.

    Mutates ``job`` in place: the volume mount is appended to the first
    container and the volume itself to the pod-level volume list.

    :param job: Kubernetes job spec.
    :param workflow_workspace: Absolute path to the job's workflow workspace.
    """
    volume_mount, volume = get_shared_volume(workflow_workspace,
                                             SHARED_VOLUME_PATH_ROOT)
    pod_spec = job['spec']['template']['spec']
    pod_spec['containers'][0]['volumeMounts'].append(volume_mount)
    pod_spec['volumes'].append(volume)


def k8s_instantiate_job(job_id, workflow_workspace, docker_img, cmd,
                        cvmfs_mounts, env_vars, shared_file_system, job_type,
                        namespace='default'):
    """Create Kubernetes job.

    :param job_id: Job uuid.
    :param workflow_workspace: Absolute path to the job's workflow workspace.
    :param docker_img: Docker image to run the job.
    :param cmd: Command provided to the docker container.
    :param cvmfs_mounts: String with a Python-literal list of CVMFS
        repositories to mount in the job pod (e.g. ``"['atlas']"``), or the
        string ``'false'`` to mount none.
    :param env_vars: Dictionary representing environment variables
        as {'var_name': 'var_value'}.
    :param shared_file_system: Boolean which represents whether the job
        should have a shared file system mounted.
    :param job_type: Currently unused; kept so existing callers keep working.
    :param namespace: Job's namespace.
    :returns: A :class:`kubernetes.client.models.v1_job.V1Job` corresponding
        to the created job, ``None`` if the creation could not take place.
    """
    import shlex

    job = {
        'kind': 'Job',
        'apiVersion': 'batch/v1',
        'metadata': {
            'name': job_id,
            'namespace': namespace
        },
        'spec': {
            # Stop retrying the job after MAX_JOB_RESTARTS failed pods.
            'backoffLimit': app.config['MAX_JOB_RESTARTS'],
            'autoSelector': True,
            'template': {
                'metadata': {
                    'name': job_id
                },
                'spec': {
                    'containers': [
                        {
                            'name': job_id,
                            'image': docker_img,
                            'env': [],
                            'volumeMounts': []
                        },
                    ],
                    'volumes': [],
                    'restartPolicy': 'Never'
                }
            }
        }
    }

    if cmd:
        # Split the command shell-style into the argv list k8s expects.
        (job['spec']['template']['spec']['containers']
         [0]['command']) = shlex.split(cmd)

    if env_vars:
        for var, value in env_vars.items():
            job['spec']['template']['spec']['containers'][0]['env'].append(
                {'name': var, 'value': value}
            )

    if shared_file_system:
        add_shared_volume(job, workflow_workspace)

    if cvmfs_mounts != 'false':
        # Map repository identifiers to mount paths, silently skipping
        # entries not present in CVMFS_REPOSITORIES.
        cvmfs_map = {}
        for cvmfs_mount_path in ast.literal_eval(cvmfs_mounts):
            if cvmfs_mount_path in CVMFS_REPOSITORIES:
                cvmfs_map[
                    CVMFS_REPOSITORIES[cvmfs_mount_path]] = cvmfs_mount_path

        for repository, mount_path in cvmfs_map.items():
            volume = get_k8s_cvmfs_volume(repository)

            (job['spec']['template']['spec']['containers'][0]
                ['volumeMounts'].append(
                    {'name': volume['name'],
                     'mountPath': '/cvmfs/{}'.format(mount_path)}
            ))
            job['spec']['template']['spec']['volumes'].append(volume)

    # TODO: improve error handling — failures are only logged and the
    # caller receives ``None``.
    try:
        api_response = \
            current_k8s_batchv1_api_client.create_namespaced_job(
                namespace=namespace, body=job)
        return api_response
    except ApiException as e:
        # Same exception class as before (client.rest.ApiException), but
        # referenced via the module-level import for consistency with
        # k8s_delete_job().
        logging.debug("Error while connecting to Kubernetes API: {}".format(e))
    except Exception as e:
        logging.error(traceback.format_exc())
        logging.debug("Unexpected error: {}".format(e))


def k8s_watch_jobs(job_db):
"""Open stream connection to k8s apiserver to watch all jobs status.
Expand All @@ -154,47 +43,54 @@ def k8s_watch_jobs(job_db):
job = event['object']

# Taking note of the remaining jobs since deletion might not
# happend straight away.
remaining_jobs = [j for j in job_db.keys()
if not job_db[j]['deleted']]
if (not job_db.get(job.metadata.name) or
# happen straight away.
remaining_jobs = dict()
for job_id, job_dict in job_db.items():
if not job_db[job_id]['deleted']:
remaining_jobs[job_dict['backend_job_id']] = job_id
if (not job_db.get(remaining_jobs.get(job.metadata.name)) or
job.metadata.name not in remaining_jobs):
# Ignore jobs not created by this specific instance
# or already deleted jobs.
continue
elif job.status.succeeded:
job_id = remaining_jobs[job.metadata.name]
kubernetes_job_id = job.metadata.name
if job.status.succeeded:
logging.info(
'Job {} succeeded.'.format(
job.metadata.name)
'Job job_id: {}, kubernetes_job_id: {}'
' succeeded.'.format(job_id, kubernetes_job_id)
)
job_db[job.metadata.name]['status'] = 'succeeded'
job_db[job_id]['status'] = 'succeeded'
elif (job.status.failed and
job.status.failed >= config.MAX_JOB_RESTARTS):
logging.info('Job {} failed.'.format(
job.metadata.name))
job_db[job.metadata.name]['status'] = 'failed'
logging.info(
'Job job_id: {}, kubernetes_job_id: {} failed.'.format(
job_id,
kubernetes_job_id)
)
job_db[job_id]['status'] = 'failed'
else:
continue
# Grab logs when job either succeeds or fails.
logging.info('Getting last spawned pod for job {}'.format(
job.metadata.name))
logging.info('Getting last spawned pod for kubernetes'
' job {}'.format(kubernetes_job_id))
last_spawned_pod = \
current_k8s_corev1_api_client.list_namespaced_pod(
job.metadata.namespace,
namespace=job.metadata.namespace,
label_selector='job-name={job_name}'.format(
job_name=job.metadata.name)).items[-1]
job_name=kubernetes_job_id)).items[-1]
logging.info('Grabbing pod {} logs...'.format(
last_spawned_pod.metadata.name))
job_db[job.metadata.name]['log'] = \
job_db[job_id]['log'] = \
current_k8s_corev1_api_client.read_namespaced_pod_log(
namespace=last_spawned_pod.metadata.namespace,
name=last_spawned_pod.metadata.name)
# Store job logs
try:
logging.info('Storing job logs: {}'.
format(job_db[job.metadata.name]['log']))
Session.query(Job).filter_by(id_=job.metadata.name). \
update(dict(logs=job_db[job.metadata.name]['log']))
format(job_db[job_id]['log']))
Session.query(Job).filter_by(id_=job_id). \
update(dict(logs=job_db[job_id]['log']))
Session.commit()

except Exception as e:
Expand All @@ -203,10 +99,10 @@ def k8s_watch_jobs(job_db):
format(last_spawned_pod))
logging.debug('Exception: {}'.format(str(e)))

logging.info('Cleaning job {} ...'.format(
job.metadata.name))
logging.info('Cleaning Kubernetes job {} ...'.format(
kubernetes_job_id))
k8s_delete_job(job)
job_db[job.metadata.name]['deleted'] = True
job_db[job_id]['deleted'] = True
except client.rest.ApiException as e:
logging.debug(
"Error while connecting to Kubernetes API: {}".format(e))
Expand All @@ -228,7 +124,9 @@ def k8s_delete_job(job, asynchronous=True):
delete_options = V1DeleteOptions(
propagation_policy=propagation_policy)
current_k8s_batchv1_api_client.delete_namespaced_job(
job.metadata.name, job.metadata.namespace, delete_options)
name=job.metadata.name,
namespace=job.metadata.namespace,
body=delete_options)
except ApiException as e:
logging.error(
'An error has occurred while connecting to Kubernetes API Server'
Expand Down
Loading

0 comments on commit 2e33a76

Please sign in to comment.