diff --git a/applications/samples/backend/samples/controllers/workflows_controller.py b/applications/samples/backend/samples/controllers/workflows_controller.py
index f3345f96..b2b27c99 100644
--- a/applications/samples/backend/samples/controllers/workflows_controller.py
+++ b/applications/samples/backend/samples/controllers/workflows_controller.py
@@ -1,6 +1,7 @@
 import connexion
 import six
 
+from cloudharness.auth import get_api_password
 from samples.models.inline_response202 import InlineResponse202  # noqa: E501
 from samples import util
 from samples.models import InlineResponse202
@@ -12,7 +13,8 @@
 try:
     from cloudharness.workflows import operations, tasks
 except Exception as e:
-    log.error("Cannot start workflows module. Probably this is related some problem with the kubectl configuration", exc_info=True)
+    log.error("Cannot start workflows module. Probably this is related to some problem with the kubectl configuration",
+              exc_info=True)
 
 
 def submit_async():  # noqa: E501
@@ -33,7 +35,8 @@ def submit_async():  # noqa: E501
     submitted = op.execute()
 
     if not op.is_error():
-        return InlineResponse202(task=InlineResponse202Task(href=op.get_operation_update_url(), name=submitted.name)), 202
+        return InlineResponse202(
+            task=InlineResponse202Task(href=op.get_operation_update_url(), name=submitted.name)), 202
     else:
         return 'Error submitting operation', 500
 
@@ -47,7 +50,8 @@ def submit_sync():  # noqa: E501
     :rtype: str
     """
     task = tasks.CustomTask(
-        'download-file', 'workflows-extract-download', url='https://github.com/MetaCell/cloud-harness/blob/master/README.md')
+        'download-file', 'workflows-extract-download',
+        url='https://github.com/MetaCell/cloud-harness/blob/master/README.md')
     op = operations.DistributedSyncOperation('test-sync-op-', task)
 
     try:
@@ -60,6 +64,22 @@
         return 'Error submitting operation: %s' % e, 500
 
 
+def submit_secret_sync_with_results():  # noqa: E501
+    """Send a synchronous operation and get results using the event queue. Checks access to get_api_password
+
+    :rtype: str
+    """
+    task = tasks.CustomTask('test-secret', 'samples-secret')
+    try:
+        op = operations.DistributedSyncOperationWithResults(
+            'test-sync-secret-op-results-', task)
+        result = op.execute()
+        assert result[0]["result"] != ''
+        return result[0]["result"]
+    except Exception as e:
+        return jsonify(str(e)), 500
+
+
 def submit_sync_with_results(a=1, b=2):  # noqa: E501
     """Send a synchronous operation and get results using the event queue. Just a sum, but in the cloud
 
diff --git a/applications/samples/tasks/secret/Dockerfile b/applications/samples/tasks/secret/Dockerfile
new file mode 100644
index 00000000..6eebbd94
--- /dev/null
+++ b/applications/samples/tasks/secret/Dockerfile
@@ -0,0 +1,6 @@
+ARG CLOUDHARNESS_BASE
+FROM $CLOUDHARNESS_BASE
+
+ADD . /
+
+CMD python main.py
\ No newline at end of file
diff --git a/applications/samples/tasks/secret/main.py b/applications/samples/tasks/secret/main.py
new file mode 100644
index 00000000..3899d0fc
--- /dev/null
+++ b/applications/samples/tasks/secret/main.py
@@ -0,0 +1,13 @@
+import os
+
+from cloudharness.auth import get_api_password
+from cloudharness.workflows.utils import get_shared_directory
+
+for env in os.environ:
+    print(f"{env}:{os.environ[env]}")
+
+file_name = os.path.join(get_shared_directory(), "result")
+print("File name is", file_name)
+
+with open(file_name, "w") as f:
+    f.write(get_api_password())
diff --git a/libraries/cloudharness-common/cloudharness/workflows/operations.py b/libraries/cloudharness-common/cloudharness/workflows/operations.py
index 0dcb2632..574c0e64 100644
--- a/libraries/cloudharness-common/cloudharness/workflows/operations.py
+++ b/libraries/cloudharness-common/cloudharness/workflows/operations.py
@@ -1,17 +1,16 @@
 import time
