From c7e756e43f8f55469b1bdd030d8bf98a353da227 Mon Sep 17 00:00:00 2001 From: Nathan Nowack Date: Tue, 17 Jan 2023 10:38:01 -0600 Subject: [PATCH 1/2] make block methods sync compatible and update flow --- prefect_kubernetes/flows.py | 6 +++--- prefect_kubernetes/jobs.py | 5 ++++- tests/test_flows.py | 27 +++++++++++++++++++++++++++ 3 files changed, 34 insertions(+), 4 deletions(-) diff --git a/prefect_kubernetes/flows.py b/prefect_kubernetes/flows.py index d6bc951..5008614 100644 --- a/prefect_kubernetes/flows.py +++ b/prefect_kubernetes/flows.py @@ -36,8 +36,8 @@ async def run_namespaced_job( ) ``` """ - kubernetes_job_run = await task(kubernetes_job.trigger)() + kubernetes_job_run = await task(kubernetes_job.trigger.aio)(kubernetes_job) - await task(kubernetes_job_run.wait_for_completion)() + await task(kubernetes_job_run.wait_for_completion.aio)(kubernetes_job_run) - return await task(kubernetes_job_run.fetch_result)() + return await task(kubernetes_job_run.fetch_result.aio)(kubernetes_job_run) diff --git a/prefect_kubernetes/jobs.py b/prefect_kubernetes/jobs.py index 2882e3d..fe5952d 100644 --- a/prefect_kubernetes/jobs.py +++ b/prefect_kubernetes/jobs.py @@ -8,7 +8,7 @@ from kubernetes.client.models import V1DeleteOptions, V1Job, V1JobList, V1Status from prefect import task from prefect.blocks.abstract import JobBlock, JobRun -from prefect.utilities.asyncutils import run_sync_in_worker_thread +from prefect.utilities.asyncutils import run_sync_in_worker_thread, sync_compatible from pydantic import Field from typing_extensions import Self @@ -339,6 +339,7 @@ async def _cleanup(self): f"with {deleted_v1_job.status!r}." ) + @sync_compatible async def wait_for_completion(self): """Waits for the job to complete. @@ -425,6 +426,7 @@ async def wait_for_completion(self): if self._kubernetes_job.delete_after_completion: await self._cleanup() + @sync_compatible async def fetch_result(self) -> Dict[str, Any]: """Fetch the results of the job. @@ -486,6 +488,7 @@ class KubernetesJob(JobBlock): _block_type_slug = "k8s-job" _logo_url = "https://images.ctfassets.net/zscdif0zqppk/35vNcprr3MmIlkrKxxCiah/1d720b4b50dfa8876198cf21730cf123/Kubernetes_logo_without_workmark.svg.png?h=250" # noqa: E501 + @sync_compatible async def trigger(self): """Create a Kubernetes job and return a `KubernetesJobRun` object.""" diff --git a/tests/test_flows.py b/tests/test_flows.py index 9eddfc9..0b5b84d 100644 --- a/tests/test_flows.py +++ b/tests/test_flows.py @@ -1,4 +1,5 @@ import pytest +from prefect import flow from prefect_kubernetes.exceptions import KubernetesJobTimeoutError from prefect_kubernetes.flows import run_namespaced_job @@ -98,3 +99,29 @@ async def test_run_namespaced_job_unsuccessful( assert mock_read_namespaced_job_status.call_count == 1 assert mock_delete_namespaced_job.call_count == 0 + + +def test_run_namespaced_job_sync_subflow( + valid_kubernetes_job_block, + mock_create_namespaced_job, + mock_read_namespaced_job_status, + mock_delete_namespaced_job, + successful_job_status, + mock_list_namespaced_pod, + read_pod_logs, +): + @flow + def test_sync_flow(): + return run_namespaced_job(kubernetes_job=valid_kubernetes_job_block) + + test_sync_flow() + + assert mock_create_namespaced_job.call_count == 1 + assert mock_create_namespaced_job.call_args[1]["namespace"] == "default" + assert mock_create_namespaced_job.call_args[1]["body"].metadata.name == "pi" + + assert read_pod_logs.call_count == 1 + + assert mock_read_namespaced_job_status.call_count == 1 + + assert mock_delete_namespaced_job.call_count == 1 From d57ec7a50c31ced4f488b61bdc03670f654b0545 Mon Sep 17 00:00:00 2001 From: Nathan Nowack Date: Tue, 17 Jan 2023 10:43:50 -0600 Subject: [PATCH 2/2] update changelog --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 01b5cc9..2acd0c5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## Unreleased ### Added +- Sync compatibility for block method calls used by `run_namespaced_job` - [#34](https://github.com/PrefectHQ/prefect-kubernetes/pull/34) ### Changed