Workflows and volumes enhancements #507

Merged · 7 commits · Jul 12, 2022
30 changes: 27 additions & 3 deletions docs/applications/development/workflows-api.md
@@ -114,7 +114,15 @@ The shared volume must be indicated both in the Operation and it is propagated t
The `shared_directory` parameter is a quick way to specify a shared directory, and, optionally,
the name of a volume claim.

The syntax is `[CLAIM_NAME:]MOUNT_PATH`.
The syntax is `[CLAIM_NAME:]MOUNT_PATH[:MODE]`.
- The `CLAIM_NAME` can be the name of an existing or new volume claim. If a claim with that name already exists, it is used;
otherwise a new ephemeral volume is created, which exists for the life of the workflow and is deleted after completion.
- The `MOUNT_PATH` is the path where the volume is mounted inside the pod.
- The optional `:MODE` suffix indicates the read/write mode (see the example below). If `ro`, the
volume is mounted as read-only. Read-only volumes are useful to overcome
scheduling limitations (ReadWriteOnce is usually the only access mode available)
and are generally recommended whenever writing is not required.
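
For example, the claim name, mount path, and read-only mode can be combined in a single string; a minimal sketch reusing the claim and task names from the examples on this page:

```Python
# Mount the existing claim "myclaim" at /opt/shared in read-only mode
shared_directory = "myclaim:/opt/shared:ro"
my_task = tasks.CustomTask('print-file', 'myapp-mytask')
op = operations.SingleTaskOperation('my-op-', my_task, shared_directory=shared_directory)
op.execute()
```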

```Python
shared_directory="myclaim:/opt/shared"
@@ -126,12 +134,14 @@ op.execute()
More than one directory/volume can be shared by passing a list/tuple:

```Python
shared_directory=["myclaim:/opt/shared", "myclaim2:/opt/shared2"]
shared_directory=["myclaim:/opt/shared:ro", "myclaim2:/opt/shared2"]
my_task = tasks.CustomTask('print-file', 'myapp-mytask')
op = operations.SingleTaskOperation('my-op-', my_task, shared_directory=shared_directory)
op.execute()
```



## Pod execution context / affinity

Affinity is set through the
@@ -144,10 +154,24 @@ op = operations.ParallelOperation('test-parallel-op-', (tasks.PythonTask('p1', f
pod_context=operations.PodExecutionContext(key='a', value='b', required=True))
```



The execution context allows grouping pods on the same node (see [here](https://kubernetes.io/docs/concepts/scheduling-eviction/assign-pod-node/#affinity-and-anti-affinity)).
This is particularly important when pods share node resources, such as ReadWriteOnce volumes, within a parallel execution or with other deployments in the cluster.
The execution context sets the affinity and the metadata attributes so that all pods with the same context
run on the same node.
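
Under the hood, the context becomes a label on each pod template plus a Kubernetes pod-affinity term matching that label. A simplified sketch of what `affinity_spec` (as changed in this PR) produces for the example above, expressed as a Python dict:

```Python
# Each pod template also gets metadata.labels['a'] = 'b', so the term below matches.
# A context with required=False would instead go under the "preferred" rule,
# wrapped as {'weight': 100, 'podAffinityTerm': term}.
affinity = {
    'podAffinity': {
        'requiredDuringSchedulingIgnoredDuringExecution': [
            {
                'labelSelector': {
                    'matchExpressions': [
                        {'key': 'a', 'operator': 'In', 'values': ['b']}
                    ]
                },
                'topologyKey': 'kubernetes.io/hostname'
            }
        ],
        'preferredDuringSchedulingIgnoredDuringExecution': []
    }
}
```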


It is also possible to specify a tuple or list for multiple affinities, like:

```Python
op = operations.ParallelOperation('test-parallel-op-', (tasks.PythonTask('p1', f), tasks.PythonTask('p2', f)),
pod_context=(
operations.PodExecutionContext('a', 'b'),
operations.PodExecutionContext('c', 'd', required=True),
operations.PodExecutionContext('e', 'f')
))
```

## TTL (Time-To-Live) strategy

120 changes: 70 additions & 50 deletions libraries/cloudharness-common/cloudharness/workflows/operations.py
@@ -1,8 +1,9 @@
import time
import pyaml
from typing import Union

from collections.abc import Iterable

from kubernetes.client.models.v1_affinity import V1Affinity
from cloudharness_cli.workflows.models.operation_status import OperationStatus

from cloudharness.events.client import EventClient
@@ -60,14 +61,20 @@ class ContainerizedOperation(ManagedOperation):
Abstract Containerized operation based on an Argo workflow
"""

def __init__(self, basename: str, pod_context: PodExecutionContext = None, shared_directory=None, *args, **kwargs):
def __init__(self, basename: str, pod_context: Union[PodExecutionContext, list, tuple] = None, shared_directory=None, *args, **kwargs):
"""
:param basename:
:param pod_context: PodExecutionContext - represents affinity with other pods in the system
:param shared_directory: bool|str|list
"""
super(ContainerizedOperation, self).__init__(basename, *args, **kwargs)
self.pod_context = pod_context
if type(pod_context) == PodExecutionContext:
self.pod_contexts = [pod_context]
elif pod_context is None:
self.pod_contexts = []
else:
self.pod_contexts = list(pod_context)

self.persisted = None
shared_path = None
if shared_directory:
@@ -87,17 +94,19 @@ def __init__(self, basename: str, pod_context: PodExecutionContext = None, share
task.add_env('shared_directory', shared_path)
else:
self.volumes = tuple()

self.pod_contexts += [PodExecutionContext('usesvolume', v.split(':')[0], True) for v in self.volumes if ':' in v]

def task_list(self):
raise NotImplementedError()

@property
def entrypoint(self):
raise NotImplemented
raise NotImplementedError()

@property
def templates(self):
raise NotImplemented
raise NotImplementedError()

def to_workflow(self, **arguments):
return {
@@ -125,7 +134,7 @@ def spec(self):
if self.on_exit_notify:
spec = self.add_on_exit_notify_handler(spec)

if self.pod_context:
if self.pod_contexts:
spec['affinity'] = self.affinity_spec()
if self.volumes:
spec['volumeClaimTemplates'] = [self.spec_volumeclaim(volume) for volume in self.volumes if
@@ -134,40 +143,46 @@ def spec(self):
':' in volume] # with PVC prefix (e.g. pvc-001:/location)
return spec



def affinity_spec(self):
    term = {
        'labelSelector':
            {
                'matchExpressions': [
                    {
                        'key': self.pod_context.key,
                        'operator': 'In',
                        'values': [self.pod_context.value]
                    },
                ]
            },
        'topologyKey': 'kubernetes.io/hostname'
    }
    if not self.pod_context.required:
        return {
            'podAffinity':
                {
                    'preferredDuringSchedulingIgnoredDuringExecution': [
                        {
                            'weight': 100,
                            'podAffinityTerm': term
                        }]
                }
        }
    else:
        return {
            'podAffinity':
                {
                    'requiredDuringSchedulingIgnoredDuringExecution': [term]
                }
        }

def affinity_spec(self):
    contexts = self.pod_contexts
    PREFERRED = 'preferredDuringSchedulingIgnoredDuringExecution'
    REQUIRED = 'requiredDuringSchedulingIgnoredDuringExecution'

    pod_affinity = {
        PREFERRED: [],
        REQUIRED: []
    }

    for context in contexts:
        term = {
            'labelSelector':
                {
                    'matchExpressions': [
                        {
                            'key': context.key,
                            'operator': 'In',
                            'values': [context.value]
                        },
                    ]
                },
            'topologyKey': 'kubernetes.io/hostname'
        }
        if not context.required:
            pod_affinity[PREFERRED].append(
                {
                    'weight': 100,
                    'podAffinityTerm': term
                })
        else:
            pod_affinity[REQUIRED].append(term)

    return {
        'podAffinity': pod_affinity
    }

def add_on_exit_notify_handler(self, spec):
queue = self.on_exit_notify['queue']
Expand All @@ -187,18 +202,17 @@ def add_on_exit_notify_handler(self, spec):

def modify_template(self, template):
"""Hook to modify templates (e.g. add volumes)"""
if self.pod_context:
if 'metadata' not in template:
template['metadata'] = {}
if 'labels' not in template['metadata']:
template['metadata']['labels'] = {}
template['metadata']['labels'][self.pod_context.key] = self.pod_context.value

template["metadata"] = {"labels": {c.key:c.value for c in self.pod_contexts}}

if self.volumes:
if 'container' in template:
template['container']['volumeMounts'] += [self.volume_template(volume) for volume in self.volumes]
template['container']['volumeMounts'] += [
self.volume_template(volume) for volume in self.volumes]
elif 'script' in template:
template['script']['volumeMounts'] += [self.volume_template(volume) for volume in self.volumes]

template['script']['volumeMounts'] += [
self.volume_template(volume) for volume in self.volumes]

return template

def submit(self):
@@ -231,9 +245,14 @@ def name_from_path(self, path):

def volume_template(self, volume):
path = volume
if ":" in path:
path = volume.split(':')[-1]
return dict({'name': self.name_from_path(path), 'mountPath': path})
splitted = volume.split(':')
if len(splitted) > 1:
path = splitted[1]
return dict({
'name': self.name_from_path(path),
'mountPath': path,
'readonly': False if len(splitted) < 3 else splitted[2] == "ro"
})

def spec_volumeclaim(self, volume):
# when the volume is NOT prefixed by a PVC (e.g. /location) then create a temporary PVC for the workflow
@@ -257,12 +276,13 @@ def spec_volumeclaim(self, volume):
def spec_volume(self, volume):
# when the volume is prefixed by a PVC (e.g. pvc-001:/location) then add the PVC to the volumes of the workflow
if ':' in volume:
pvc, path = volume.split(':')
pvc, path, *c = volume.split(':')
return {
'name': self.name_from_path(path),
'persistentVolumeClaim': {
'claimName': pvc
}
},

}
return {}

@@ -378,7 +398,7 @@ def __init__(self, basename, tasks, shared_directory="", shared_volume_size=10,
self.entrypoint_template = {'name': self.entrypoint, 'steps': self.steps_spec()}

def steps_spec(self):
raise NotImplemented
raise NotImplementedError()

def task_list(self):
return self.tasks
48 changes: 47 additions & 1 deletion libraries/cloudharness-common/tests/test_workflow.py
@@ -112,7 +112,7 @@ def test_single_task_shared():


def test_single_task_shared_multiple():
shared_directory = ['myclaim:/mnt/shared', 'myclaim2:/mnt/shared2']
shared_directory = ['myclaim:/mnt/shared', 'myclaim2:/mnt/shared2:ro']
task_write = operations.CustomTask('download-file', 'workflows-extract-download',
url='https://raw.githubusercontent.com/openworm/org.geppetto/master/README.md')
op = operations.SingleTaskOperation('test-custom-connected-op-', task_write,
@@ -124,6 +124,16 @@ def test_single_task_shared_multiple():
assert len(wf['spec']['volumes']) == 3
assert wf['spec']['volumes'][1]['persistentVolumeClaim']['claimName'] == 'myclaim'
assert len(wf['spec']['templates'][0]['container']['volumeMounts']) == 3

assert wf['spec']['templates'][0]['container']['volumeMounts'][2]['readonly'] == True

assert wf['spec']['templates'][0]['metadata']['labels']['usesvolume']

assert 'affinity' in wf['spec']
assert len(wf['spec']['affinity']['podAffinity']['requiredDuringSchedulingIgnoredDuringExecution']) == 2, "A pod affinity for each volume is expected"
affinity_expr = wf['spec']['affinity']['podAffinity']['requiredDuringSchedulingIgnoredDuringExecution'][0]['labelSelector']['matchExpressions'][0]
assert affinity_expr['key'] == 'usesvolume'
assert affinity_expr['values'][0] == 'myclaim'
if execute:
print(op.execute())

Expand All @@ -139,6 +149,7 @@ def test_single_task_shared_script():
assert len(wf['spec']['volumes']) == 2
assert wf['spec']['volumes'][1]['persistentVolumeClaim']['claimName'] == 'myclaim'
assert len(wf['spec']['templates'][0]['script']['volumeMounts']) == 2

if execute:
print(op.execute())

@@ -230,3 +241,38 @@ def f():

for task in workflow['spec']['templates']:
assert task['metadata']['labels']['a'] == 'b'


op = operations.ParallelOperation('test-parallel-op-', (tasks.PythonTask('p1', f), tasks.PythonTask('p2', f)),
pod_context=(
operations.PodExecutionContext('a', 'b'),
operations.PodExecutionContext('c', 'd', required=True),
operations.PodExecutionContext('e', 'f')
))
workflow = op.to_workflow()
assert 'affinity' in workflow['spec']
preferred = workflow['spec']['affinity']['podAffinity']['preferredDuringSchedulingIgnoredDuringExecution']
assert len(preferred) == 2
affinity_expr = preferred[0]['podAffinityTerm']['labelSelector']['matchExpressions'][0]

assert affinity_expr['key'] == 'a'
assert affinity_expr['values'][0] == 'b'

for task in workflow['spec']['templates']:
assert task['metadata']['labels']['a'] == 'b'

affinity_expr = preferred[1][
'podAffinityTerm']['labelSelector']['matchExpressions'][0]

assert affinity_expr['key'] == 'e'
assert affinity_expr['values'][0] == 'f'

for task in workflow['spec']['templates']:
assert task['metadata']['labels']['e'] == 'f'

affinity_expr = \
workflow['spec']['affinity']['podAffinity']['requiredDuringSchedulingIgnoredDuringExecution'][0][
'labelSelector'][
'matchExpressions'][0]
assert affinity_expr['key'] == 'c'
assert affinity_expr['values'][0] == 'd'