Skip to content
This repository was archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Merge pull request #34 from PrefectHQ/sync-compatible
Browse files Browse the repository at this point in the history
make block methods sync compatible and update flow
  • Loading branch information
zzstoatzz authored Jan 17, 2023
2 parents 3d6c0cd + d57ec7a commit a69120f
Show file tree
Hide file tree
Showing 4 changed files with 35 additions and 4 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
## Unreleased

### Added
- Sync compatibility for block method calls used by `run_namespaced_job` - [#34](https://github.com/PrefectHQ/prefect-kubernetes/pull/34)

### Changed

Expand Down
6 changes: 3 additions & 3 deletions prefect_kubernetes/flows.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@ async def run_namespaced_job(
)
```
"""
kubernetes_job_run = await task(kubernetes_job.trigger)()
kubernetes_job_run = await task(kubernetes_job.trigger.aio)(kubernetes_job)

await task(kubernetes_job_run.wait_for_completion)()
await task(kubernetes_job_run.wait_for_completion.aio)(kubernetes_job_run)

return await task(kubernetes_job_run.fetch_result)()
return await task(kubernetes_job_run.fetch_result.aio)(kubernetes_job_run)
5 changes: 4 additions & 1 deletion prefect_kubernetes/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from kubernetes.client.models import V1DeleteOptions, V1Job, V1JobList, V1Status
from prefect import task
from prefect.blocks.abstract import JobBlock, JobRun
from prefect.utilities.asyncutils import run_sync_in_worker_thread
from prefect.utilities.asyncutils import run_sync_in_worker_thread, sync_compatible
from pydantic import Field
from typing_extensions import Self

Expand Down Expand Up @@ -339,6 +339,7 @@ async def _cleanup(self):
f"with {deleted_v1_job.status!r}."
)

@sync_compatible
async def wait_for_completion(self):
"""Waits for the job to complete.
Expand Down Expand Up @@ -425,6 +426,7 @@ async def wait_for_completion(self):
if self._kubernetes_job.delete_after_completion:
await self._cleanup()

@sync_compatible
async def fetch_result(self) -> Dict[str, Any]:
"""Fetch the results of the job.
Expand Down Expand Up @@ -486,6 +488,7 @@ class KubernetesJob(JobBlock):
_block_type_slug = "k8s-job"
_logo_url = "https://images.ctfassets.net/zscdif0zqppk/35vNcprr3MmIlkrKxxCiah/1d720b4b50dfa8876198cf21730cf123/Kubernetes_logo_without_workmark.svg.png?h=250" # noqa: E501

@sync_compatible
async def trigger(self):
"""Create a Kubernetes job and return a `KubernetesJobRun` object."""

Expand Down
27 changes: 27 additions & 0 deletions tests/test_flows.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import pytest
from prefect import flow

from prefect_kubernetes.exceptions import KubernetesJobTimeoutError
from prefect_kubernetes.flows import run_namespaced_job
Expand Down Expand Up @@ -98,3 +99,29 @@ async def test_run_namespaced_job_unsuccessful(
assert mock_read_namespaced_job_status.call_count == 1

assert mock_delete_namespaced_job.call_count == 0


def test_run_namespaced_job_sync_subflow(
valid_kubernetes_job_block,
mock_create_namespaced_job,
mock_read_namespaced_job_status,
mock_delete_namespaced_job,
successful_job_status,
mock_list_namespaced_pod,
read_pod_logs,
):
@flow
def test_sync_flow():
return run_namespaced_job(kubernetes_job=valid_kubernetes_job_block)

test_sync_flow()

assert mock_create_namespaced_job.call_count == 1
assert mock_create_namespaced_job.call_args[1]["namespace"] == "default"
assert mock_create_namespaced_job.call_args[1]["body"].metadata.name == "pi"

assert read_pod_logs.call_count == 1

assert mock_read_namespaced_job_status.call_count == 1

assert mock_delete_namespaced_job.call_count == 1

0 comments on commit a69120f

Please sign in to comment.