list pods performance optimization (#36092)
dirrao authored Dec 10, 2023
1 parent 8d0c5d9 commit b9c574c
Showing 2 changed files with 110 additions and 38 deletions.
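The optimization switches pod listing to the Kubernetes DynamicClient and requests a PartialObjectMetadataList via the Accept header, so the API server returns only pod metadata instead of full pod specs and statuses. Below is a minimal standalone sketch of that pattern, not part of the commit itself; the kubeconfig loading, namespace, and label selector are illustrative assumptions.

from kubernetes import client, config
from kubernetes.dynamic import DynamicClient

# Assumes a reachable cluster; use config.load_incluster_config() when running inside a pod.
config.load_kube_config()

dynamic_client = DynamicClient(client.ApiClient())
pod_resource = dynamic_client.resources.get(api_version="v1", kind="Pod")

# The Accept header asks the server to strip spec and status and return metadata only,
# which shrinks the response when listing many pods.
pods = dynamic_client.get(
    resource=pod_resource,
    namespace="default",  # illustrative namespace
    label_selector="airflow-worker=1",  # illustrative selector
    header_params={"Accept": "application/json;as=PartialObjectMetadataList;v=v1;g=meta.k8s.io"},
)
for item in pods.items:
    print(item.metadata.name)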
31 changes: 22 additions & 9 deletions in airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py
@@ -34,6 +34,7 @@
from queue import Empty, Queue
from typing import TYPE_CHECKING, Any, Sequence

from kubernetes.dynamic import DynamicClient
from sqlalchemy import select, update

from airflow.providers.cncf.kubernetes.pod_generator import PodMutationHookException, PodReconciliationError
@@ -160,19 +161,31 @@ def __init__(self):
super().__init__(parallelism=self.kube_config.parallelism)

def _list_pods(self, query_kwargs):
query_kwargs["header_params"] = {
"Accept": "application/json;as=PartialObjectMetadataList;v=v1;g=meta.k8s.io"
}
dynamic_client = DynamicClient(self.kube_client.api_client)
pod_resource = dynamic_client.resources.get(api_version="v1", kind="Pod")
if self.kube_config.multi_namespace_mode:
if self.kube_config.multi_namespace_mode_namespace_list:
pods = []
for namespace in self.kube_config.multi_namespace_mode_namespace_list:
pods.extend(
self.kube_client.list_namespaced_pod(namespace=namespace, **query_kwargs).items
)
namespaces = self.kube_config.multi_namespace_mode_namespace_list
else:
pods = self.kube_client.list_pod_for_all_namespaces(**query_kwargs).items
namespaces = [None]
else:
pods = self.kube_client.list_namespaced_pod(
namespace=self.kube_config.kube_namespace, **query_kwargs
).items
namespaces = [self.kube_config.kube_namespace]

pods = []
for namespace in namespaces:
# DynamicClient's list-pods call raises a TypeError when there are no matching pods to return.
# That bug was fixed upstream in https://github.com/kubernetes-client/python/pull/2155
# TODO: Remove this try-except once we upgrade to a Kubernetes Python client version that
# includes the fix above.
try:
pods.extend(
dynamic_client.get(resource=pod_resource, namespace=namespace, **query_kwargs).items
)
except TypeError:
continue

return pods

117 changes: 88 additions & 29 deletions in tests/providers/cncf/kubernetes/executors/test_kubernetes_executor.py
@@ -721,13 +721,16 @@ def test_change_state_failed_pod_deletion(
finally:
executor.end()

@mock.patch("airflow.providers.cncf.kubernetes.executors.kubernetes_executor.DynamicClient")
@mock.patch(
"airflow.providers.cncf.kubernetes.executors.kubernetes_executor.KubernetesExecutor.adopt_launched_task"
)
@mock.patch(
"airflow.providers.cncf.kubernetes.executors.kubernetes_executor.KubernetesExecutor._adopt_completed_pods"
)
def test_try_adopt_task_instances(self, mock_adopt_completed_pods, mock_adopt_launched_task):
def test_try_adopt_task_instances(
self, mock_adopt_completed_pods, mock_adopt_launched_task, mock_kube_dynamic_client
):
executor = self.kubernetes_executor
executor.scheduler_job_id = "10"
ti_key = annotations_to_key(
@@ -741,22 +744,27 @@ def test_try_adopt_task_instances(self, mock_adopt_completed_pods, mock_adopt_la
mock_ti = mock.MagicMock(queued_by_job_id="1", external_executor_id="1", key=ti_key)
pod = k8s.V1Pod(metadata=k8s.V1ObjectMeta(name="foo"))
mock_kube_client = mock.MagicMock()
mock_kube_client.list_namespaced_pod.return_value.items = [pod]
executor.kube_client = mock_kube_client
mock_kube_dynamic_client.return_value = mock.MagicMock()
mock_pod_resource = mock.MagicMock()
mock_kube_dynamic_client.return_value.resources.get.return_value = mock_pod_resource
mock_kube_dynamic_client.return_value.get.return_value.items = [pod]

# First adoption
reset_tis = executor.try_adopt_task_instances([mock_ti])
mock_kube_client.list_namespaced_pod.assert_called_once_with(
mock_kube_dynamic_client.return_value.get.assert_called_once_with(
resource=mock_pod_resource,
namespace="default",
field_selector="status.phase!=Succeeded",
label_selector="kubernetes_executor=True,airflow-worker=1,airflow_executor_done!=True",
header_params={"Accept": "application/json;as=PartialObjectMetadataList;v=v1;g=meta.k8s.io"},
)
mock_adopt_launched_task.assert_called_once_with(mock_kube_client, pod, {ti_key: mock_ti})
mock_adopt_completed_pods.assert_called_once()
assert reset_tis == [mock_ti] # assume failure adopting when checking return

# Second adoption (queued_by_job_id and external_executor_id no longer match)
mock_kube_client.reset_mock()
mock_kube_dynamic_client.return_value.reset_mock()
mock_adopt_launched_task.reset_mock()
mock_adopt_completed_pods.reset_mock()

@@ -768,23 +776,31 @@ def test_try_adopt_task_instances_multiple_scheduler_ids(self, mock_adopt_comple
)

reset_tis = executor.try_adopt_task_instances([mock_ti])
mock_kube_client.list_namespaced_pod.assert_called_once_with(
mock_kube_dynamic_client.return_value.get.assert_called_once_with(
resource=mock_pod_resource,
namespace="default",
field_selector="status.phase!=Succeeded",
label_selector="kubernetes_executor=True,airflow-worker=10,airflow_executor_done!=True",
header_params={"Accept": "application/json;as=PartialObjectMetadataList;v=v1;g=meta.k8s.io"},
)
mock_adopt_launched_task.assert_called_once() # Won't check args this time around as they get mutated
mock_adopt_completed_pods.assert_called_once()
assert reset_tis == [] # This time our return is empty - no TIs to reset

@mock.patch("airflow.providers.cncf.kubernetes.executors.kubernetes_executor.DynamicClient")
@mock.patch(
"airflow.providers.cncf.kubernetes.executors.kubernetes_executor.KubernetesExecutor._adopt_completed_pods"
)
def test_try_adopt_task_instances_multiple_scheduler_ids(self, mock_adopt_completed_pods):
def test_try_adopt_task_instances_multiple_scheduler_ids(
self, mock_adopt_completed_pods, mock_kube_dynamic_client
):
"""We try to find pods only once per scheduler id"""
executor = self.kubernetes_executor
mock_kube_client = mock.MagicMock()
executor.kube_client = mock_kube_client
mock_kube_dynamic_client.return_value = mock.MagicMock()
mock_pod_resource = mock.MagicMock()
mock_kube_dynamic_client.return_value.resources.get.return_value = mock_pod_resource

mock_tis = [
mock.MagicMock(queued_by_job_id="10", external_executor_id="1", dag_id="dag", task_id="task"),
@@ -793,37 +809,47 @@ def test_try_adopt_task_instances_multiple_scheduler_ids(self, mock_adopt_comple
]

executor.try_adopt_task_instances(mock_tis)
assert mock_kube_client.list_namespaced_pod.call_count == 2
mock_kube_client.list_namespaced_pod.assert_has_calls(
assert mock_kube_dynamic_client.return_value.get.call_count == 2
mock_kube_dynamic_client.return_value.get.assert_has_calls(
[
mock.call(
resource=mock_pod_resource,
namespace="default",
field_selector="status.phase!=Succeeded",
label_selector="kubernetes_executor=True,airflow-worker=10,airflow_executor_done!=True",
header_params={
"Accept": "application/json;as=PartialObjectMetadataList;v=v1;g=meta.k8s.io"
},
),
mock.call(
resource=mock_pod_resource,
namespace="default",
field_selector="status.phase!=Succeeded",
label_selector="kubernetes_executor=True,airflow-worker=40,airflow_executor_done!=True",
header_params={
"Accept": "application/json;as=PartialObjectMetadataList;v=v1;g=meta.k8s.io"
},
),
],
any_order=True,
)

@mock.patch("airflow.providers.cncf.kubernetes.executors.kubernetes_executor.DynamicClient")
@mock.patch(
"airflow.providers.cncf.kubernetes.executors.kubernetes_executor.KubernetesExecutor.adopt_launched_task"
)
@mock.patch(
"airflow.providers.cncf.kubernetes.executors.kubernetes_executor.KubernetesExecutor._adopt_completed_pods"
)
def test_try_adopt_task_instances_no_matching_pods(
self, mock_adopt_completed_pods, mock_adopt_launched_task
self, mock_adopt_completed_pods, mock_adopt_launched_task, mock_kube_dynamic_client
):
executor = self.kubernetes_executor
mock_ti = mock.MagicMock(queued_by_job_id="1", external_executor_id="1", dag_id="dag", task_id="task")
mock_kube_client = mock.MagicMock()
mock_kube_client.list_namespaced_pod.return_value.items = []
executor.kube_client = mock_kube_client
mock_kube_dynamic_client.return_value = mock.MagicMock()
mock_kube_dynamic_client.return_value.get.return_value.items = []

tis_to_flush = executor.try_adopt_task_instances([mock_ti])
assert tis_to_flush == [mock_ti]
@@ -880,12 +906,17 @@ def test_adopt_launched_task_api_exception(self, mock_kube_client):
assert tis_to_flush_by_key == {ti_key: {}}
assert executor.running == set()

@mock.patch("airflow.providers.cncf.kubernetes.executors.kubernetes_executor.DynamicClient")
@mock.patch("airflow.providers.cncf.kubernetes.kube_client.get_kube_client")
def test_adopt_completed_pods(self, mock_kube_client):
def test_adopt_completed_pods(self, mock_kube_client, mock_kube_dynamic_client):
"""We should adopt all completed pods from other schedulers"""
executor = self.kubernetes_executor
executor.scheduler_job_id = "modified"
executor.kube_client = mock_kube_client
mock_kube_dynamic_client.return_value = mock.MagicMock()
mock_pod_resource = mock.MagicMock()
mock_kube_dynamic_client.return_value.resources.get.return_value = mock_pod_resource
mock_kube_dynamic_client.return_value.get.return_value.items = []
executor.kube_config.kube_namespace = "somens"
pod_names = ["one", "two"]

@@ -897,7 +928,7 @@ def get_annotations(pod_name):
"try_number": "1",
}

mock_kube_client.list_namespaced_pod.return_value.items = [
mock_kube_dynamic_client.return_value.get.return_value.items = [
k8s.V1Pod(
metadata=k8s.V1ObjectMeta(
name=pod_name,
@@ -911,10 +942,12 @@ def get_annotations(pod_name):
expected_running_ti_keys = {annotations_to_key(get_annotations(pod_name)) for pod_name in pod_names}

executor._adopt_completed_pods(mock_kube_client)
mock_kube_client.list_namespaced_pod.assert_called_once_with(
mock_kube_dynamic_client.return_value.get.assert_called_once_with(
resource=mock_pod_resource,
namespace="somens",
field_selector="status.phase=Succeeded",
label_selector="kubernetes_executor=True,airflow-worker!=modified,airflow_executor_done!=True",
header_params={"Accept": "application/json;as=PartialObjectMetadataList;v=v1;g=meta.k8s.io"},
)
assert len(pod_names) == mock_kube_client.patch_namespaced_pod.call_count
mock_kube_client.patch_namespaced_pod.assert_has_calls(
@@ -1002,10 +1035,16 @@ def test_kube_config_get_namespace_list(
assert executor.kube_config.multi_namespace_mode_namespace_list == expected_value_in_kube_config

@pytest.mark.db_test
def test_clear_not_launched_queued_tasks_not_launched(self, dag_maker, create_dummy_dag, session):
@mock.patch("airflow.providers.cncf.kubernetes.executors.kubernetes_executor.DynamicClient")
def test_clear_not_launched_queued_tasks_not_launched(
self, mock_kube_dynamic_client, dag_maker, create_dummy_dag, session
):
"""If a pod isn't found for a TI, reset the state to scheduled"""
mock_kube_client = mock.MagicMock()
mock_kube_client.list_namespaced_pod.return_value = k8s.V1PodList(items=[])
mock_kube_dynamic_client.return_value = mock.MagicMock()
mock_pod_resource = mock.MagicMock()
mock_kube_dynamic_client.return_value.resources.get.return_value = mock_pod_resource
mock_kube_dynamic_client.return_value.get.return_value.items = []

create_dummy_dag(dag_id="test_clear", task_id="task1", with_dagrun_type=None)
dag_run = dag_maker.create_dagrun()
@@ -1022,9 +1061,12 @@ def test_clear_not_launched_queued_tasks_not_launched(self, dag_maker, create_du

ti.refresh_from_db()
assert ti.state == State.SCHEDULED
assert mock_kube_client.list_namespaced_pod.call_count == 1
mock_kube_client.list_namespaced_pod.assert_called_with(
namespace="default", label_selector="airflow-worker=1"
assert mock_kube_dynamic_client.return_value.get.call_count == 1
mock_kube_dynamic_client.return_value.get.assert_called_with(
resource=mock_pod_resource,
namespace="default",
label_selector="airflow-worker=1",
header_params={"Accept": "application/json;as=PartialObjectMetadataList;v=v1;g=meta.k8s.io"},
)

@pytest.mark.db_test
@@ -1036,12 +1078,16 @@ def test_clear_not_launched_queued_tasks_not_launched(self, dag_maker, create_du
pytest.param("kubernetes", "kubernetes"),
],
)
@mock.patch("airflow.providers.cncf.kubernetes.executors.kubernetes_executor.DynamicClient")
def test_clear_not_launched_queued_tasks_launched(
self, dag_maker, create_dummy_dag, session, task_queue, kubernetes_queue
self, mock_kube_dynamic_client, dag_maker, create_dummy_dag, session, task_queue, kubernetes_queue
):
"""Leave the state alone if a pod already exists"""
mock_kube_client = mock.MagicMock()
mock_kube_client.list_namespaced_pod.return_value = k8s.V1PodList(
mock_kube_dynamic_client.return_value = mock.MagicMock()
mock_pod_resource = mock.MagicMock()
mock_kube_dynamic_client.return_value.resources.get.return_value = mock_pod_resource
mock_kube_dynamic_client.return_value.get.return_value = k8s.V1PodList(
items=[
k8s.V1Pod(
metadata=k8s.V1ObjectMeta(
@@ -1075,15 +1121,19 @@ def test_clear_not_launched_queued_tasks_launched(

ti.refresh_from_db()
assert ti.state == State.QUEUED
mock_kube_client.list_namespaced_pod.assert_called_once_with(
namespace="default", label_selector="airflow-worker=1"
mock_kube_dynamic_client.return_value.get.assert_called_once_with(
resource=mock_pod_resource,
namespace="default",
label_selector="airflow-worker=1",
header_params={"Accept": "application/json;as=PartialObjectMetadataList;v=v1;g=meta.k8s.io"},
)

@pytest.mark.db_test
def test_clear_not_launched_queued_tasks_mapped_task(self, dag_maker, session):
@mock.patch("airflow.providers.cncf.kubernetes.executors.kubernetes_executor.DynamicClient")
def test_clear_not_launched_queued_tasks_mapped_task(self, mock_kube_dynamic_client, dag_maker, session):
"""One mapped task has a launched pod - other does not."""

def list_namespaced_pod(*args, **kwargs):
def get(*args, **kwargs):
return k8s.V1PodList(
items=[
k8s.V1Pod(
@@ -1103,7 +1153,10 @@ def list_namespaced_pod(*args, **kwargs):
)

mock_kube_client = mock.MagicMock()
mock_kube_client.list_namespaced_pod.side_effect = list_namespaced_pod
mock_kube_dynamic_client.return_value = mock.MagicMock()
mock_pod_resource = mock.MagicMock()
mock_kube_dynamic_client.return_value.resources.get.return_value = mock_pod_resource
mock_kube_dynamic_client.return_value.get.side_effect = get

with dag_maker(dag_id="test_clear"):
op = BashOperator.partial(task_id="bash").expand(bash_command=["echo 0", "echo 1"])
@@ -1129,10 +1182,12 @@ def list_namespaced_pod(*args, **kwargs):
assert ti0.state == State.QUEUED
assert ti1.state == State.SCHEDULED

assert mock_kube_client.list_namespaced_pod.call_count == 1
mock_kube_client.list_namespaced_pod.assert_called_with(
assert mock_kube_dynamic_client.return_value.get.call_count == 1
mock_kube_dynamic_client.return_value.get.assert_called_with(
resource=mock_pod_resource,
namespace="default",
label_selector="airflow-worker=1",
header_params={"Accept": "application/json;as=PartialObjectMetadataList;v=v1;g=meta.k8s.io"},
)

@pytest.mark.db_test
@@ -1163,10 +1218,14 @@ def test_clear_not_launched_queued_tasks_not_launched_other_queue(
assert mock_kube_client.list_namespaced_pod.call_count == 0

@pytest.mark.db_test
def test_clear_not_launched_queued_tasks_clear_only_by_job_id(self, dag_maker, create_dummy_dag, session):
@mock.patch("airflow.providers.cncf.kubernetes.executors.kubernetes_executor.DynamicClient")
def test_clear_not_launched_queued_tasks_clear_only_by_job_id(
self, mock_kube_dynamic_client, dag_maker, create_dummy_dag, session
):
"""clear only not launched queued tasks which are queued by the same executor job"""
mock_kube_client = mock.MagicMock()
mock_kube_client.list_namespaced_pod.return_value = k8s.V1PodList(items=[])
mock_kube_dynamic_client.return_value = mock.MagicMock()
mock_kube_dynamic_client.return_value.get.return_value = k8s.V1PodList(items=[])

create_dummy_dag(dag_id="test_clear_0", task_id="task0", with_dagrun_type=None)
dag_run = dag_maker.create_dagrun()
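For reference, a self-contained sketch of the mocking pattern the updated tests rely on: patch DynamicClient, stub what resources.get() and get() return, then assert on the get() call. The list_pod_names helper and the patched module path below are illustrative assumptions rather than Airflow code (the tests above patch DynamicClient where the executor module imports it).

from unittest import mock


def list_pod_names(api_client, namespace):
    # Imported inside the function so the patch below is picked up at call time.
    from kubernetes.dynamic import DynamicClient

    dynamic_client = DynamicClient(api_client)
    pod_resource = dynamic_client.resources.get(api_version="v1", kind="Pod")
    pods = dynamic_client.get(
        resource=pod_resource,
        namespace=namespace,
        header_params={"Accept": "application/json;as=PartialObjectMetadataList;v=v1;g=meta.k8s.io"},
    )
    return [item.metadata.name for item in pods.items]


@mock.patch("kubernetes.dynamic.DynamicClient")
def test_list_pod_names(mock_dynamic_client):
    # Stub a single pod whose only interesting attribute is its name.
    mock_pod = mock.MagicMock()
    mock_pod.metadata.name = "foo"
    mock_dynamic_client.return_value.get.return_value.items = [mock_pod]

    assert list_pod_names(mock.MagicMock(), "default") == ["foo"]
    mock_dynamic_client.return_value.get.assert_called_once()


test_list_pod_names()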
