From b9c574c61ae42481b9d2c9ce7c42c93dc44b9507 Mon Sep 17 00:00:00 2001 From: Gopal Dirisala <39794726+dirrao@users.noreply.github.com> Date: Sun, 10 Dec 2023 17:19:39 +0530 Subject: [PATCH] list pods performance optimization (#36092) --- .../executors/kubernetes_executor.py | 31 +++-- .../executors/test_kubernetes_executor.py | 117 +++++++++++++----- 2 files changed, 110 insertions(+), 38 deletions(-) diff --git a/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py b/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py index 16923d63e2912..6e32c0047330f 100644 --- a/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py +++ b/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py @@ -34,6 +34,7 @@ from queue import Empty, Queue from typing import TYPE_CHECKING, Any, Sequence +from kubernetes.dynamic import DynamicClient from sqlalchemy import select, update from airflow.providers.cncf.kubernetes.pod_generator import PodMutationHookException, PodReconciliationError @@ -160,19 +161,31 @@ def __init__(self): super().__init__(parallelism=self.kube_config.parallelism) def _list_pods(self, query_kwargs): + query_kwargs["header_params"] = { + "Accept": "application/json;as=PartialObjectMetadataList;v=v1;g=meta.k8s.io" + } + dynamic_client = DynamicClient(self.kube_client.api_client) + pod_resource = dynamic_client.resources.get(api_version="v1", kind="Pod") if self.kube_config.multi_namespace_mode: if self.kube_config.multi_namespace_mode_namespace_list: - pods = [] - for namespace in self.kube_config.multi_namespace_mode_namespace_list: - pods.extend( - self.kube_client.list_namespaced_pod(namespace=namespace, **query_kwargs).items - ) + namespaces = self.kube_config.multi_namespace_mode_namespace_list else: - pods = self.kube_client.list_pod_for_all_namespaces(**query_kwargs).items + namespaces = [None] else: - pods = self.kube_client.list_namespaced_pod( - namespace=self.kube_config.kube_namespace, **query_kwargs - ).items + namespaces = [self.kube_config.kube_namespace] + + pods = [] + for namespace in namespaces: + # Dynamic Client list pods is throwing TypeError when there are no matching pods to return + # This bug was fixed in MR https://github.com/kubernetes-client/python/pull/2155 + # TODO: Remove the try-except clause once we upgrade the K8 Python client version which + # includes the above MR + try: + pods.extend( + dynamic_client.get(resource=pod_resource, namespace=namespace, **query_kwargs).items + ) + except TypeError: + continue return pods diff --git a/tests/providers/cncf/kubernetes/executors/test_kubernetes_executor.py b/tests/providers/cncf/kubernetes/executors/test_kubernetes_executor.py index 35584a4d8263c..a0b187087ad30 100644 --- a/tests/providers/cncf/kubernetes/executors/test_kubernetes_executor.py +++ b/tests/providers/cncf/kubernetes/executors/test_kubernetes_executor.py @@ -721,13 +721,16 @@ def test_change_state_failed_pod_deletion( finally: executor.end() + @mock.patch("airflow.providers.cncf.kubernetes.executors.kubernetes_executor.DynamicClient") @mock.patch( "airflow.providers.cncf.kubernetes.executors.kubernetes_executor.KubernetesExecutor.adopt_launched_task" ) @mock.patch( "airflow.providers.cncf.kubernetes.executors.kubernetes_executor.KubernetesExecutor._adopt_completed_pods" ) - def test_try_adopt_task_instances(self, mock_adopt_completed_pods, mock_adopt_launched_task): + def test_try_adopt_task_instances( + self, mock_adopt_completed_pods, mock_adopt_launched_task, mock_kube_dynamic_client + ): executor = self.kubernetes_executor executor.scheduler_job_id = "10" ti_key = annotations_to_key( @@ -741,22 +744,27 @@ def test_try_adopt_task_instances(self, mock_adopt_completed_pods, mock_adopt_la mock_ti = mock.MagicMock(queued_by_job_id="1", external_executor_id="1", key=ti_key) pod = k8s.V1Pod(metadata=k8s.V1ObjectMeta(name="foo")) mock_kube_client = mock.MagicMock() - mock_kube_client.list_namespaced_pod.return_value.items = [pod] executor.kube_client = mock_kube_client + mock_kube_dynamic_client.return_value = mock.MagicMock() + mock_pod_resource = mock.MagicMock() + mock_kube_dynamic_client.return_value.resources.get.return_value = mock_pod_resource + mock_kube_dynamic_client.return_value.get.return_value.items = [pod] # First adoption reset_tis = executor.try_adopt_task_instances([mock_ti]) - mock_kube_client.list_namespaced_pod.assert_called_once_with( + mock_kube_dynamic_client.return_value.get.assert_called_once_with( + resource=mock_pod_resource, namespace="default", field_selector="status.phase!=Succeeded", label_selector="kubernetes_executor=True,airflow-worker=1,airflow_executor_done!=True", + header_params={"Accept": "application/json;as=PartialObjectMetadataList;v=v1;g=meta.k8s.io"}, ) mock_adopt_launched_task.assert_called_once_with(mock_kube_client, pod, {ti_key: mock_ti}) mock_adopt_completed_pods.assert_called_once() assert reset_tis == [mock_ti] # assume failure adopting when checking return # Second adoption (queued_by_job_id and external_executor_id no longer match) - mock_kube_client.reset_mock() + mock_kube_dynamic_client.return_value.reset_mock() mock_adopt_launched_task.reset_mock() mock_adopt_completed_pods.reset_mock() @@ -768,23 +776,31 @@ def test_try_adopt_task_instances(self, mock_adopt_completed_pods, mock_adopt_la ) reset_tis = executor.try_adopt_task_instances([mock_ti]) - mock_kube_client.list_namespaced_pod.assert_called_once_with( + mock_kube_dynamic_client.return_value.get.assert_called_once_with( + resource=mock_pod_resource, namespace="default", field_selector="status.phase!=Succeeded", label_selector="kubernetes_executor=True,airflow-worker=10,airflow_executor_done!=True", + header_params={"Accept": "application/json;as=PartialObjectMetadataList;v=v1;g=meta.k8s.io"}, ) mock_adopt_launched_task.assert_called_once() # Won't check args this time around as they get mutated mock_adopt_completed_pods.assert_called_once() assert reset_tis == [] # This time our return is empty - no TIs to reset + @mock.patch("airflow.providers.cncf.kubernetes.executors.kubernetes_executor.DynamicClient") @mock.patch( "airflow.providers.cncf.kubernetes.executors.kubernetes_executor.KubernetesExecutor._adopt_completed_pods" ) - def test_try_adopt_task_instances_multiple_scheduler_ids(self, mock_adopt_completed_pods): + def test_try_adopt_task_instances_multiple_scheduler_ids( + self, mock_adopt_completed_pods, mock_kube_dynamic_client + ): """We try to find pods only once per scheduler id""" executor = self.kubernetes_executor mock_kube_client = mock.MagicMock() executor.kube_client = mock_kube_client + mock_kube_dynamic_client.return_value = mock.MagicMock() + mock_pod_resource = mock.MagicMock() + mock_kube_dynamic_client.return_value.resources.get.return_value = mock_pod_resource mock_tis = [ mock.MagicMock(queued_by_job_id="10", external_executor_id="1", dag_id="dag", task_id="task"), @@ -793,23 +809,32 @@ def test_try_adopt_task_instances_multiple_scheduler_ids(self, mock_adopt_comple ] executor.try_adopt_task_instances(mock_tis) - assert mock_kube_client.list_namespaced_pod.call_count == 2 - mock_kube_client.list_namespaced_pod.assert_has_calls( + assert mock_kube_dynamic_client.return_value.get.call_count == 2 + mock_kube_dynamic_client.return_value.get.assert_has_calls( [ mock.call( + resource=mock_pod_resource, namespace="default", field_selector="status.phase!=Succeeded", label_selector="kubernetes_executor=True,airflow-worker=10,airflow_executor_done!=True", + header_params={ + "Accept": "application/json;as=PartialObjectMetadataList;v=v1;g=meta.k8s.io" + }, ), mock.call( + resource=mock_pod_resource, namespace="default", field_selector="status.phase!=Succeeded", label_selector="kubernetes_executor=True,airflow-worker=40,airflow_executor_done!=True", + header_params={ + "Accept": "application/json;as=PartialObjectMetadataList;v=v1;g=meta.k8s.io" + }, ), ], any_order=True, ) + @mock.patch("airflow.providers.cncf.kubernetes.executors.kubernetes_executor.DynamicClient") @mock.patch( "airflow.providers.cncf.kubernetes.executors.kubernetes_executor.KubernetesExecutor.adopt_launched_task" ) @@ -817,13 +842,14 @@ def test_try_adopt_task_instances_multiple_scheduler_ids(self, mock_adopt_comple "airflow.providers.cncf.kubernetes.executors.kubernetes_executor.KubernetesExecutor._adopt_completed_pods" ) def test_try_adopt_task_instances_no_matching_pods( - self, mock_adopt_completed_pods, mock_adopt_launched_task + self, mock_adopt_completed_pods, mock_adopt_launched_task, mock_kube_dynamic_client ): executor = self.kubernetes_executor mock_ti = mock.MagicMock(queued_by_job_id="1", external_executor_id="1", dag_id="dag", task_id="task") mock_kube_client = mock.MagicMock() - mock_kube_client.list_namespaced_pod.return_value.items = [] executor.kube_client = mock_kube_client + mock_kube_dynamic_client.return_value = mock.MagicMock() + mock_kube_dynamic_client.return_value.get.return_value.items = [] tis_to_flush = executor.try_adopt_task_instances([mock_ti]) assert tis_to_flush == [mock_ti] @@ -880,12 +906,17 @@ def test_adopt_launched_task_api_exception(self, mock_kube_client): assert tis_to_flush_by_key == {ti_key: {}} assert executor.running == set() + @mock.patch("airflow.providers.cncf.kubernetes.executors.kubernetes_executor.DynamicClient") @mock.patch("airflow.providers.cncf.kubernetes.kube_client.get_kube_client") - def test_adopt_completed_pods(self, mock_kube_client): + def test_adopt_completed_pods(self, mock_kube_client, mock_kube_dynamic_client): """We should adopt all completed pods from other schedulers""" executor = self.kubernetes_executor executor.scheduler_job_id = "modified" executor.kube_client = mock_kube_client + mock_kube_dynamic_client.return_value = mock.MagicMock() + mock_pod_resource = mock.MagicMock() + mock_kube_dynamic_client.return_value.resources.get.return_value = mock_pod_resource + mock_kube_dynamic_client.return_value.get.return_value.items = [] executor.kube_config.kube_namespace = "somens" pod_names = ["one", "two"] @@ -897,7 +928,7 @@ def get_annotations(pod_name): "try_number": "1", } - mock_kube_client.list_namespaced_pod.return_value.items = [ + mock_kube_dynamic_client.return_value.get.return_value.items = [ k8s.V1Pod( metadata=k8s.V1ObjectMeta( name=pod_name, @@ -911,10 +942,12 @@ def get_annotations(pod_name): expected_running_ti_keys = {annotations_to_key(get_annotations(pod_name)) for pod_name in pod_names} executor._adopt_completed_pods(mock_kube_client) - mock_kube_client.list_namespaced_pod.assert_called_once_with( + mock_kube_dynamic_client.return_value.get.assert_called_once_with( + resource=mock_pod_resource, namespace="somens", field_selector="status.phase=Succeeded", label_selector="kubernetes_executor=True,airflow-worker!=modified,airflow_executor_done!=True", + header_params={"Accept": "application/json;as=PartialObjectMetadataList;v=v1;g=meta.k8s.io"}, ) assert len(pod_names) == mock_kube_client.patch_namespaced_pod.call_count mock_kube_client.patch_namespaced_pod.assert_has_calls( @@ -1002,10 +1035,16 @@ def test_kube_config_get_namespace_list( assert executor.kube_config.multi_namespace_mode_namespace_list == expected_value_in_kube_config @pytest.mark.db_test - def test_clear_not_launched_queued_tasks_not_launched(self, dag_maker, create_dummy_dag, session): + @mock.patch("airflow.providers.cncf.kubernetes.executors.kubernetes_executor.DynamicClient") + def test_clear_not_launched_queued_tasks_not_launched( + self, mock_kube_dynamic_client, dag_maker, create_dummy_dag, session + ): """If a pod isn't found for a TI, reset the state to scheduled""" mock_kube_client = mock.MagicMock() - mock_kube_client.list_namespaced_pod.return_value = k8s.V1PodList(items=[]) + mock_kube_dynamic_client.return_value = mock.MagicMock() + mock_pod_resource = mock.MagicMock() + mock_kube_dynamic_client.return_value.resources.get.return_value = mock_pod_resource + mock_kube_dynamic_client.return_value.get.return_value.items = [] create_dummy_dag(dag_id="test_clear", task_id="task1", with_dagrun_type=None) dag_run = dag_maker.create_dagrun() @@ -1022,9 +1061,12 @@ def test_clear_not_launched_queued_tasks_not_launched(self, dag_maker, create_du ti.refresh_from_db() assert ti.state == State.SCHEDULED - assert mock_kube_client.list_namespaced_pod.call_count == 1 - mock_kube_client.list_namespaced_pod.assert_called_with( - namespace="default", label_selector="airflow-worker=1" + assert mock_kube_dynamic_client.return_value.get.call_count == 1 + mock_kube_dynamic_client.return_value.get.assert_called_with( + resource=mock_pod_resource, + namespace="default", + label_selector="airflow-worker=1", + header_params={"Accept": "application/json;as=PartialObjectMetadataList;v=v1;g=meta.k8s.io"}, ) @pytest.mark.db_test @@ -1036,12 +1078,16 @@ def test_clear_not_launched_queued_tasks_not_launched(self, dag_maker, create_du pytest.param("kubernetes", "kubernetes"), ], ) + @mock.patch("airflow.providers.cncf.kubernetes.executors.kubernetes_executor.DynamicClient") def test_clear_not_launched_queued_tasks_launched( - self, dag_maker, create_dummy_dag, session, task_queue, kubernetes_queue + self, mock_kube_dynamic_client, dag_maker, create_dummy_dag, session, task_queue, kubernetes_queue ): """Leave the state alone if a pod already exists""" mock_kube_client = mock.MagicMock() - mock_kube_client.list_namespaced_pod.return_value = k8s.V1PodList( + mock_kube_dynamic_client.return_value = mock.MagicMock() + mock_pod_resource = mock.MagicMock() + mock_kube_dynamic_client.return_value.resources.get.return_value = mock_pod_resource + mock_kube_dynamic_client.return_value.get.return_value = k8s.V1PodList( items=[ k8s.V1Pod( metadata=k8s.V1ObjectMeta( @@ -1075,15 +1121,19 @@ def test_clear_not_launched_queued_tasks_launched( ti.refresh_from_db() assert ti.state == State.QUEUED - mock_kube_client.list_namespaced_pod.assert_called_once_with( - namespace="default", label_selector="airflow-worker=1" + mock_kube_dynamic_client.return_value.get.assert_called_once_with( + resource=mock_pod_resource, + namespace="default", + label_selector="airflow-worker=1", + header_params={"Accept": "application/json;as=PartialObjectMetadataList;v=v1;g=meta.k8s.io"}, ) @pytest.mark.db_test - def test_clear_not_launched_queued_tasks_mapped_task(self, dag_maker, session): + @mock.patch("airflow.providers.cncf.kubernetes.executors.kubernetes_executor.DynamicClient") + def test_clear_not_launched_queued_tasks_mapped_task(self, mock_kube_dynamic_client, dag_maker, session): """One mapped task has a launched pod - other does not.""" - def list_namespaced_pod(*args, **kwargs): + def get(*args, **kwargs): return k8s.V1PodList( items=[ k8s.V1Pod( @@ -1103,7 +1153,10 @@ def list_namespaced_pod(*args, **kwargs): ) mock_kube_client = mock.MagicMock() - mock_kube_client.list_namespaced_pod.side_effect = list_namespaced_pod + mock_kube_dynamic_client.return_value = mock.MagicMock() + mock_pod_resource = mock.MagicMock() + mock_kube_dynamic_client.return_value.resources.get.return_value = mock_pod_resource + mock_kube_dynamic_client.return_value.get.side_effect = get with dag_maker(dag_id="test_clear"): op = BashOperator.partial(task_id="bash").expand(bash_command=["echo 0", "echo 1"]) @@ -1129,10 +1182,12 @@ def list_namespaced_pod(*args, **kwargs): assert ti0.state == State.QUEUED assert ti1.state == State.SCHEDULED - assert mock_kube_client.list_namespaced_pod.call_count == 1 - mock_kube_client.list_namespaced_pod.assert_called_with( + assert mock_kube_dynamic_client.return_value.get.call_count == 1 + mock_kube_dynamic_client.return_value.get.assert_called_with( + resource=mock_pod_resource, namespace="default", label_selector="airflow-worker=1", + header_params={"Accept": "application/json;as=PartialObjectMetadataList;v=v1;g=meta.k8s.io"}, ) @pytest.mark.db_test @@ -1163,10 +1218,14 @@ def test_clear_not_launched_queued_tasks_not_launched_other_queue( assert mock_kube_client.list_namespaced_pod.call_count == 0 @pytest.mark.db_test - def test_clear_not_launched_queued_tasks_clear_only_by_job_id(self, dag_maker, create_dummy_dag, session): + @mock.patch("airflow.providers.cncf.kubernetes.executors.kubernetes_executor.DynamicClient") + def test_clear_not_launched_queued_tasks_clear_only_by_job_id( + self, mock_kube_dynamic_client, dag_maker, create_dummy_dag, session + ): """clear only not launched queued tasks which are queued by the same executor job""" mock_kube_client = mock.MagicMock() - mock_kube_client.list_namespaced_pod.return_value = k8s.V1PodList(items=[]) + mock_kube_dynamic_client.return_value = mock.MagicMock() + mock_kube_dynamic_client.return_value.get.return_value = k8s.V1PodList(items=[]) create_dummy_dag(dag_id="test_clear_0", task_id="task0", with_dagrun_type=None) dag_run = dag_maker.create_dagrun()