Skip to content
This repository was archived by the owner on Apr 26, 2024. It is now read-only.

#118: fix run_namespaced_job to read the condition status rather than the number of succeeded/failed pods in a job #119

Merged
merged 3 commits into from
Mar 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 23 additions & 8 deletions prefect_kubernetes/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -451,14 +451,29 @@ async def wait_for_completion(self):
await sleep(self._kubernetes_job.interval_seconds)
if self._kubernetes_job.timeout_seconds:
elapsed_time += self._kubernetes_job.interval_seconds
elif v1_job_status.status.failed:
raise RuntimeError(
f"Job {v1_job_status.metadata.name!r} failed, check the "
"Kubernetes pod logs for more information."
)
elif v1_job_status.status.succeeded:
self._completed = True
self.logger.info(f"Job {v1_job_status.metadata.name!r} has completed.")
elif v1_job_status.status.conditions:
final_completed_conditions = [
condition.type == "Complete"
for condition in v1_job_status.status.conditions
if condition.status == "True"
]
if final_completed_conditions and any(final_completed_conditions):
self._completed = True
self.logger.info(
f"Job {v1_job_status.metadata.name!r} has "
f"completed with {v1_job_status.status.succeeded} pods."
)
elif final_completed_conditions:
failed_conditions = [
condition.reason
for condition in v1_job_status.status.conditions
if condition.type == "Failed"
]
raise RuntimeError(
f"Job {v1_job_status.metadata.name!r} failed due to "
f"{failed_conditions}, check the Kubernetes pod logs "
f"for more information."
)

if self._kubernetes_job.delete_after_completion:
await self._cleanup()
Expand Down
26 changes: 25 additions & 1 deletion tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,23 @@ def successful_job_status():
job_status.status.active = None
job_status.status.failed = None
job_status.status.succeeded = 1
job_status.status.conditions = [
models.V1JobCondition(type="Complete", status="True"),
]
return job_status


@pytest.fixture
def unsuccessful_job_status():
    """Provide a mocked job-status response that describes a failed job.

    The mock reports no active pods, one failed pod and one succeeded pod,
    and carries a single ``Failed`` condition whose status is ``"True"`` and
    whose reason is ``"BackoffLimitExceeded"``.
    """
    failed_condition = models.V1JobCondition(
        type="Failed",
        status="True",
        reason="BackoffLimitExceeded",
    )

    mocked_response = MagicMock()
    # MagicMock hands back the same child mock on every attribute access, so
    # configuring it through a local alias is equivalent to the dotted form.
    status = mocked_response.status
    status.active = 0
    status.failed = 1
    status.succeeded = 1
    status.conditions = [failed_condition]
    return mocked_response


Expand Down Expand Up @@ -156,7 +173,14 @@ def mock_read_namespaced_job_status(monkeypatch):
spec=models.V1PodSpec(containers=[models.V1Container(name="test")])
)
),
status=models.V1JobStatus(active=0, failed=0, succeeded=1),
status=models.V1JobStatus(
active=0,
failed=0,
succeeded=1,
conditions=[
models.V1JobCondition(type="Complete", status="True"),
],
),
)
)
monkeypatch.setattr(
Expand Down
35 changes: 30 additions & 5 deletions tests/test_flows.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ async def test_run_namespaced_job_timeout_respected(
):
successful_job_status.status.active = 1
successful_job_status.status.succeeded = None
successful_job_status.status.conditions = []
mock_read_namespaced_job_status.return_value = successful_job_status

valid_kubernetes_job_block.timeout_seconds = 1
Expand Down Expand Up @@ -80,16 +81,14 @@ async def test_run_namespaced_job_unsuccessful(
mock_create_namespaced_job,
mock_read_namespaced_job_status,
mock_delete_namespaced_job,
successful_job_status,
unsuccessful_job_status,
mock_list_namespaced_pod,
read_pod_logs,
):

successful_job_status.status.failed = 1
successful_job_status.status.succeeded = None
mock_read_namespaced_job_status.return_value = successful_job_status
mock_read_namespaced_job_status.return_value = unsuccessful_job_status

with pytest.raises(RuntimeError, match="failed, check the Kubernetes pod logs"):
with pytest.raises(RuntimeError, match=", check the Kubernetes pod logs"):
await run_namespaced_job(kubernetes_job=valid_kubernetes_job_block)

assert mock_create_namespaced_job.call_count == 1
Expand Down Expand Up @@ -125,3 +124,29 @@ def test_sync_flow():
assert mock_read_namespaced_job_status.call_count == 1

assert mock_delete_namespaced_job.call_count == 1


async def test_run_namespaced_job_successful_with_evictions(
    valid_kubernetes_job_block,
    mock_create_namespaced_job,
    mock_read_namespaced_job_status,
    mock_delete_namespaced_job,
    successful_job_status,
    mock_list_namespaced_pod,
    read_pod_logs,
):
    """Run the flow against a job status that also reports a failed pod.

    The ``successful_job_status`` fixture is modified to report zero active
    pods and one failed pod before it is installed as the return value of the
    mocked status read. The flow is expected to finish without raising, after
    one create call, one log read, one status read and one delete call.
    """
    job_status = successful_job_status.status
    job_status.active = 0
    job_status.failed = 1
    mock_read_namespaced_job_status.return_value = successful_job_status

    await run_namespaced_job(kubernetes_job=valid_kubernetes_job_block)

    # The job must have been created exactly once, in the expected namespace
    # and under the expected name.
    assert mock_create_namespaced_job.call_count == 1
    create_kwargs = mock_create_namespaced_job.call_args[1]
    assert create_kwargs["namespace"] == "default"
    assert create_kwargs["body"].metadata.name == "pi"

    # Logs are read once, status is polled once, and the job is deleted once.
    assert read_pod_logs.call_count == 1
    assert mock_read_namespaced_job_status.call_count == 1
    assert mock_delete_namespaced_job.call_count == 1
Loading