-import pyaml
+from collections.abc import Iterable
 from typing import Union
-from collections.abc import Iterable
 
-from kubernetes.client.models.v1_affinity import V1Affinity
-from cloudharness_cli.workflows.models.operation_status import OperationStatus
+import pyaml
 
+from cloudharness import log
 from cloudharness.events.client import EventClient
 from cloudharness.utils import env, config
-from cloudharness import log
-
+from cloudharness_cli.workflows.models.operation_status import OperationStatus
 from . import argo
 from .tasks import Task, SendResultTask, CustomTask
+from .utils import is_accounts_present
 
 POLLING_WAIT_SECONDS = 1
 SERVICE_ACCOUNT = 'argo-workflows'
@@ -39,15 +38,15 @@ class ManagedOperation:
     """
 
     def __init__(self,
-                     name,
-                     ttl_strategy: dict = {
-                         'secondsAfterCompletion': 60 * 60,
-                         'secondsAfterSuccess': 60 * 20,
-                         'secondsAfterFailure': 60 * 120
-                     },
-                     *args,
-                     on_exit_notify=None,
-                     **kwargs):
+                 name,
+                 ttl_strategy: dict = {
+                     'secondsAfterCompletion': 60 * 60,
+                     'secondsAfterSuccess': 60 * 20,
+                     'secondsAfterFailure': 60 * 120
+                 },
+                 *args,
+                 on_exit_notify=None,
+                 **kwargs):
         self.name = name
         self.ttl_strategy = ttl_strategy
         self.on_exit_notify = on_exit_notify
@@ -56,12 +55,14 @@ def execute(self, **parameters):
         raise NotImplementedError(f"{self.__class__.__name__} is abstract")
 
 
+
 class ContainerizedOperation(ManagedOperation):
     """
     Abstract Containarized operation based on an argo workflow
     """
 
-    def __init__(self, basename: str, pod_context: Union[PodExecutionContext, list, tuple] = None, shared_directory=None, *args, **kwargs):
+    def __init__(self, basename: str, pod_context: Union[PodExecutionContext, list, tuple] = None,
+                 shared_directory=None, *args, **kwargs):
         """
         :param basename:
         :param pod_context: PodExecutionContext - represents affinity with other pods in the system
@@ -74,7 +75,7 @@ def __init__(self, basename: str, pod_context: Union[PodExecutionContext, list,
             self.pod_contexts = []
         else:
             self.pod_contexts = list(pod_context)
-        
+
         self.persisted = None
         shared_path = None
         if shared_directory:
@@ -87,15 +88,17 @@ def __init__(self, basename: str, pod_context: Union[PodExecutionContext, list,
             else:
                 self.volumes = shared_directory
                 assert len(set(shared_directory)) == len(shared_directory), "Shared directories are not unique"
-                assert len(set(s.split(":")[0] for s in shared_directory)) == len(shared_directory), "Shared directories volumes are not unique"
-            
+                assert len(set(s.split(":")[0] for s in shared_directory)) == len(
+                    shared_directory), "Shared directories volumes are not unique"
+
             if shared_path:
                 for task in self.task_list():
                     task.add_env('shared_directory', shared_path)
         else:
             self.volumes = tuple()
-        
-        self.pod_contexts += [PodExecutionContext('usesvolume', v.split(':')[0], True) for v in self.volumes if ':' in v]
+
+        self.pod_contexts += [PodExecutionContext('usesvolume', v.split(':')[0], True) for v in self.volumes if
+                              ':' in v]
 
     def task_list(self):
         raise NotImplementedError()
@@ -131,6 +134,15 @@ def spec(self):
                 }
             }]
         }
