Skip to content
This repository was archived by the owner on Apr 26, 2024. It is now read-only.

make block methods sync compatible and update flow #34

Merged
merged 2 commits into from
Jan 17, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
## Unreleased

### Added
- Sync compatibility for block method calls used by `run_namespaced_job` - [#34](https://github.com/PrefectHQ/prefect-kubernetes/pull/34)

### Changed

Expand Down
6 changes: 3 additions & 3 deletions prefect_kubernetes/flows.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@ async def run_namespaced_job(
)
```
"""
kubernetes_job_run = await task(kubernetes_job.trigger)()
kubernetes_job_run = await task(kubernetes_job.trigger.aio)(kubernetes_job)

await task(kubernetes_job_run.wait_for_completion)()
await task(kubernetes_job_run.wait_for_completion.aio)(kubernetes_job_run)

return await task(kubernetes_job_run.fetch_result)()
return await task(kubernetes_job_run.fetch_result.aio)(kubernetes_job_run)
5 changes: 4 additions & 1 deletion prefect_kubernetes/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from kubernetes.client.models import V1DeleteOptions, V1Job, V1JobList, V1Status
from prefect import task
from prefect.blocks.abstract import JobBlock, JobRun
from prefect.utilities.asyncutils import run_sync_in_worker_thread
from prefect.utilities.asyncutils import run_sync_in_worker_thread, sync_compatible
from pydantic import Field
from typing_extensions import Self

Expand Down Expand Up @@ -339,6 +339,7 @@ async def _cleanup(self):
f"with {deleted_v1_job.status!r}."
)

@sync_compatible
async def wait_for_completion(self):
"""Waits for the job to complete.

Expand Down Expand Up @@ -425,6 +426,7 @@ async def wait_for_completion(self):
if self._kubernetes_job.delete_after_completion:
await self._cleanup()

@sync_compatible
async def fetch_result(self) -> Dict[str, Any]:
"""Fetch the results of the job.

Expand Down Expand Up @@ -486,6 +488,7 @@ class KubernetesJob(JobBlock):
_block_type_slug = "k8s-job"
_logo_url = "https://images.ctfassets.net/zscdif0zqppk/35vNcprr3MmIlkrKxxCiah/1d720b4b50dfa8876198cf21730cf123/Kubernetes_logo_without_workmark.svg.png?h=250" # noqa: E501

@sync_compatible
async def trigger(self):
"""Create a Kubernetes job and return a `KubernetesJobRun` object."""

Expand Down
27 changes: 27 additions & 0 deletions tests/test_flows.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import pytest
from prefect import flow

from prefect_kubernetes.exceptions import KubernetesJobTimeoutError
from prefect_kubernetes.flows import run_namespaced_job
Expand Down Expand Up @@ -98,3 +99,29 @@ async def test_run_namespaced_job_unsuccessful(
assert mock_read_namespaced_job_status.call_count == 1

assert mock_delete_namespaced_job.call_count == 0


def test_run_namespaced_job_sync_subflow(
valid_kubernetes_job_block,
mock_create_namespaced_job,
mock_read_namespaced_job_status,
mock_delete_namespaced_job,
successful_job_status,
mock_list_namespaced_pod,
read_pod_logs,
):
@flow
def test_sync_flow():
return run_namespaced_job(kubernetes_job=valid_kubernetes_job_block)

test_sync_flow()

assert mock_create_namespaced_job.call_count == 1
assert mock_create_namespaced_job.call_args[1]["namespace"] == "default"
assert mock_create_namespaced_job.call_args[1]["body"].metadata.name == "pi"

assert read_pod_logs.call_count == 1

assert mock_read_namespaced_job_status.call_count == 1

assert mock_delete_namespaced_job.call_count == 1