Merge pull request #507 from MetaCell/feature/506
Workflows and volumes enhancements
zsinnema authored Jul 12, 2022
2 parents 3d499a5 + 3ce0e35 commit e3e84c3
Showing 3 changed files with 144 additions and 54 deletions.
30 changes: 27 additions & 3 deletions docs/applications/development/workflows-api.md
@@ -114,7 +114,15 @@ The shared volume must be indicated both in the Operation and it is propagated t
The `shared_directory` parameter is a quick way to specify a shared directory, and, optionally,
the name of a volume claim.

The syntax is `[CLAIM_NAME:]MOUNT_PATH`.
The syntax is `[CLAIM_NAME:]MOUNT_PATH[:MODE]`.
- The `CLAIM_NAME` can be the name of an existing or new volume claim. If a claim with that name already exists it is used;
otherwise a new ephemeral volume is created, which lives for the duration of the workflow and is deleted after completion.
- The `MOUNT_PATH` is the path where the volume is mounted inside the pod.
- The optional `:MODE` suffix indicates the read/write mode. If `ro`, the
volume is mounted read-only. Read-only volumes help overcome
scheduling limitations (often only ReadWriteOnce access is available) and
are generally recommended whenever writing is not required (see the sketch
below).
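
As a sketch, the three syntax forms look like this (the claim name and paths are placeholders):

```Python
# Ephemeral volume: created for this workflow, deleted after completion
shared_directory = "/opt/shared"

# Named claim (reused if it exists, created otherwise), mounted read-write
shared_directory = "myclaim:/opt/shared"

# Named claim mounted read-only, easing scheduling on ReadWriteOnce volumes
shared_directory = "myclaim:/opt/shared:ro"
```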

```Python
shared_directory="myclaim:/opt/shared"
@@ -126,12 +134,14 @@ op.execute()
More than one directory/volume can be shared by passing a list/tuple:

```Python
shared_directory=["myclaim:/opt/shared", "myclaim2:/opt/shared2"]
shared_directory=["myclaim:/opt/shared:ro", "myclaim2:/opt/shared2"]
my_task = tasks.CustomTask('print-file', 'myapp-mytask')
op = operations.SingleTaskOperation('my-op-', my_task, shared_directory=shared_directory)
op.execute()
```
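
Sharing a named claim also influences pod placement: the operation labels every pod that mounts the claim and adds a required affinity, so those pods are co-scheduled on the same node (see the next section). A minimal sketch, with hypothetical task and claim names:

```Python
t1 = tasks.CustomTask('consumer-a', 'myapp-mytask')
t2 = tasks.CustomTask('consumer-b', 'myapp-mytask')
# Both pods mount myclaim read-only and get the label usesvolume=myclaim,
# so they land on the same node even for a ReadWriteOnce volume.
op = operations.ParallelOperation('my-parallel-op-', (t1, t2),
                                  shared_directory="myclaim:/opt/shared:ro")
op.execute()
```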



## Pod execution context / affinity

Affinity is set through the
@@ -144,10 +154,24 @@ op = operations.ParallelOperation('test-parallel-op-', (tasks.PythonTask('p1', f
pod_context=operations.PodExecutionContext(key='a', value='b', required=True))
```



The execution context allows grouping pods on the same node (see [here](https://kubernetes.io/docs/concepts/scheduling-eviction/assign-pod-node/#affinity-and-anti-affinity)).
This is particularly important when pods share node-bound resources such as ReadWriteOnce volumes, either within a parallel execution or with other deployments in the cluster.
The execution context sets the affinity and the metadata attributes so that all pods with the same context
run in the same node
run in the same node.


It is also possible to specify a tuple or list for multiple affinities:

```Python
op = operations.ParallelOperation('test-parallel-op-', (tasks.PythonTask('p1', f), tasks.PythonTask('p2', f)),
pod_context=(
operations.PodExecutionContext('a', 'b'),
operations.PodExecutionContext('c', 'd', required=True),
operations.PodExecutionContext('e', 'f')
))
```
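
In the example above, sharing a node with pods labelled `c=d` is required because that context sets `required=True`, while co-location with pods labelled `a=b` or `e=f` is only preferred (best-effort, weight 100).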

## TTL (Time-To-Live) strategy

120 changes: 70 additions & 50 deletions libraries/cloudharness-common/cloudharness/workflows/operations.py
@@ -1,8 +1,9 @@
import time
import pyaml
from typing import Union

from collections.abc import Iterable

from kubernetes.client.models.v1_affinity import V1Affinity
from cloudharness_cli.workflows.models.operation_status import OperationStatus

from cloudharness.events.client import EventClient
@@ -60,14 +61,20 @@ class ContainerizedOperation(ManagedOperation):
Abstract containerized operation based on an Argo workflow
"""

def __init__(self, basename: str, pod_context: PodExecutionContext = None, shared_directory=None, *args, **kwargs):
def __init__(self, basename: str, pod_context: Union[PodExecutionContext, list, tuple] = None, shared_directory=None, *args, **kwargs):
"""
:param basename:
:param pod_context: PodExecutionContext or a list/tuple of them - represents affinity with other pods in the system
:param shared_directory: bool|str|list
"""
super(ContainerizedOperation, self).__init__(basename, *args, **kwargs)
self.pod_context = pod_context
if isinstance(pod_context, PodExecutionContext):
self.pod_contexts = [pod_context]
elif pod_context is None:
self.pod_contexts = []
else:
self.pod_contexts = list(pod_context)

self.persisted = None
shared_path = None
if shared_directory:
@@ -87,17 +94,19 @@ def __init__(self, basename: str, pod_context: PodExecutionContext = None, share
task.add_env('shared_directory', shared_path)
else:
self.volumes = tuple()

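# Each volume mounted from a named claim adds a required 'usesvolume'
# affinity context, so all pods sharing that claim are co-scheduled on
# the same node (e.g. for ReadWriteOnce volumes).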
self.pod_contexts += [PodExecutionContext('usesvolume', v.split(':')[0], True) for v in self.volumes if ':' in v]

def task_list(self):
raise NotImplementedError()

@property
def entrypoint(self):
raise NotImplemented
raise NotImplementedError()

@property
def templates(self):
raise NotImplemented
raise NotImplementedError()

def to_workflow(self, **arguments):
return {
@@ -125,7 +134,7 @@ def spec(self):
if self.on_exit_notify:
spec = self.add_on_exit_notify_handler(spec)

if self.pod_context:
if self.pod_contexts:
spec['affinity'] = self.affinity_spec()
if self.volumes:
spec['volumeClaimTemplates'] = [self.spec_volumeclaim(volume) for volume in self.volumes if
@@ -134,40 +143,46 @@ def spec(self):
':' in volume] # with PVC prefix (e.g. pvc-001:/location)
return spec



def affinity_spec(self):
-    term = {
-        'labelSelector':
-            {
-                'matchExpressions': [
-                    {
-                        'key': self.pod_context.key,
-                        'operator': 'In',
-                        'values': [self.pod_context.value]
-                    },
-                ]
-            },
-        'topologyKey': 'kubernetes.io/hostname'
-    }
-    if not self.pod_context.required:
-        return {
-            'podAffinity':
-                {
-                    'preferredDuringSchedulingIgnoredDuringExecution': [
-                        {
-                            'weight': 100,
-                            'podAffinityTerm': term
-                        }]
-                }
-        }
-    else:
-        return {
-            'podAffinity':
-                {
-                    'requiredDuringSchedulingIgnoredDuringExecution': [term]
-                }
-        }
+    contexts = self.pod_contexts
+    PREFERRED = 'preferredDuringSchedulingIgnoredDuringExecution'
+    REQUIRED = 'requiredDuringSchedulingIgnoredDuringExecution'
+
+    pod_affinity = {
+        PREFERRED: [],
+        REQUIRED: []
+    }
+
+    for context in contexts:
+        term = {
+            'labelSelector':
+                {
+                    'matchExpressions': [
+                        {
+                            'key': context.key,
+                            'operator': 'In',
+                            'values': [context.value]
+                        },
+                    ]
+                },
+            'topologyKey': 'kubernetes.io/hostname'
+        }
+        if not context.required:
+            pod_affinity[PREFERRED].append(
+                {
+                    'weight': 100,
+                    'podAffinityTerm': term
+                })
+        else:
+            pod_affinity[REQUIRED].append(term)
+
+    return {
+        'podAffinity': pod_affinity
+    }
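# For example, with pod_contexts [PodExecutionContext('a', 'b'),
# PodExecutionContext('c', 'd', required=True)] this returns the a=b term
# in the preferred list (weight 100) and the c=d term in the required list.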

def add_on_exit_notify_handler(self, spec):
queue = self.on_exit_notify['queue']
@@ -187,18 +202,17 @@ def add_on_exit_notify_handler(self, spec):

def modify_template(self, template):
"""Hook to modify templates (e.g. add volumes)"""
if self.pod_context:
if 'metadata' not in template:
template['metadata'] = {}
if 'labels' not in template['metadata']:
template['metadata']['labels'] = {}
template['metadata']['labels'][self.pod_context.key] = self.pod_context.value

template["metadata"] = {"labels": {c.key:c.value for c in self.pod_contexts}}

if self.volumes:
if 'container' in template:
template['container']['volumeMounts'] += [self.volume_template(volume) for volume in self.volumes]
template['container']['volumeMounts'] += [
self.volume_template(volume) for volume in self.volumes]
elif 'script' in template:
template['script']['volumeMounts'] += [self.volume_template(volume) for volume in self.volumes]

template['script']['volumeMounts'] += [
self.volume_template(volume) for volume in self.volumes]

return template

def submit(self):
@@ -231,9 +245,14 @@ def name_from_path(self, path):

def volume_template(self, volume):
path = volume
if ":" in path:
path = volume.split(':')[-1]
return dict({'name': self.name_from_path(path), 'mountPath': path})
splitted = volume.split(':')
if len(splitted) > 1:
path = splitted[1]
return dict({
'name': self.name_from_path(path),
'mountPath': path,
'readonly': False if len(splitted) < 3 else splitted[2] == "ro"
})
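# For example, volume_template('myclaim:/opt/shared:ro') returns
# {'name': <derived from the path>, 'mountPath': '/opt/shared', 'readonly': True}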

def spec_volumeclaim(self, volume):
# when the volume is NOT prefixed by a PVC (e.g. /location) then create a temporary PVC for the workflow
Expand All @@ -257,12 +276,13 @@ def spec_volumeclaim(self, volume):
def spec_volume(self, volume):
# when the volume is prefixed by a PVC (e.g. pvc-001:/location) then add the PVC to the volumes of the workflow
if ':' in volume:
pvc, path = volume.split(':')
pvc, path, *c = volume.split(':')
return {
'name': self.name_from_path(path),
'persistentVolumeClaim': {
'claimName': pvc
}
},

}
return {}
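# For example, spec_volume('myclaim:/opt/shared') returns a workflow volume
# {'name': <derived from the path>, 'persistentVolumeClaim': {'claimName': 'myclaim'}}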

@@ -378,7 +398,7 @@ def __init__(self, basename, tasks, shared_directory="", shared_volume_size=10,
self.entrypoint_template = {'name': self.entrypoint, 'steps': self.steps_spec()}

def steps_spec(self):
raise NotImplemented
raise NotImplementedError()

def task_list(self):
return self.tasks
48 changes: 47 additions & 1 deletion libraries/cloudharness-common/tests/test_workflow.py
@@ -112,7 +112,7 @@ def test_single_task_shared():


def test_single_task_shared_multiple():
shared_directory = ['myclaim:/mnt/shared', 'myclaim2:/mnt/shared2']
shared_directory = ['myclaim:/mnt/shared', 'myclaim2:/mnt/shared2:ro']
task_write = operations.CustomTask('download-file', 'workflows-extract-download',
url='https://raw.githubusercontent.com/openworm/org.geppetto/master/README.md')
op = operations.SingleTaskOperation('test-custom-connected-op-', task_write,
@@ -124,6 +124,16 @@ def test_single_task_shared_multiple():
assert len(wf['spec']['volumes']) == 3
assert wf['spec']['volumes'][1]['persistentVolumeClaim']['claimName'] == 'myclaim'
assert len(wf['spec']['templates'][0]['container']['volumeMounts']) == 3

assert wf['spec']['templates'][0]['container']['volumeMounts'][2]['readonly'] == True

assert wf['spec']['templates'][0]['metadata']['labels']['usesvolume']

assert 'affinity' in wf['spec']
assert len(wf['spec']['affinity']['podAffinity']['requiredDuringSchedulingIgnoredDuringExecution']) == 2, "A pod affinity for each volume is expected"
affinity_expr = wf['spec']['affinity']['podAffinity']['requiredDuringSchedulingIgnoredDuringExecution'][0]['labelSelector']['matchExpressions'][0]
assert affinity_expr['key'] == 'usesvolume'
assert affinity_expr['values'][0] == 'myclaim'
if execute:
print(op.execute())

@@ -139,6 +149,7 @@ def test_single_task_shared_script():
assert len(wf['spec']['volumes']) == 2
assert wf['spec']['volumes'][1]['persistentVolumeClaim']['claimName'] == 'myclaim'
assert len(wf['spec']['templates'][0]['script']['volumeMounts']) == 2

if execute:
print(op.execute())

@@ -230,3 +241,38 @@ def f():

for task in workflow['spec']['templates']:
assert task['metadata']['labels']['a'] == 'b'


op = operations.ParallelOperation('test-parallel-op-', (tasks.PythonTask('p1', f), tasks.PythonTask('p2', f)),
pod_context=(
operations.PodExecutionContext('a', 'b'),
operations.PodExecutionContext('c', 'd', required=True),
operations.PodExecutionContext('e', 'f')
))
workflow = op.to_workflow()
assert 'affinity' in workflow['spec']
preferred = workflow['spec']['affinity']['podAffinity']['preferredDuringSchedulingIgnoredDuringExecution']
assert len(preferred) == 2
affinity_expr = preferred[0]['podAffinityTerm']['labelSelector']['matchExpressions'][0]

assert affinity_expr['key'] == 'a'
assert affinity_expr['values'][0] == 'b'

for task in workflow['spec']['templates']:
assert task['metadata']['labels']['a'] == 'b'

affinity_expr = preferred[1][
'podAffinityTerm']['labelSelector']['matchExpressions'][0]

assert affinity_expr['key'] == 'e'
assert affinity_expr['values'][0] == 'f'

for task in workflow['spec']['templates']:
assert task['metadata']['labels']['e'] == 'f'

affinity_expr = \
workflow['spec']['affinity']['podAffinity']['requiredDuringSchedulingIgnoredDuringExecution'][0][
'labelSelector'][
'matchExpressions'][0]
assert affinity_expr['key'] == 'c'
assert affinity_expr['values'][0] == 'd'
