diff --git a/docs/applications/development/workflows-api.md b/docs/applications/development/workflows-api.md
index f82ea6da..27c455b4 100644
--- a/docs/applications/development/workflows-api.md
+++ b/docs/applications/development/workflows-api.md
@@ -114,7 +114,15 @@ The shared volume must be indicated both in the Operation and it is propagated t
 
 The `shared_directory` parameter is a quick way to specify a shared directory, and, optionally, the name of a volume claim.
 
-The syntax is `[CLAIM_NAME:]MOUNT_PATH`.
+The syntax is `[CLAIM_NAME:]MOUNT_PATH[:MODE]`.
+- `CLAIM_NAME` is the name of an existing or new volume claim. If a claim with that
+name already exists it is used; otherwise a new ephemeral volume is created, which
+lives for the duration of the workflow and is deleted on completion.
+- `MOUNT_PATH` is the path where the volume is mounted inside the pod.
+- The optional `:MODE` suffix indicates the read/write mode. If set to `ro`, the
+volume is mounted read-only. Read-only volumes help overcome scheduling
+limitations (ReadWriteOnce is often the only access mode available) and are
+generally recommended whenever writing is not required.
 ```Python
 shared_directory="myclaim:/opt/shared"
 ```
@@ -126,12 +134,14 @@ op.execute()
 More than one directory/volume can be shared by passing a list/tuple:
 
 ```Python
-shared_directory=["myclaim:/opt/shared", "myclaim2:/opt/shared2"]
+shared_directory=["myclaim:/opt/shared:ro", "myclaim2:/opt/shared2"]
 my_task = tasks.CustomTask('print-file', 'myapp-mytask')
 op = operations.SingleTaskOperation('my-op-', my_task, shared_directory=shared_directory)
 op.execute()
 ```
 
+
+
 ## Pod execution context / affinity
 
 Affinity is set through the
@@ -144,10 +154,24 @@ op = operations.ParallelOperation('test-parallel-op-', (tasks.PythonTask('p1', f),
                                   pod_context=operations.PodExecutionContext(key='a', value='b', required=True))
 ```
 
+
+
 The execution context allows grouping pods on the same node (see [here](https://kubernetes.io/docs/concepts/scheduling-eviction/assign-pod-node/#affinity-and-anti-affinity)).
 This is important in particular when pods are sharing node resources like ReadWriteOnce volumes within a parallel execution or with other deployments in the cluster.
 The execution context sets the affinity and the metadata attributes so that all pods with the same context
-run in the same node
+run in the same node.
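+
+When a `shared_directory` is given with a claim name, the operation also adds a required
+`usesvolume` execution context for each claim, so that all pods mounting the same volume
+are scheduled together. As an illustrative sketch (reusing `my_task` and the `myclaim`
+claim from the examples above):
+
+```Python
+op = operations.SingleTaskOperation('my-op-', my_task, shared_directory="myclaim:/opt/shared")
+wf = op.to_workflow()
+# every pod template is labelled with the claim it uses...
+assert wf['spec']['templates'][0]['metadata']['labels']['usesvolume'] == 'myclaim'
+# ...and a required pod affinity on that label keeps the pods on the same node
+assert 'requiredDuringSchedulingIgnoredDuringExecution' in wf['spec']['affinity']['podAffinity']
+```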
+
+
+It is also possible to specify a tuple or list for multiple affinities:
+
+```Python
+op = operations.ParallelOperation('test-parallel-op-', (tasks.PythonTask('p1', f), tasks.PythonTask('p2', f)),
+                                  pod_context=(
+                                      operations.PodExecutionContext('a', 'b'),
+                                      operations.PodExecutionContext('c', 'd', required=True),
+                                      operations.PodExecutionContext('e', 'f')
+                                  ))
+```
 
 ## TTL (Time-To-Live) strategy
 
diff --git a/libraries/cloudharness-common/cloudharness/workflows/operations.py b/libraries/cloudharness-common/cloudharness/workflows/operations.py
index 23e76816..0dcb2632 100644
--- a/libraries/cloudharness-common/cloudharness/workflows/operations.py
+++ b/libraries/cloudharness-common/cloudharness/workflows/operations.py
@@ -1,8 +1,9 @@
 import time
 import pyaml
+from typing import Union
 
 from collections.abc import Iterable
-
+from kubernetes.client.models.v1_affinity import V1Affinity
 from cloudharness_cli.workflows.models.operation_status import OperationStatus
 
 from cloudharness.events.client import EventClient
@@ -60,14 +61,20 @@ class ContainerizedOperation(ManagedOperation):
     Abstract Containarized operation based on an argo workflow
     """
 
-    def __init__(self, basename: str, pod_context: PodExecutionContext = None, shared_directory=None, *args, **kwargs):
+    def __init__(self, basename: str, pod_context: Union[PodExecutionContext, list, tuple] = None, shared_directory=None, *args, **kwargs):
         """
         :param basename:
         :param pod_context: PodExecutionContext - represents affinity with other pods in the system
         :param shared_directory: bool|str|list
         """
         super(ContainerizedOperation, self).__init__(basename, *args, **kwargs)
-        self.pod_context = pod_context
+        if isinstance(pod_context, PodExecutionContext):
+            self.pod_contexts = [pod_context]
+        elif pod_context is None:
+            self.pod_contexts = []
+        else:
+            self.pod_contexts = list(pod_context)
+
         self.persisted = None
         shared_path = None
         if shared_directory:
@@ -87,17 +94,19 @@ def __init__(self, basename: str, pod_context: PodExecutionContext = None, share
                 task.add_env('shared_directory', shared_path)
         else:
             self.volumes = tuple()
+
+        self.pod_contexts += [PodExecutionContext('usesvolume', v.split(':')[0], True) for v in self.volumes if ':' in v]
 
     def task_list(self):
         raise NotImplementedError()
 
     @property
     def entrypoint(self):
-        raise NotImplemented
+        raise NotImplementedError()
 
     @property
     def templates(self):
-        raise NotImplemented
+        raise NotImplementedError()
 
     def to_workflow(self, **arguments):
         return {
@@ -125,7 +134,7 @@ def spec(self):
         if self.on_exit_notify:
             spec = self.add_on_exit_notify_handler(spec)
 
-        if self.pod_context:
+        if self.pod_contexts:
             spec['affinity'] = self.affinity_spec()
         if self.volumes:
             spec['volumeClaimTemplates'] = [self.spec_volumeclaim(volume) for volume in self.volumes if
@@ -134,40 +143,46 @@ def spec(self):
                                ':' in volume]  # with PVC prefix (e.g. pvc-001:/location)
         return spec
 
+
+
     def affinity_spec(self):
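+        # Builds a single podAffinity spec with one term per execution context:
+        # non-required contexts are appended as weighted "preferred" terms, required
+        # contexts as "required" terms, each matching the context's key/value label
+        # on the kubernetes.io/hostname topology.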
-        term = {
-            'labelSelector':
-                {
-                    'matchExpressions': [
-                        {
-                            'key': self.pod_context.key,
-                            'operator': 'In',
-                            'values': [self.pod_context.value]
-                        },
-                    ]
-                },
-            'topologyKey': 'kubernetes.io/hostname'
-        }
-        if not self.pod_context.required:
-            return {
-                'podAffinity':
-                    {
-                        'preferredDuringSchedulingIgnoredDuringExecution': [
-                            {
-                                'weight': 100,
-                                'podAffinityTerm': term
-
-                            }]
-                    }
-            }
-        else:
-            return {
-                'podAffinity':
-                    {
-                        'requiredDuringSchedulingIgnoredDuringExecution': [term]
-                    }
-            }
+        contexts = self.pod_contexts
+        PREFERRED = 'preferredDuringSchedulingIgnoredDuringExecution'
+        REQUIRED = 'requiredDuringSchedulingIgnoredDuringExecution'
+
+        pod_affinity = {
+            PREFERRED: [],
+            REQUIRED: []
+        }
+
+        for context in contexts:
+            term = {
+                'labelSelector':
+                    {
+                        'matchExpressions': [
+                            {
+                                'key': context.key,
+                                'operator': 'In',
+                                'values': [context.value]
+                            },
+                        ]
+                    },
+                'topologyKey': 'kubernetes.io/hostname'
+            }
+            if not context.required:
+                pod_affinity[PREFERRED].append(
+                    {
+                        'weight': 100,
+                        'podAffinityTerm': term
+
+                    })
+            else:
+                pod_affinity[REQUIRED].append(term)
+
+        return {
+            'podAffinity': pod_affinity
+        }
 
     def add_on_exit_notify_handler(self, spec):
         queue = self.on_exit_notify['queue']
@@ -187,18 +202,17 @@ def add_on_exit_notify_handler(self, spec):
 
     def modify_template(self, template):
         """Hook to modify templates (e.g. add volumes)"""
-        if self.pod_context:
-            if 'metadata' not in template:
-                template['metadata'] = {}
-            if 'labels' not in template['metadata']:
-                template['metadata']['labels'] = {}
-            template['metadata']['labels'][self.pod_context.key] = self.pod_context.value
+
+        template["metadata"] = {"labels": {c.key: c.value for c in self.pod_contexts}}
+
         if self.volumes:
             if 'container' in template:
-                template['container']['volumeMounts'] += [self.volume_template(volume) for volume in self.volumes]
+                template['container']['volumeMounts'] += [
+                    self.volume_template(volume) for volume in self.volumes]
             elif 'script' in template:
-                template['script']['volumeMounts'] += [self.volume_template(volume) for volume in self.volumes]
-
+                template['script']['volumeMounts'] += [
+                    self.volume_template(volume) for volume in self.volumes]
+
         return template
 
     def submit(self):
@@ -231,9 +245,14 @@ def name_from_path(self, path):
 
     def volume_template(self, volume):
         path = volume
-        if ":" in path:
-            path = volume.split(':')[-1]
-        return dict({'name': self.name_from_path(path), 'mountPath': path})
+        splitted = volume.split(':')
+        if len(splitted) > 1:
+            path = splitted[1]
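+        # volume is "/path", "claim:/path" or "claim:/path:ro"; a trailing "ro" marks the mount read-only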
+        return dict({
+            'name': self.name_from_path(path),
+            'mountPath': path,
+            'readOnly': False if len(splitted) < 3 else splitted[2] == "ro"
+        })
 
     def spec_volumeclaim(self, volume):
         # when the volume is NOT prefixed by a PVC (e.g. /location) then create a temporary PVC for the workflow
@@ -257,12 +276,13 @@ def spec_volumeclaim(self, volume):
 
     def spec_volume(self, volume):
         # when the volume is prefixed by a PVC (e.g. pvc-001:/location) then add the PVC to the volumes of the workflow
         if ':' in volume:
-            pvc, path = volume.split(':')
+            pvc, path, *_ = volume.split(':')
             return {
                 'name': self.name_from_path(path),
                 'persistentVolumeClaim': {
                     'claimName': pvc
-                }
+                },
+
             }
         return {}
@@ -378,7 +398,7 @@ def __init__(self, basename, tasks, shared_directory="", shared_volume_size=10,
         self.entrypoint_template = {'name': self.entrypoint, 'steps': self.steps_spec()}
 
     def steps_spec(self):
-        raise NotImplemented
+        raise NotImplementedError()
 
     def task_list(self):
         return self.tasks
diff --git a/libraries/cloudharness-common/tests/test_workflow.py b/libraries/cloudharness-common/tests/test_workflow.py
index 4d4d4f04..c9183a16 100644
--- a/libraries/cloudharness-common/tests/test_workflow.py
+++ b/libraries/cloudharness-common/tests/test_workflow.py
@@ -112,7 +112,7 @@ def test_single_task_shared():
 
 
 def test_single_task_shared_multiple():
-    shared_directory = ['myclaim:/mnt/shared', 'myclaim2:/mnt/shared2']
+    shared_directory = ['myclaim:/mnt/shared', 'myclaim2:/mnt/shared2:ro']
     task_write = operations.CustomTask('download-file', 'workflows-extract-download',
                                        url='https://raw.githubusercontent.com/openworm/org.geppetto/master/README.md')
     op = operations.SingleTaskOperation('test-custom-connected-op-', task_write,
@@ -124,6 +124,16 @@ def test_single_task_shared_multiple():
     assert len(wf['spec']['volumes']) == 3
     assert wf['spec']['volumes'][1]['persistentVolumeClaim']['claimName'] == 'myclaim'
     assert len(wf['spec']['templates'][0]['container']['volumeMounts']) == 3
+
+    assert wf['spec']['templates'][0]['container']['volumeMounts'][2]['readOnly'] == True
+
+    assert wf['spec']['templates'][0]['metadata']['labels']['usesvolume']
+
+    assert 'affinity' in wf['spec']
+    assert len(wf['spec']['affinity']['podAffinity']['requiredDuringSchedulingIgnoredDuringExecution']) == 2, "A pod affinity for each volume is expected"
+    affinity_expr = wf['spec']['affinity']['podAffinity']['requiredDuringSchedulingIgnoredDuringExecution'][0]['labelSelector']['matchExpressions'][0]
+    assert affinity_expr['key'] == 'usesvolume'
+    assert affinity_expr['values'][0] == 'myclaim'
 
     if execute:
         print(op.execute())
@@ -139,6 +149,7 @@ def test_single_task_shared_script():
     assert len(wf['spec']['volumes']) == 2
     assert wf['spec']['volumes'][1]['persistentVolumeClaim']['claimName'] == 'myclaim'
     assert len(wf['spec']['templates'][0]['script']['volumeMounts']) == 2
+
     if execute:
         print(op.execute())
 
@@ -230,3 +241,38 @@ def f():
 
     for task in workflow['spec']['templates']:
         assert task['metadata']['labels']['a'] == 'b'
+
+
+    op = operations.ParallelOperation('test-parallel-op-', (tasks.PythonTask('p1', f), tasks.PythonTask('p2', f)),
+                                      pod_context=(
+                                          operations.PodExecutionContext('a', 'b'),
+                                          operations.PodExecutionContext('c', 'd', required=True),
+                                          operations.PodExecutionContext('e', 'f')
+                                      ))
+    workflow = op.to_workflow()
+    assert 'affinity' in workflow['spec']
+    preferred = workflow['spec']['affinity']['podAffinity']['preferredDuringSchedulingIgnoredDuringExecution']
+    assert len(preferred) == 2
+    affinity_expr = preferred[0]['podAffinityTerm']['labelSelector']['matchExpressions'][0]
+
+    assert affinity_expr['key'] == 'a'
+    assert affinity_expr['values'][0] == 'b'
+
+    for task in workflow['spec']['templates']:
+        assert task['metadata']['labels']['a'] == 'b'
+
+    affinity_expr = preferred[1][
+        'podAffinityTerm']['labelSelector']['matchExpressions'][0]
+
+    assert affinity_expr['key'] == 'e'
+    assert affinity_expr['values'][0] == 'f'
+
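+    # every execution context is also applied as a label on each pod template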
+    for task in workflow['spec']['templates']:
+        assert task['metadata']['labels']['e'] == 'f'
+
+    affinity_expr = \
+        workflow['spec']['affinity']['podAffinity']['requiredDuringSchedulingIgnoredDuringExecution'][0][
+            'labelSelector'][
+            'matchExpressions'][0]
+    assert affinity_expr['key'] == 'c'
+    assert affinity_expr['values'][0] == 'd'
\ No newline at end of file