Feature/584 #585

Merged
12 commits merged on Oct 11, 2022
@@ -1,6 +1,7 @@
import connexion
import six

from cloudharness.auth import get_api_password
from samples.models.inline_response202 import InlineResponse202 # noqa: E501
from samples import util
from samples.models import InlineResponse202
@@ -12,7 +13,8 @@
try:
from cloudharness.workflows import operations, tasks
except Exception as e:
log.error("Cannot start workflows module. Probably this is related some problem with the kubectl configuration", exc_info=True)
log.error("Cannot start workflows module. Probably this is related some problem with the kubectl configuration",
exc_info=True)


def submit_async(): # noqa: E501
@@ -33,7 +35,8 @@ def submit_async(): # noqa: E501

submitted = op.execute()
if not op.is_error():
return InlineResponse202(task=InlineResponse202Task(href=op.get_operation_update_url(), name=submitted.name)), 202
return InlineResponse202(
task=InlineResponse202Task(href=op.get_operation_update_url(), name=submitted.name)), 202
else:
return 'Error submitting operation', 500
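A hypothetical client-side flow for the async endpoint above: the route path and the status field name are assumptions, while the "task"/"href" fields mirror InlineResponse202 and InlineResponse202Task.

import time
import requests

resp = requests.get("https://samples.example.com/api/operation_async")  # route path assumed
assert resp.status_code == 202
href = resp.json()["task"]["href"]  # URL produced by op.get_operation_update_url()
while requests.get(href).json().get("status") == "Running":  # status field name assumed
    time.sleep(1)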

@@ -47,7 +50,8 @@ def submit_sync(): # noqa: E501
:rtype: str
"""
task = tasks.CustomTask(
'download-file', 'workflows-extract-download', url='https://github.com/MetaCell/cloud-harness/blob/master/README.md')
'download-file', 'workflows-extract-download',
url='https://github.com/MetaCell/cloud-harness/blob/master/README.md')

op = operations.DistributedSyncOperation('test-sync-op-', task)
try:
@@ -60,6 +64,22 @@ def submit_sync(): # noqa: E501
return 'Error submitting operation: %s' % e, 500


def submit_secret_sync_with_results(): # noqa: E501
"""Send a synchronous operation and get results using the event queue. Checks access to get_api_password

:rtype: str
"""
task = tasks.CustomTask('test-secret', 'samples-secret')
try:
op = operations.DistributedSyncOperationWithResults(
'test-sync-secret-op-results-', task)
result = op.execute()
assert result[0]["result"] != ''
return result[0]["result"]
except Exception as e:
return jsonify(str(e)), 500


def submit_sync_with_results(a=1, b=2): # noqa: E501
"""Send a synchronous operation and get results using the event queue. Just a sum, but in the cloud

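The new submit_secret_sync_with_results controller is the end-to-end check for this PR: it runs the samples-secret task (added below) and expects a non-empty result back on the event queue. A minimal standalone sketch of the same flow, assuming only the API shown in this diff:

from cloudharness.workflows import operations, tasks

# 'samples-secret' is the task image built from applications/samples/tasks/secret below.
task = tasks.CustomTask('test-secret', 'samples-secret')
op = operations.DistributedSyncOperationWithResults('test-sync-secret-op-results-', task)

# execute() runs the Argo workflow and blocks until the result is published on the
# event queue (the files in the shared directory, per SendResultTask further down).
result = op.execute()
assert result[0]["result"] != ''  # the task wrote get_api_password() into 'result'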
6 changes: 6 additions & 0 deletions applications/samples/tasks/secret/Dockerfile
@@ -0,0 +1,6 @@
ARG CLOUDHARNESS_BASE
FROM $CLOUDHARNESS_BASE

ADD . /

CMD python main.py
13 changes: 13 additions & 0 deletions applications/samples/tasks/secret/main.py
@@ -0,0 +1,13 @@
import os

from cloudharness.auth import get_api_password
from cloudharness.workflows.utils import get_shared_directory

for env in os.environ:
print(f"{env}:{os.environ[env]}")

file_name = os.path.join(get_shared_directory(), "result")
print("File name is", file_name)

with open(file_name, "w") as f:
f.write(get_api_password())
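main.py dumps its environment and writes the API password into the shared directory, proving the task container can read the mounted secret. A sketch of what get_api_password plausibly does inside the container; the key/file name is an assumption, and the real implementation lives in cloudharness.auth:

import os

ACCOUNTS_MOUNT_PATH = "/opt/cloudharness/resources/auth"  # mountPath added in tasks.py below

def get_api_password_sketch(key="api_user_password"):  # key/file name is a guess
    # Kubernetes materializes each key of the 'accounts' secret as a file under the mount.
    with open(os.path.join(ACCOUNTS_MOUNT_PATH, key)) as f:
        return f.read().strip()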
103 changes: 54 additions & 49 deletions libraries/cloudharness-common/cloudharness/workflows/operations.py
@@ -1,17 +1,16 @@
import time
import pyaml
from collections.abc import Iterable
from typing import Union

from collections.abc import Iterable
from kubernetes.client.models.v1_affinity import V1Affinity
from cloudharness_cli.workflows.models.operation_status import OperationStatus
import pyaml

from cloudharness import log
from cloudharness.events.client import EventClient
from cloudharness.utils import env, config
from cloudharness import log

from cloudharness_cli.workflows.models.operation_status import OperationStatus
from . import argo
from .tasks import Task, SendResultTask, CustomTask
from .utils import is_accounts_present

POLLING_WAIT_SECONDS = 1
SERVICE_ACCOUNT = 'argo-workflows'
@@ -39,15 +38,15 @@ class ManagedOperation:
"""

def __init__(self,
name,
ttl_strategy: dict = {
'secondsAfterCompletion': 60 * 60,
'secondsAfterSuccess': 60 * 20,
'secondsAfterFailure': 60 * 120
},
*args,
on_exit_notify=None,
**kwargs):
name,
ttl_strategy: dict = {
'secondsAfterCompletion': 60 * 60,
'secondsAfterSuccess': 60 * 20,
'secondsAfterFailure': 60 * 120
},
*args,
on_exit_notify=None,
**kwargs):
self.name = name
self.ttl_strategy = ttl_strategy
self.on_exit_notify = on_exit_notify
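The ttl_strategy default above garbage-collects finished workflows after a grace period. Since operation constructors forward their extra kwargs up to ManagedOperation, the window can be tuned per operation; a minimal sketch, reusing the sample task from the controllers above:

from cloudharness.workflows import operations, tasks

task = tasks.CustomTask(
    'download-file', 'workflows-extract-download',
    url='https://github.com/MetaCell/cloud-harness/blob/master/README.md')
op = operations.DistributedSyncOperation(
    'test-sync-op-', task,
    ttl_strategy={
        'secondsAfterCompletion': 5 * 60,  # clean up five minutes after completion
        'secondsAfterSuccess': 60,
        'secondsAfterFailure': 60 * 60,
    })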
@@ -56,12 +55,14 @@ def execute(self, **parameters):
raise NotImplementedError(f"{self.__class__.__name__} is abstract")



class ContainerizedOperation(ManagedOperation):
"""
Abstract Containarized operation based on an argo workflow
"""

def __init__(self, basename: str, pod_context: Union[PodExecutionContext, list, tuple] = None, shared_directory=None, *args, **kwargs):
def __init__(self, basename: str, pod_context: Union[PodExecutionContext, list, tuple] = None,
shared_directory=None, *args, **kwargs):
"""
:param basename:
:param pod_context: PodExecutionContext - represents affinity with other pods in the system
@@ -74,7 +75,7 @@ def __init__(self, basename: str, pod_context: Union[PodExecutionContext, list,
self.pod_contexts = []
else:
self.pod_contexts = list(pod_context)

self.persisted = None
shared_path = None
if shared_directory:
@@ -87,15 +88,17 @@ def __init__(self, basename: str, pod_context: Union[PodExecutionContext, list,
else:
self.volumes = shared_directory
assert len(set(shared_directory)) == len(shared_directory), "Shared directories are not unique"
assert len(set(s.split(":")[0] for s in shared_directory)) == len(shared_directory), "Shared directories volumes are not unique"

assert len(set(s.split(":")[0] for s in shared_directory)) == len(
shared_directory), "Shared directories volumes are not unique"

if shared_path:
for task in self.task_list():
task.add_env('shared_directory', shared_path)
else:
self.volumes = tuple()

self.pod_contexts += [PodExecutionContext('usesvolume', v.split(':')[0], True) for v in self.volumes if ':' in v]

self.pod_contexts += [PodExecutionContext('usesvolume', v.split(':')[0], True) for v in self.volumes if
':' in v]

def task_list(self):
raise NotImplementedError()
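The shared_directory parameter accepts either a single string or a list of volume specs. A sketch of the conventions parsed above, using the PipelineOperation subclass defined further down (image names are placeholders):

from cloudharness.workflows import operations, tasks

op = operations.PipelineOperation(
    'my-pipeline-',
    (tasks.CustomTask('step-a', 'my-image-a'),
     tasks.CustomTask('step-b', 'my-image-b')),
    shared_directory=[
        'pvc-001:/opt/shared',   # existing PVC 'pvc-001' mounted at /opt/shared
        '/tmp/scratch',          # no PVC prefix: a temporary PVC is created for the workflow
        'pvc-002:/opt/data:ro',  # optional third field marks the mount read-only
    ])
# Each PVC-backed entry also adds PodExecutionContext('usesvolume', <pvc name>, True),
# co-scheduling pods that share a volume via the affinity spec below.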
@@ -131,6 +134,15 @@ def spec(self):
}
}]
}

if is_accounts_present():
spec['volumes'].append({
'name': 'cloudharness-kc-accounts',
'secret': {
'secretName': 'accounts'
}
})

if self.on_exit_notify:
spec = self.add_on_exit_notify_handler(spec)

@@ -143,21 +155,18 @@ def spec(self):
':' in volume] # with PVC prefix (e.g. pvc-001:/location)
return spec



def affinity_spec(self):
contexts=self.pod_contexts
contexts = self.pod_contexts
PREFERRED = 'preferredDuringSchedulingIgnoredDuringExecution'
REQUIRED = 'requiredDuringSchedulingIgnoredDuringExecution'

pod_affinity = {
PREFERRED: [],
REQUIRED: []
}

PREFERRED: [],
REQUIRED: []
}

for context in contexts:
term= {
term = {
'labelSelector':
{
'matchExpressions': [
@@ -172,11 +181,11 @@ def affinity_spec(self):
}
if not context.required:
pod_affinity[PREFERRED].append(
{
'weight': 100,
'podAffinityTerm': term
{
'weight': 100,
'podAffinityTerm': term

})
})
else:
pod_affinity[REQUIRED].append(term)
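For a required context such as PodExecutionContext('usesvolume', 'pvc-001', True), the generated structure is roughly as follows; a sketch, since the term construction is partly truncated in this diff and the field shape follows the standard Kubernetes pod-affinity API:

pod_affinity = {
    'preferredDuringSchedulingIgnoredDuringExecution': [],
    'requiredDuringSchedulingIgnoredDuringExecution': [{
        'labelSelector': {
            'matchExpressions': [
                {'key': 'usesvolume', 'operator': 'In', 'values': ['pvc-001']}
            ]
        },
        'topologyKey': 'kubernetes.io/hostname',  # assumed; standard co-scheduling key
    }],
}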

@@ -203,7 +212,7 @@ def add_on_exit_notify_handler(self, spec):
def modify_template(self, template):
"""Hook to modify templates (e.g. add volumes)"""

template["metadata"] = {"labels": {c.key:c.value for c in self.pod_contexts}}
template["metadata"] = {"labels": {c.key: c.value for c in self.pod_contexts}}

if self.volumes:
if 'container' in template:
@@ -249,11 +258,11 @@ def volume_template(self, volume):
if len(splitted) > 1:
path = splitted[1]
return dict({
'name': self.name_from_path(path),
'name': self.name_from_path(path),
'mountPath': path,
'readonly': False if len(splitted) < 3 else splitted[2] == "ro"
})
})

def spec_volumeclaim(self, volume):
# when the volume is NOT prefixed by a PVC (e.g. /location) then create a temporary PVC for the workflow
if ':' not in volume:
@@ -282,10 +291,11 @@ def spec_volume(self, volume):
'persistentVolumeClaim': {
'claimName': pvc
},

}
return {}


class SyncOperation(ManagedOperation):
"""A Sync operation returns the result directly with the execute method"""

@@ -321,10 +331,9 @@ def __init__(self, name, task: Task, *args, **kwargs):
"""
self.task = task
super().__init__(name, *args, **kwargs)


def task_list(self):
return (self.task, )
return (self.task,)

@property
def entrypoint(self):
@@ -389,8 +398,6 @@ def __init__(self, basename, tasks, shared_directory="", shared_volume_size=10,
"""
self.tasks = tasks
AsyncOperation.__init__(self, basename, pod_context, shared_directory=shared_directory, *args, **kwargs)



self.shared_volume_size = shared_volume_size
if len(self.task_list()) != len(set(self.task_list())):
@@ -415,12 +422,8 @@ def spec(self):
def modify_template(self, template):
# TODO verify the following condition. Can we mount volumes also with source based templates
super().modify_template(template)

return template




return template


class PipelineOperation(CompositeOperation):
@@ -472,10 +475,12 @@ def entrypoint(self):
class SimpleDagOperation(CompositeOperation):
"""Simple DAG definition limited to a pipeline of parallel operations"""

def __init__(self, basename, *task_groups, shared_directory=None, pod_context: PodExecutionContext = None, **kwargs):
def __init__(self, basename, *task_groups, shared_directory=None, pod_context: PodExecutionContext = None,
**kwargs):
task_groups = tuple(
task_group if isinstance(task_group, Iterable) else (task_group,) for task_group in task_groups)
super().__init__(basename, pod_context=pod_context, tasks=task_groups, shared_directory=shared_directory, **kwargs)
super().__init__(basename, pod_context=pod_context, tasks=task_groups, shared_directory=shared_directory,
**kwargs)

def steps_spec(self):
return [[task.instance() for task in task_group] for task_group in self.tasks]
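A usage sketch for SimpleDagOperation: each positional argument is a group of tasks run in parallel, groups run in sequence, and a bare task is wrapped into a one-element group (image names here are placeholders):

from cloudharness.workflows import operations, tasks

op = operations.SimpleDagOperation(
    'my-dag-op-',
    (tasks.CustomTask('download-a', 'my-download-image', url='https://example.com/a'),
     tasks.CustomTask('download-b', 'my-download-image', url='https://example.com/b')),
    tasks.CustomTask('merge', 'my-merge-image'),  # runs only after both downloads finish
    shared_directory='/mnt/shared')
op.execute()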
28 changes: 18 additions & 10 deletions libraries/cloudharness-common/cloudharness/workflows/tasks.py
@@ -1,7 +1,7 @@
from . import argo

from cloudharness.utils.env import get_cloudharness_variables, get_image_full_tag
from .utils import WORKFLOW_NAME_VARIABLE_NAME
from .utils import WORKFLOW_NAME_VARIABLE_NAME, is_accounts_present

SERVICE_ACCOUNT = 'argo-workflows'

@@ -41,18 +41,26 @@ def envs(self):
for key, value in self.__envs.items()]
# Add the name of the workflow to task env
envs.append({'name': WORKFLOW_NAME_VARIABLE_NAME, 'valueFrom': {
'fieldRef': {'fieldPath': 'metadata.name'}}})
'fieldRef': {'fieldPath': 'metadata.name'}}})
return envs

def add_env(self, name, value):
self.__envs[name] = value

def cloudharness_configmap_spec(self):
return {
'name': 'cloudharness-allvalues',
'mountPath': '/opt/cloudharness/resources/allvalues.yaml',
'subPath': 'allvalues.yaml'
}
base_spec = [
{
'name': 'cloudharness-allvalues',
'mountPath': '/opt/cloudharness/resources/allvalues.yaml',
'subPath': 'allvalues.yaml'
}
]
if is_accounts_present():
base_spec.append({
'name': 'cloudharness-kc-accounts',
'mountPath': '/opt/cloudharness/resources/auth',
})
return base_spec
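With the accounts service deployed, every task container now gets two mounts; mirroring the code above, the returned list evaluates to:

# Result of cloudharness_configmap_spec() when is_accounts_present() is True:
[
    {
        'name': 'cloudharness-allvalues',
        'mountPath': '/opt/cloudharness/resources/allvalues.yaml',
        'subPath': 'allvalues.yaml'
    },
    {
        'name': 'cloudharness-kc-accounts',
        'mountPath': '/opt/cloudharness/resources/auth'
    }
]
# The matching 'cloudharness-kc-accounts' volume is declared at the workflow level in
# operations.py (the is_accounts_present() branch earlier in this diff).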


class ContainerizedTask(Task):
@@ -69,7 +77,7 @@ def spec(self):
'env': self.envs,
'resources': self.resources,
'imagePullPolicy': self.image_pull_policy,
'volumeMounts': [self.cloudharness_configmap_spec()],
'volumeMounts': self.cloudharness_configmap_spec(),
},
'inputs': {},
'metadata': {},
@@ -99,7 +107,7 @@ def spec(self):
'image': self.image_name,
'env': self.envs,
'source': self.source,
'volumeMounts': [self.cloudharness_configmap_spec()],
'volumeMounts': self.cloudharness_configmap_spec(),
'command': [self.command]
}
}
@@ -161,7 +169,7 @@ def image_name(self):


class SendResultTask(CustomTask):
"""Special task used to send the a workflow result to a queue.
"""Special task used to send a workflow result to a queue.
The workflow result consists of all the files inside the shared directory"""

def __init__(self):
Expand Down
Loading