+
+        if is_accounts_present():
+            spec['volumes'].append({
+                'name': 'cloudharness-kc-accounts',
+                'secret': {
+                    'secretName': 'accounts'
+                }
+            })
+
         if self.on_exit_notify:
             spec = self.add_on_exit_notify_handler(spec)
 
@@ -143,21 +155,18 @@ def spec(self):
                       ':' in volume]  # with PVC prefix (e.g. pvc-001:/location)
 
         return spec
-
-
 
     def affinity_spec(self):
-        contexts=self.pod_contexts
+        contexts = self.pod_contexts
         PREFERRED = 'preferredDuringSchedulingIgnoredDuringExecution'
         REQUIRED = 'requiredDuringSchedulingIgnoredDuringExecution'
         pod_affinity = {
-            PREFERRED: [],
-            REQUIRED: []
-        }
-
+            PREFERRED: [],
+            REQUIRED: []
+        }
         for context in contexts:
-            term= {
+            term = {
 
                 'labelSelector': {
                     'matchExpressions': [
@@ -172,11 +181,11 @@
             }
             if not context.required:
                 pod_affinity[PREFERRED].append(
-                {
-                    'weight': 100,
-                    'podAffinityTerm': term
-
-                })
+                    {
+                        'weight': 100,
+                        'podAffinityTerm': term
+
+                    })
             else:
                 pod_affinity[REQUIRED].append(term)
 
@@ -203,7 +212,7 @@ def add_on_exit_notify_handler(self, spec):
 
     def modify_template(self, template):
         """Hook to modify templates (e.g. add volumes)"""
-        template["metadata"] = {"labels": {c.key:c.value for c in self.pod_contexts}}
+        template["metadata"] = {"labels": {c.key: c.value for c in self.pod_contexts}}
 
         if self.volumes:
             if 'container' in template:
@@ -249,11 +258,11 @@ def volume_template(self, volume):
         if len(splitted) > 1:
             path = splitted[1]
         return dict({
-                        'name': self.name_from_path(path),
+            'name': self.name_from_path(path),
             'mountPath': path,
             'readonly': False if len(splitted) < 3 else splitted[2] == "ro"
-                        })
-        
+        })
+
     def spec_volumeclaim(self, volume):
         # when the volume is NOT prefixed by a PVC (e.g. /location) then create a temporary PVC for the workflow
         if ':' not in volume:
@@ -282,10 +291,11 @@ def spec_volume(self, volume):
                 'persistentVolumeClaim': {
                     'claimName': pvc
                 },
-                
+
             }
         return {}
 
+
 class SyncOperation(ManagedOperation):
     """A Sync operation returns the result directly with the execute method"""
 
@@ -321,10 +331,9 @@ def __init__(self, name, task: Task, *args, **kwargs):
         """
         self.task = task
         super().__init__(name, *args, **kwargs)
-        
 
     def task_list(self):
-        return (self.task, )
+        return (self.task,)
 
     @property
     def entrypoint(self):
@@ -389,8 +398,6 @@ def __init__(self, basename, tasks, shared_directory="", shared_volume_size=10,
         """
         self.tasks = tasks
         AsyncOperation.__init__(self, basename, pod_context, shared_directory=shared_directory, *args, **kwargs)
-
-
         self.shared_volume_size = shared_volume_size
 
         if len(self.task_list()) != len(set(self.task_list())):
@@ -415,12 +422,8 @@ def spec(self):
 
     def modify_template(self, template):
         # TODO verify the following condition. Can we mount volumes also with source based templates
         super().modify_template(template)
-
-        return template
-
-
-
+        return template
 
 
 class PipelineOperation(CompositeOperation):
@@ -472,10 +475,12 @@ def entrypoint(self):
 class SimpleDagOperation(CompositeOperation):
     """Simple DAG definition limited to a pipeline of parallel operations"""
 
