From bbc2b31ee898095393356d67f9ef05e07b278569 Mon Sep 17 00:00:00 2001 From: Nicholas Fiorentini <128426621+NicholasFiorentini@users.noreply.github.com> Date: Thu, 7 Mar 2024 12:25:22 +0100 Subject: [PATCH 1/3] wait_for_completion use v1_job_status.status.conditions to check for job completion --- prefect_kubernetes/jobs.py | 29 +++++++++++++++++++++-------- 1 file changed, 21 insertions(+), 8 deletions(-) diff --git a/prefect_kubernetes/jobs.py b/prefect_kubernetes/jobs.py index d754758..2a80b60 100644 --- a/prefect_kubernetes/jobs.py +++ b/prefect_kubernetes/jobs.py @@ -451,14 +451,27 @@ async def wait_for_completion(self): await sleep(self._kubernetes_job.interval_seconds) if self._kubernetes_job.timeout_seconds: elapsed_time += self._kubernetes_job.interval_seconds - elif v1_job_status.status.failed: - raise RuntimeError( - f"Job {v1_job_status.metadata.name!r} failed, check the " - "Kubernetes pod logs for more information." - ) - elif v1_job_status.status.succeeded: - self._completed = True - self.logger.info(f"Job {v1_job_status.metadata.name!r} has completed.") + elif v1_job_status.status.conditions: + final_completed_conditions = [ + condition.type == "Complete" + for condition in v1_job_status.status.conditions + if condition.status == "True" + ] + if final_completed_conditions and any(final_completed_conditions): + self._completed = True + self.logger.info( + f"Job {v1_job_status.metadata.name!r} has " + f"completed with {v1_job_status.status.succeeded} pods.") + elif final_completed_conditions: + failed_conditions = [ + condition.reason + for condition in v1_job_status.status.conditions + if condition.type == "Failed" + ] + raise RuntimeError( + f"Job {v1_job_status.metadata.name!r} failed due to {failed_conditions}, check the Kubernetes " + f"pod logs for more information." + ) if self._kubernetes_job.delete_after_completion: await self._cleanup() From 6028aa825d667cccd3f0fb780a57cd4ec808d01d Mon Sep 17 00:00:00 2001 From: Nicholas Fiorentini <128426621+NicholasFiorentini@users.noreply.github.com> Date: Thu, 7 Mar 2024 12:28:53 +0100 Subject: [PATCH 2/3] pre-commit checks --- prefect_kubernetes/jobs.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/prefect_kubernetes/jobs.py b/prefect_kubernetes/jobs.py index 2a80b60..a4b4886 100644 --- a/prefect_kubernetes/jobs.py +++ b/prefect_kubernetes/jobs.py @@ -461,7 +461,8 @@ async def wait_for_completion(self): self._completed = True self.logger.info( f"Job {v1_job_status.metadata.name!r} has " - f"completed with {v1_job_status.status.succeeded} pods.") + f"completed with {v1_job_status.status.succeeded} pods." + ) elif final_completed_conditions: failed_conditions = [ condition.reason @@ -469,8 +470,9 @@ async def wait_for_completion(self): if condition.type == "Failed" ] raise RuntimeError( - f"Job {v1_job_status.metadata.name!r} failed due to {failed_conditions}, check the Kubernetes " - f"pod logs for more information." + f"Job {v1_job_status.metadata.name!r} failed due to " + f"{failed_conditions}, check the Kubernetes pod logs " + f"for more information." ) if self._kubernetes_job.delete_after_completion: From 768fcddc53c8b04b56ffc1f1d72bd80dd9a4425c Mon Sep 17 00:00:00 2001 From: Nicholas Fiorentini <128426621+NicholasFiorentini@users.noreply.github.com> Date: Thu, 7 Mar 2024 12:52:21 +0100 Subject: [PATCH 3/3] unit tests --- tests/conftest.py | 26 +++++++++++++++++++++++++- tests/test_flows.py | 35 ++++++++++++++++++++++++++++++----- 2 files changed, 55 insertions(+), 6 deletions(-) diff --git a/tests/conftest.py b/tests/conftest.py index f86a269..75268a8 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -57,6 +57,23 @@ def successful_job_status(): job_status.status.active = None job_status.status.failed = None job_status.status.succeeded = 1 + job_status.status.conditions = [ + models.V1JobCondition(type="Complete", status="True"), + ] + return job_status + + +@pytest.fixture +def unsuccessful_job_status(): + job_status = MagicMock() + job_status.status.active = 0 + job_status.status.failed = 1 + job_status.status.succeeded = 1 + job_status.status.conditions = [ + models.V1JobCondition( + type="Failed", status="True", reason="BackoffLimitExceeded" + ), + ] return job_status @@ -156,7 +173,14 @@ def mock_read_namespaced_job_status(monkeypatch): spec=models.V1PodSpec(containers=[models.V1Container(name="test")]) ) ), - status=models.V1JobStatus(active=0, failed=0, succeeded=1), + status=models.V1JobStatus( + active=0, + failed=0, + succeeded=1, + conditions=[ + models.V1JobCondition(type="Complete", status="True"), + ], + ), ) ) monkeypatch.setattr( diff --git a/tests/test_flows.py b/tests/test_flows.py index 0b5b84d..9dc1bd4 100644 --- a/tests/test_flows.py +++ b/tests/test_flows.py @@ -15,6 +15,7 @@ async def test_run_namespaced_job_timeout_respected( ): successful_job_status.status.active = 1 successful_job_status.status.succeeded = None + successful_job_status.status.conditions = [] mock_read_namespaced_job_status.return_value = successful_job_status valid_kubernetes_job_block.timeout_seconds = 1 @@ -80,16 +81,14 @@ async def test_run_namespaced_job_unsuccessful( mock_create_namespaced_job, mock_read_namespaced_job_status, mock_delete_namespaced_job, - successful_job_status, + unsuccessful_job_status, mock_list_namespaced_pod, read_pod_logs, ): - successful_job_status.status.failed = 1 - successful_job_status.status.succeeded = None - mock_read_namespaced_job_status.return_value = successful_job_status + mock_read_namespaced_job_status.return_value = unsuccessful_job_status - with pytest.raises(RuntimeError, match="failed, check the Kubernetes pod logs"): + with pytest.raises(RuntimeError, match=", check the Kubernetes pod logs"): await run_namespaced_job(kubernetes_job=valid_kubernetes_job_block) assert mock_create_namespaced_job.call_count == 1 @@ -125,3 +124,29 @@ def test_sync_flow(): assert mock_read_namespaced_job_status.call_count == 1 assert mock_delete_namespaced_job.call_count == 1 + + +async def test_run_namespaced_job_successful_with_evictions( + valid_kubernetes_job_block, + mock_create_namespaced_job, + mock_read_namespaced_job_status, + mock_delete_namespaced_job, + successful_job_status, + mock_list_namespaced_pod, + read_pod_logs, +): + successful_job_status.status.active = 0 + successful_job_status.status.failed = 1 + mock_read_namespaced_job_status.return_value = successful_job_status + + await run_namespaced_job(kubernetes_job=valid_kubernetes_job_block) + + assert mock_create_namespaced_job.call_count == 1 + assert mock_create_namespaced_job.call_args[1]["namespace"] == "default" + assert mock_create_namespaced_job.call_args[1]["body"].metadata.name == "pi" + + assert read_pod_logs.call_count == 1 + + assert mock_read_namespaced_job_status.call_count == 1 + + assert mock_delete_namespaced_job.call_count == 1