
Commit: Add handling for rescheduled Kubernetes jobs (#78)
* Add handling for rescheduled jobs

* Adds test for backoff limit checking

* Adds test for missing pod

* Adds changelog entry

* Add default backoffLimit to ensure jobs are run only once by default

* Updates changelog for release

* Updates tests
desertaxle authored Jun 30, 2023
1 parent b565161 commit d9ce16e
Showing 3 changed files with 138 additions and 10 deletions.
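A note on the new default before the diffs: in Kubernetes, spec.backoffLimit caps how many times a failed Job pod is retried, and it defaults to 6 when unset, so a crashing flow pod could previously be rescheduled several times. Setting it to 0 makes a job run at most once. A minimal hand-written sketch of a run-once Job manifest (field names are real batch/v1 Job fields; the metadata and container values are illustrative placeholders, not the worker's actual template):

# Sketch of a run-once Kubernetes Job manifest as a Python dict.
# Illustrative values only; only the field names come from the Job API.
run_once_job = {
    "apiVersion": "batch/v1",
    "kind": "Job",
    "metadata": {"generateName": "example-flow-run-"},
    "spec": {
        # 0 retries: a pod failure fails the Job immediately instead of
        # spawning replacement pods (the Kubernetes default is 6 retries).
        "backoffLimit": 0,
        "template": {
            "spec": {
                "restartPolicy": "Never",
                "containers": [
                    {"name": "job", "image": "busybox", "args": ["true"]}
                ],
            }
        },
    },
}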
8 changes: 8 additions & 0 deletions CHANGELOG.md
@@ -19,6 +19,14 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 
 ### Security
 
+## 0.2.10
+
+Released June 30th, 2023.
+
+### Added
+
+- Handling for rescheduled Kubernetes jobs - [#78](https://github.com/PrefectHQ/prefect-kubernetes/pull/78)
+
 ## 0.2.9
 
 Released June 20th, 2023.
44 changes: 40 additions & 4 deletions prefect_kubernetes/worker.py
@@ -148,6 +148,7 @@ def _get_default_job_manifest_template() -> Dict[str, Any]:
             "generateName": "{{ name }}-",
         },
         "spec": {
+            "backoffLimit": 0,
             "ttlSecondsAfterFinished": "{{ finished_job_ttl }}",
             "template": {
                 "spec": {
@@ -808,19 +809,54 @@ def _watch_job(
                 namespace=configuration.namespace,
                 **timeout_seconds,
             ):
-                if event["object"].status.completion_time:
+                if event["type"] == "DELETED":
+                    logger.error(f"Job {job_name!r}: Job has been deleted.")
+                    completed = True
+                elif event["object"].status.completion_time:
                     if not event["object"].status.succeeded:
                         # Job failed, exit while loop and return pod exit code
                         logger.error(f"Job {job_name!r}: Job failed.")
                     completed = True
+                # Check if the job has reached its backoff limit
+                # and stop watching if it has
+                elif (
+                    event["object"].spec.backoff_limit is not None
+                    and event["object"].status.failed is not None
+                    and event["object"].status.failed
+                    > event["object"].spec.backoff_limit
+                ):
+                    logger.error(f"Job {job_name!r}: Job reached backoff limit.")
+                    completed = True
+                # If the job has no backoff limit, check if it has failed
+                # and stop watching if it has
+                elif (
+                    not event["object"].spec.backoff_limit
+                    and event["object"].status.failed
+                ):
+                    completed = True
+
+                if completed:
                     watch.stop()
                     break

         with self._get_core_client(client) as core_client:
-            pod_status = core_client.read_namespaced_pod_status(
-                namespace=configuration.namespace, name=pod.metadata.name
+            # Get all pods for the job
+            pods = core_client.list_namespaced_pod(
+                namespace=configuration.namespace, label_selector=f"job-name={job_name}"
             )
+            # Get the status for only the most recently used pod
+            pods.items.sort(
+                key=lambda pod: pod.metadata.creation_timestamp, reverse=True
+            )
+            most_recent_pod = pods.items[0] if pods.items else None
+            first_container_status = (
+                most_recent_pod.status.container_statuses[0]
+                if most_recent_pod
+                else None
+            )
-            first_container_status = pod_status.status.container_statuses[0]
+            if not first_container_status:
+                logger.error(f"Job {job_name!r}: No pods found for job.")
+                return -1

         return first_container_status.state.terminated.exit_code
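Since a rescheduled Job can leave several pods behind, the worker now reads the exit code from the newest pod for the job rather than from a single remembered pod name. A condensed sketch of that selection logic, assuming a kubernetes.client.CoreV1Api instance (simplified from the diff above; most_recent_pod_exit_code is a hypothetical helper name, not part of the change):

from typing import Optional

from kubernetes import client


def most_recent_pod_exit_code(
    core_client: client.CoreV1Api, namespace: str, job_name: str
) -> int:
    """Return the exit code of the newest pod created for ``job_name``.

    Kubernetes labels every pod a Job creates with ``job-name=<job name>``,
    so the pods can be listed by label selector and ordered by creation time.
    """
    pods = core_client.list_namespaced_pod(
        namespace=namespace, label_selector=f"job-name={job_name}"
    )
    # Newest pod first; a rescheduled job may have several pods.
    pods.items.sort(key=lambda pod: pod.metadata.creation_timestamp, reverse=True)
    most_recent_pod: Optional[client.V1Pod] = pods.items[0] if pods.items else None
    if most_recent_pod is None or not most_recent_pod.status.container_statuses:
        # No pod ever ran (or it vanished); surface a sentinel exit code.
        return -1
    return most_recent_pod.status.container_statuses[0].state.terminated.exit_code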
96 changes: 90 additions & 6 deletions tests/test_worker.py
@@ -159,6 +159,7 @@ def _mock_pods_stream_that_returns_running_pod(*args, **kwargs):
                 "labels": {},
             },
             "spec": {
+                "backoffLimit": 0,
                 "template": {
                     "spec": {
                         "parallelism": 1,
@@ -171,7 +172,7 @@ def _mock_pods_stream_that_returns_running_pod(*args, **kwargs):
                             }
                         ],
                     }
-                }
+                },
             },
         },
         cluster_config=None,
@@ -213,6 +214,7 @@ def _mock_pods_stream_that_returns_running_pod(*args, **kwargs):
                 },
             },
             "spec": {
+                "backoffLimit": 0,
                 "template": {
                     "spec": {
                         "parallelism": 1,
@@ -241,7 +243,7 @@ def _mock_pods_stream_that_returns_running_pod(*args, **kwargs):
                             }
                         ],
                     }
-                }
+                },
             },
         },
         cluster_config=None,
@@ -290,6 +292,7 @@ def _mock_pods_stream_that_returns_running_pod(*args, **kwargs):
             "generateName": "test-",
         },
         "spec": {
+            "backoffLimit": 0,
             "ttlSecondsAfterFinished": 60,
             "template": {
                 "spec": {
@@ -354,6 +357,7 @@ def _mock_pods_stream_that_returns_running_pod(*args, **kwargs):
             },
         },
         "spec": {
+            "backoffLimit": 0,
             "ttlSecondsAfterFinished": 60,
             "template": {
                 "spec": {
@@ -935,7 +939,10 @@ async def test_creates_job_by_building_a_manifest(
 
         async with KubernetesWorker(work_pool_name="test") as k8s_worker:
             await k8s_worker.run(flow_run=flow_run, configuration=default_configuration)
-            mock_core_client.read_namespaced_pod_status.assert_called_once()
+            mock_core_client.list_namespaced_pod.assert_called_with(
+                namespace=default_configuration.namespace,
+                label_selector="job-name=mock-job",
+            )
 
         mock_batch_client.create_namespaced_job.assert_called_with(
             "default",
@@ -1768,7 +1775,9 @@ def mock_stream(*args, **kwargs):
 
             # Yield the completed job
             job.status.completion_time = True
-            yield {"object": job}
+            job.status.failed = 0
+            job.spec.backoff_limit = 6
+            yield {"object": job, "type": "ADDED"}
 
         def mock_log_stream(*args, **kwargs):
             anyio.sleep(500)
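The mocked streams above now emit events shaped like those of the real Kubernetes watch API: dicts with a "type" key (ADDED, MODIFIED, DELETED, ...) and an "object" key holding the resource. They also set status.failed and spec.backoff_limit explicitly so the new checks in _watch_job see sane values. A hypothetical helper for building such events in tests (not part of this change):

from unittest.mock import MagicMock

import kubernetes


def make_job_event(event_type="ADDED", failed=0, backoff_limit=6, completed=False):
    """Build a watch-style event dict around a mocked V1Job.

    Hypothetical test helper; the defaults mirror Kubernetes' own default
    backoffLimit of 6 and a job that has not yet failed.
    """
    job = MagicMock(spec=kubernetes.client.V1Job)
    job.status.completion_time = True if completed else None
    job.status.failed = failed
    job.spec.backoff_limit = backoff_limit
    return {"type": event_type, "object": job}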
@@ -1817,7 +1826,7 @@ def mock_stream(*args, **kwargs):
             if kwargs["func"] == mock_core_client.list_namespaced_pod:
                 job_pod = MagicMock(spec=kubernetes.client.V1Pod)
                 job_pod.status.phase = "Running"
-                yield {"object": job_pod}
+                yield {"object": job_pod, "type": "ADDED"}
 
             if kwargs["func"] == mock_batch_client.list_namespaced_job:
                 job = MagicMock(spec=kubernetes.client.V1Job)
@@ -1947,7 +1956,9 @@ def mock_stream(*args, **kwargs):
 
             # Yield the job then return exiting the stream
             job.status.completion_time = None
-            yield {"object": job}
+            job.status.failed = 0
+            job.spec.backoff_limit = 6
+            yield {"object": job, "type": "ADDED"}
 
         mock_watch.stream.side_effect = mock_stream
         default_configuration.job_watch_timeout_seconds = 40
@@ -1993,6 +2004,79 @@ def mock_stream(*args, **kwargs):
             ]
         )
 
+    async def test_watch_stops_after_backoff_limit_reached(
+        self,
+        flow_run,
+        default_configuration,
+        mock_core_client,
+        mock_watch,
+        mock_batch_client,
+    ):
+        # The job should not be completed to start
+        mock_batch_client.read_namespaced_job.return_value.status.completion_time = None
+        job_pod = MagicMock(spec=kubernetes.client.V1Pod)
+        job_pod.status.phase = "Running"
+        mock_container_status = MagicMock(spec=kubernetes.client.V1ContainerStatus)
+        mock_container_status.state.terminated.exit_code = 137
+        job_pod.status.container_statuses = [mock_container_status]
+        mock_core_client.list_namespaced_pod.return_value.items = [job_pod]
+
+        def mock_stream(*args, **kwargs):
+            if kwargs["func"] == mock_core_client.list_namespaced_pod:
+                yield {"object": job_pod}
+
+            if kwargs["func"] == mock_batch_client.list_namespaced_job:
+                job = MagicMock(spec=kubernetes.client.V1Job)
+
+                # Yield the job then return exiting the stream
+                job.status.completion_time = None
+                job.spec.backoff_limit = 6
+                for i in range(0, 8):
+                    job.status.failed = i
+                    yield {"object": job, "type": "ADDED"}
+
+        mock_watch.stream.side_effect = mock_stream
+
+        async with KubernetesWorker(work_pool_name="test") as k8s_worker:
+            result = await k8s_worker.run(flow_run, default_configuration)
+
+        assert result.status_code == 137
+
+    async def test_watch_handles_no_pod(
+        self,
+        flow_run,
+        default_configuration,
+        mock_core_client,
+        mock_watch,
+        mock_batch_client,
+    ):
+        # The job should not be completed to start
+        mock_batch_client.read_namespaced_job.return_value.status.completion_time = None
+        mock_core_client.list_namespaced_pod.return_value.items = []
+
+        def mock_stream(*args, **kwargs):
+            if kwargs["func"] == mock_core_client.list_namespaced_pod:
+                job_pod = MagicMock(spec=kubernetes.client.V1Pod)
+                job_pod.status.phase = "Running"
+                yield {"object": job_pod}
+
+            if kwargs["func"] == mock_batch_client.list_namespaced_job:
+                job = MagicMock(spec=kubernetes.client.V1Job)
+
+                # Yield the job then return exiting the stream
+                job.status.completion_time = None
+                job.spec.backoff_limit = 6
+                for i in range(0, 8):
+                    job.status.failed = i
+                    yield {"object": job, "type": "ADDED"}
+
+        mock_watch.stream.side_effect = mock_stream
+
+        async with KubernetesWorker(work_pool_name="test") as k8s_worker:
+            result = await k8s_worker.run(flow_run, default_configuration)
+
+        assert result.status_code == -1
+
 class TestKillInfrastructure:
     async def test_kill_infrastructure_calls_delete_namespaced_job(
         self,
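Taken together, the completion checks these tests exercise can be restated as a small pure function (a sketch for clarity, not code from the change; job_watch_is_over is a hypothetical name):

# Sketch of the completion decision added to _watch_job, as a pure
# function over the relevant job fields.
def job_watch_is_over(event_type, completion_time, failed, backoff_limit):
    if event_type == "DELETED":
        return True
    if completion_time:
        return True
    # Failure count has exceeded an explicit backoff limit.
    if backoff_limit is not None and failed is not None and failed > backoff_limit:
        return True
    # No (or zero) backoff limit: any failure ends the watch.
    if not backoff_limit and failed:
        return True
    return False

# Mirrors test_watch_stops_after_backoff_limit_reached: with a backoff
# limit of 6, the watch continues at 6 failures and stops at 7.
assert not job_watch_is_over("ADDED", None, 6, 6)
assert job_watch_is_over("ADDED", None, 7, 6)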