-    def __init__(self, basename, *task_groups, shared_directory=None, pod_context: PodExecutionContext = None, **kwargs):
+    def __init__(self, basename, *task_groups, shared_directory=None, pod_context: PodExecutionContext = None,
+                 **kwargs):
         task_groups = tuple(
             task_group if isinstance(task_group, Iterable) else (task_group,) for task_group in task_groups)
-        super().__init__(basename, pod_context=pod_context, tasks=task_groups, shared_directory=shared_directory, **kwargs)
+        super().__init__(basename, pod_context=pod_context, tasks=task_groups, shared_directory=shared_directory,
+                         **kwargs)
 
     def steps_spec(self):
         return [[task.instance() for task in task_group] for task_group in self.tasks]
diff --git a/libraries/cloudharness-common/cloudharness/workflows/tasks.py b/libraries/cloudharness-common/cloudharness/workflows/tasks.py
index ed6c9f79..e881b482 100644
--- a/libraries/cloudharness-common/cloudharness/workflows/tasks.py
+++ b/libraries/cloudharness-common/cloudharness/workflows/tasks.py
@@ -1,7 +1,7 @@
 from . import argo
 from cloudharness.utils.env import get_cloudharness_variables, get_image_full_tag
 
-from .utils import WORKFLOW_NAME_VARIABLE_NAME
+from .utils import WORKFLOW_NAME_VARIABLE_NAME, is_accounts_present
 
 SERVICE_ACCOUNT = 'argo-workflows'
 
@@ -41,18 +41,26 @@ def envs(self):
                 for key, value in self.__envs.items()]
         # Add the name of the workflow to task env
         envs.append({'name': WORKFLOW_NAME_VARIABLE_NAME, 'valueFrom': {
-            'fieldRef': {'fieldPath': 'metadata.name'}}})
+                    'fieldRef': {'fieldPath': 'metadata.name'}}})
         return envs
 
     def add_env(self, name, value):
         self.__envs[name] = value
 
     def cloudharness_configmap_spec(self):
-        return {
-            'name': 'cloudharness-allvalues',
-            'mountPath': '/opt/cloudharness/resources/allvalues.yaml',
-            'subPath': 'allvalues.yaml'
-        }
+        base_spec = [
+            {
+                'name': 'cloudharness-allvalues',
+                'mountPath': '/opt/cloudharness/resources/allvalues.yaml',
+                'subPath': 'allvalues.yaml'
+            }
+        ]
+        if is_accounts_present():
+            base_spec.append({
+                'name': 'cloudharness-kc-accounts',
+                'mountPath': '/opt/cloudharness/resources/auth',
+            })
+        return base_spec
 
 
 class ContainerizedTask(Task):
@@ -69,7 +77,7 @@ def spec(self):
                 'env': self.envs,
                 'resources': self.resources,
                 'imagePullPolicy': self.image_pull_policy,
-                'volumeMounts': [self.cloudharness_configmap_spec()],
+                'volumeMounts': self.cloudharness_configmap_spec(),
             },
             'inputs': {},
             'metadata': {},
@@ -99,7 +107,7 @@ def spec(self):
                 'image': self.image_name,
                 'env': self.envs,
                 'source': self.source,
-                'volumeMounts': [self.cloudharness_configmap_spec()],
+                'volumeMounts': self.cloudharness_configmap_spec(),
                 'command': [self.command]
             }
         }
@@ -161,7 +169,7 @@ def image_name(self):
 
 
 class SendResultTask(CustomTask):
-    """Special task used to send the a workflow result to a queue.
+    """Special task used to send a workflow result to a queue.
     The workflow result consists of all the files inside the shared directory"""
 
     def __init__(self):
diff --git a/libraries/cloudharness-common/cloudharness/workflows/utils.py b/libraries/cloudharness-common/cloudharness/workflows/utils.py
index ac2ea3ec..285a7d10 100644
--- a/libraries/cloudharness-common/cloudharness/workflows/utils.py
+++ b/libraries/cloudharness-common/cloudharness/workflows/utils.py
@@ -1,5 +1,6 @@
 import os
 
+from cloudharness import applications
 from cloudharness.events.client import EventClient
 from cloudharness.utils.env import get_variable
 
@@ -22,3 +23,11 @@ def get_shared_directory():
 def notify_queue(queue, message):
     client = EventClient(queue)
     client.produce(message)
+
+
+def is_accounts_present():
+    try:
+        applications.get_configuration('accounts')
+        return True
+    except Exception:
+        return False
diff --git a/libraries/cloudharness-common/tests/test_workflow.py b/libraries/cloudharness-common/tests/test_workflow.py
index c9183a16..8db103d1 100644
--- a/libraries/cloudharness-common/tests/test_workflow.py
+++ b/libraries/cloudharness-common/tests/test_workflow.py
@@ -2,6 +2,7 @@
 import requests
 import yaml
 
+from cloudharness.workflows.utils import is_accounts_present
 from .test_env import set_test_environment
 
 set_test_environment()
@@ -99,14 +100,17 @@ def test_single_task_shared():
     task_write = operations.CustomTask('download-file', 'workflows-extract-download',
                                        url='https://raw.githubusercontent.com/openworm/org.geppetto/master/README.md')
     op = operations.SingleTaskOperation('test-custom-connected-op-', task_write,
-                                            shared_directory=shared_directory, shared_volume_size=100)
+                                        shared_directory=shared_directory, shared_volume_size=100)
     wf = op.to_workflow()
     print('\n', yaml.dump(wf))
 
-    assert len(op.volumes) == 1
-    assert len(wf['spec']['volumes']) == 2
-    assert wf['spec']['volumes'][1]['persistentVolumeClaim']['claimName'] == 'myclaim'
-    assert len(wf['spec']['templates'][0]['container']['volumeMounts']) == 2
+    accounts_offset = 1 if is_accounts_present() else 0
+    assert len(op.volumes) == 1
+    assert len(wf['spec']['volumes']) == 2 + accounts_offset
+    assert wf['spec']['volumes'][1+accounts_offset]['persistentVolumeClaim']['claimName'] == 'myclaim'
+    if accounts_offset == 1:
+        assert wf['spec']['volumes'][1]['secret']['secretName'] == 'accounts'
+    assert len(wf['spec']['templates'][0]['container']['volumeMounts']) == 2 + accounts_offset
 
     if execute:
         print(op.execute())
@@ -116,43 +120,50 @@ def test_single_task_shared_multiple():
     task_write = operations.CustomTask('download-file', 'workflows-extract-download',
                                        url='https://raw.githubusercontent.com/openworm/org.geppetto/master/README.md')
     op = operations.SingleTaskOperation('test-custom-connected-op-', task_write,
-                                            shared_directory=shared_directory)
+                                        shared_directory=shared_directory)
     wf = op.to_workflow()
     print('\n', yaml.dump(wf))
+    accounts_offset = 1 if is_accounts_present() else 0
 
-    assert len(op.volumes) == 2
-    assert len(wf['spec']['volumes']) == 3
-    assert wf['spec']['volumes'][1]['persistentVolumeClaim']['claimName'] == 'myclaim'
-    assert len(wf['spec']['templates'][0]['container']['volumeMounts']) == 3
+    assert len(op.volumes) == 2
+    assert len(wf['spec']['volumes']) == 3 + accounts_offset
+    assert wf['spec']['volumes'][1+accounts_offset]['persistentVolumeClaim']['claimName'] == 'myclaim'
+    assert len(wf['spec']['templates'][0]['container']['volumeMounts']) == 3 + accounts_offset
 
-    assert wf['spec']['templates'][0]['container']['volumeMounts'][2]['readonly'] == True
+    assert wf['spec']['templates'][0]['container']['volumeMounts'][2+accounts_offset]['readonly']
     assert wf['spec']['templates'][0]['metadata']['labels']['usesvolume']
     assert 'affinity' in wf['spec']
-    assert len(wf['spec']['affinity']['podAffinity']['requiredDuringSchedulingIgnoredDuringExecution']) == 2, "A pod affinity for each volume is expected"
-    affinity_expr = wf['spec']['affinity']['podAffinity']['requiredDuringSchedulingIgnoredDuringExecution'][0]['labelSelector']['matchExpressions'][0]
+    assert len(wf['spec']['affinity']['podAffinity'][
+                   'requiredDuringSchedulingIgnoredDuringExecution']) == 2, "A pod affinity for each volume is expected"
+    affinity_expr = \
+        wf['spec']['affinity']['podAffinity']['requiredDuringSchedulingIgnoredDuringExecution'][0]['labelSelector'][
+            'matchExpressions'][0]
     assert affinity_expr['key'] == 'usesvolume'
     assert affinity_expr['values'][0] == 'myclaim'
 
     if execute:
         print(op.execute())
 
+
 def test_single_task_shared_script():
     shared_directory = 'myclaim:/mnt/shared'
     task_write = tasks.BashTask('download-file', source="ls -la")
     op = operations.SingleTaskOperation('test-custom-connected-op-', task_write,
-                                            shared_directory=shared_directory, shared_volume_size=100)
+                                        shared_directory=shared_directory, shared_volume_size=100)
     wf = op.to_workflow()
     print('\n', yaml.dump(wf))
+    accounts_offset = 1 if is_accounts_present() else 0
+
+    assert len(op.volumes) == 1
+    assert len(wf['spec']['volumes']) == 2+accounts_offset
+    assert wf['spec']['volumes'][1+accounts_offset]['persistentVolumeClaim']['claimName'] == 'myclaim'
+    assert len(wf['spec']['templates'][0]['script']['volumeMounts']) == 2+accounts_offset
 
-    assert len(op.volumes) == 1
-    assert len(wf['spec']['volumes']) == 2
-    assert wf['spec']['volumes'][1]['persistentVolumeClaim']['claimName'] == 'myclaim'
-    assert len(wf['spec']['templates'][0]['script']['volumeMounts']) == 2
-    
     if execute:
         print(op.execute())
 
+
 def test_result_task_workflow():
     task_write = operations.CustomTask('download-file', 'workflows-extract-download',
                                        url='https://raw.githubusercontent.com/openworm/org.geppetto/master/README.md')
@@ -242,13 +253,12 @@ def f():
     for task in workflow['spec']['templates']:
         assert task['metadata']['labels']['a'] == 'b'
 
-
     op = operations.ParallelOperation('test-parallel-op-', (tasks.PythonTask('p1', f), tasks.PythonTask('p2', f)),
                                       pod_context=(
-        operations.PodExecutionContext('a', 'b'),
-        operations.PodExecutionContext('c', 'd', required=True),
-        operations.PodExecutionContext('e', 'f')
-    ))
+                                          operations.PodExecutionContext('a', 'b'),
+                                          operations.PodExecutionContext('c', 'd', required=True),
+                                          operations.PodExecutionContext('e', 'f')
+                                      ))
     workflow = op.to_workflow()
     assert 'affinity' in workflow['spec']
     preferred = workflow['spec']['affinity']['podAffinity']['preferredDuringSchedulingIgnoredDuringExecution']
@@ -275,4 +285,4 @@
             'labelSelector'][
             'matchExpressions'][0]
     assert affinity_expr['key'] == 'c'
-    assert affinity_expr['values'][0] == 'd'
\ No newline at end of file
+    assert affinity_expr['values'][0] == 'd'