diff --git a/providers/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py b/providers/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py
index 482f99725b58f..0f98f2f7dcb46 100644
--- a/providers/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py
+++ b/providers/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py
@@ -38,7 +38,7 @@
 
 from deprecated import deprecated
 from kubernetes.dynamic import DynamicClient
-from sqlalchemy import or_, select, update
+from sqlalchemy import select
 
 try:
     from airflow.cli.cli_config import ARG_LOGICAL_DATE
@@ -60,7 +60,6 @@
 from airflow.configuration import conf
 from airflow.exceptions import AirflowProviderDeprecationWarning
 from airflow.executors.base_executor import BaseExecutor
-from airflow.executors.executor_constants import KUBERNETES_EXECUTOR
 from airflow.providers.cncf.kubernetes.exceptions import PodMutationHookException, PodReconciliationError
 from airflow.providers.cncf.kubernetes.executors.kubernetes_executor_types import (
     ADOPTED,
@@ -69,7 +68,6 @@
 from airflow.providers.cncf.kubernetes.kube_config import KubeConfig
 from airflow.providers.cncf.kubernetes.kubernetes_helper_functions import annotations_to_key
 from airflow.stats import Stats
-from airflow.utils.event_scheduler import EventScheduler
 from airflow.utils.log.logging_mixin import remove_escape_codes
 from airflow.utils.session import NEW_SESSION, provide_session
 from airflow.utils.state import TaskInstanceState
@@ -145,7 +143,6 @@ def __init__(self):
         self.kube_scheduler: AirflowKubernetesScheduler | None = None
         self.kube_client: client.CoreV1Api | None = None
         self.scheduler_job_id: str | None = None
-        self.event_scheduler: EventScheduler | None = None
         self.last_handled: dict[TaskInstanceKey, float] = {}
         self.kubernetes_queue: str | None = None
         self.task_publish_retries: Counter[TaskInstanceKey] = Counter()
@@ -218,96 +215,6 @@ def get_pod_combined_search_str_to_pod_map(self) -> dict[str, k8s.V1Pod]:
             pod_combined_search_str_to_pod_map[search_str] = pod
         return pod_combined_search_str_to_pod_map
 
-    @provide_session
-    def clear_not_launched_queued_tasks(self, session: Session = NEW_SESSION) -> None:
-        """
-        Clear tasks that were not yet launched, but were previously queued.
-
-        Tasks can end up in a "Queued" state when a rescheduled/deferred operator
-        comes back up for execution (with the same try_number) before the
-        pod of its previous incarnation has been fully removed (we think).
-
-        It's also possible when an executor abruptly shuts down (leaving a non-empty
-        task_queue on that executor), but that scenario is handled via normal adoption.
-
-        This method checks each of our queued tasks to see if the corresponding pod
-        is around, and if not, and there's no matching entry in our own
-        task_queue, marks it for re-execution.
- """ - if TYPE_CHECKING: - assert self.kube_client - from airflow.models.taskinstance import TaskInstance - - hybrid_executor_enabled = hasattr(TaskInstance, "executor") - default_executor_alias = None - if hybrid_executor_enabled: - from airflow.executors.executor_loader import ExecutorLoader - - default_executor_name = ExecutorLoader.get_default_executor_name() - default_executor_alias = default_executor_name.alias - - with Stats.timer("kubernetes_executor.clear_not_launched_queued_tasks.duration"): - self.log.debug("Clearing tasks that have not been launched") - query = select(TaskInstance).where( - TaskInstance.state == TaskInstanceState.QUEUED, - TaskInstance.queued_by_job_id == self.job_id, - ) - if self.kubernetes_queue: - query = query.where(TaskInstance.queue == self.kubernetes_queue) - # KUBERNETES_EXECUTOR is the string name/alias of the "core" executor represented by this - # module. The ExecutorName for "core" executors always contains an alias and cannot be modified - # to be different from the constant (in this case KUBERNETES_EXECUTOR). - elif hybrid_executor_enabled and default_executor_alias == KUBERNETES_EXECUTOR: - query = query.where( - or_( - TaskInstance.executor == KUBERNETES_EXECUTOR, - TaskInstance.executor.is_(None), - ), - ) - elif hybrid_executor_enabled: - query = query.where(TaskInstance.executor == KUBERNETES_EXECUTOR) - queued_tis: list[TaskInstance] = session.scalars(query).all() - self.log.info("Found %s queued task instances", len(queued_tis)) - - # Go through the "last seen" dictionary and clean out old entries - allowed_age = self.kube_config.worker_pods_queued_check_interval * 3 - for key, timestamp in list(self.last_handled.items()): - if time.time() - timestamp > allowed_age: - del self.last_handled[key] - - if not queued_tis: - return - - pod_combined_search_str_to_pod_map = self.get_pod_combined_search_str_to_pod_map() - - for ti in queued_tis: - self.log.debug("Checking task instance %s", ti) - - # Check to see if we've handled it ourselves recently - if ti.key in self.last_handled: - continue - - # Build the pod selector - base_selector = f"dag_id={ti.dag_id},task_id={ti.task_id}" - if ti.map_index >= 0: - # Old tasks _couldn't_ be mapped, so we don't have to worry about compat - base_selector += f",map_index={ti.map_index}" - - search_str = f"{base_selector},run_id={ti.run_id}" - if search_str in pod_combined_search_str_to_pod_map: - continue - self.log.info("TaskInstance: %s found in queued state but was not launched, rescheduling", ti) - session.execute( - update(TaskInstance) - .where( - TaskInstance.dag_id == ti.dag_id, - TaskInstance.task_id == ti.task_id, - TaskInstance.run_id == ti.run_id, - TaskInstance.map_index == ti.map_index, - ) - .values(state=TaskInstanceState.SCHEDULED) - ) - def start(self) -> None: """Start the executor.""" self.log.info("Start Kubernetes executor") @@ -325,15 +232,6 @@ def start(self) -> None: kube_client=self.kube_client, scheduler_job_id=self.scheduler_job_id, ) - self.event_scheduler = EventScheduler() - - self.event_scheduler.call_regular_interval( - self.kube_config.worker_pods_queued_check_interval, - self.clear_not_launched_queued_tasks, - ) - # We also call this at startup as that's the most likely time to see - # stuck queued tasks - self.clear_not_launched_queued_tasks() def execute_async( self, @@ -378,7 +276,6 @@ def sync(self) -> None: assert self.kube_config assert self.result_queue assert self.task_queue - assert self.event_scheduler if self.running: self.log.debug("self.running: %s", 
self.running) @@ -466,10 +363,6 @@ def sync(self) -> None: finally: self.task_queue.task_done() - # Run any pending timed events - next_event = self.event_scheduler.run(blocking=False) - self.log.debug("Next timed event is in %f", next_event) - @provide_session def _change_state( self, diff --git a/providers/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py b/providers/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py index f21c2866e834b..702703b2142e0 100644 --- a/providers/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py +++ b/providers/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py @@ -231,12 +231,8 @@ def process_status( ) elif status == "Pending": # deletion_timestamp is set by kube server when a graceful deletion is requested. - # since kube server have received request to delete pod set TI state failed if event["type"] == "DELETED" and pod.metadata.deletion_timestamp: self.log.info("Event: Failed to start pod %s, annotations: %s", pod_name, annotations_string) - self.watcher_queue.put( - (pod_name, namespace, TaskInstanceState.FAILED, annotations, resource_version) - ) elif ( self.kube_config.worker_pod_pending_fatal_container_state_reasons and "status" in event["raw_object"] diff --git a/providers/src/airflow/providers/cncf/kubernetes/kube_config.py b/providers/src/airflow/providers/cncf/kubernetes/kube_config.py index 7a1de52928a00..3f7ecee327782 100644 --- a/providers/src/airflow/providers/cncf/kubernetes/kube_config.py +++ b/providers/src/airflow/providers/cncf/kubernetes/kube_config.py @@ -76,10 +76,6 @@ def __init__(self): # interact with cluster components. self.executor_namespace = conf.get(self.kubernetes_section, "namespace") - self.worker_pods_queued_check_interval = conf.getint( - self.kubernetes_section, "worker_pods_queued_check_interval" - ) - self.kube_client_request_args = conf.getjson( self.kubernetes_section, "kube_client_request_args", fallback={} ) diff --git a/providers/src/airflow/providers/cncf/kubernetes/provider.yaml b/providers/src/airflow/providers/cncf/kubernetes/provider.yaml index 631e502c2e3ca..9d38a70aa14b9 100644 --- a/providers/src/airflow/providers/cncf/kubernetes/provider.yaml +++ b/providers/src/airflow/providers/cncf/kubernetes/provider.yaml @@ -368,13 +368,6 @@ config: type: boolean example: ~ default: "True" - worker_pods_queued_check_interval: - description: | - How often in seconds to check for task instances stuck in "queued" status without a pod - version_added: ~ - type: integer - example: ~ - default: "60" ssl_ca_cert: description: | Path to a CA certificate to be used by the Kubernetes client to verify the server's SSL certificate. 
diff --git a/providers/tests/cncf/kubernetes/executors/test_kubernetes_executor.py b/providers/tests/cncf/kubernetes/executors/test_kubernetes_executor.py
index cbca9f6e30bc1..2a600d6fc575b 100644
--- a/providers/tests/cncf/kubernetes/executors/test_kubernetes_executor.py
+++ b/providers/tests/cncf/kubernetes/executors/test_kubernetes_executor.py
@@ -30,12 +30,6 @@
 
 from airflow import __version__
 from airflow.exceptions import AirflowException
-from airflow.executors.executor_constants import (
-    CELERY_EXECUTOR,
-    CELERY_KUBERNETES_EXECUTOR,
-    KUBERNETES_EXECUTOR,
-)
-from airflow.models.taskinstance import TaskInstance
 from airflow.models.taskinstancekey import TaskInstanceKey
 from airflow.operators.empty import EmptyOperator
 from airflow.providers.cncf.kubernetes import pod_generator
@@ -62,7 +56,6 @@
 from airflow.utils import timezone
 from airflow.utils.state import State, TaskInstanceState
 
-from tests_common.test_utils.compat import BashOperator
 from tests_common.test_utils.config import conf_vars
 
 if __version__.startswith("2."):
@@ -1347,443 +1340,6 @@ def test_kube_config_get_namespace_list(
 
         assert executor.kube_config.multi_namespace_mode_namespace_list == expected_value_in_kube_config
 
-    @pytest.mark.db_test
-    @mock.patch("airflow.providers.cncf.kubernetes.executors.kubernetes_executor.DynamicClient")
-    @conf_vars({("core", "executor"): KUBERNETES_EXECUTOR})
-    def test_clear_not_launched_queued_tasks_not_launched(
-        self, mock_kube_dynamic_client, dag_maker, create_dummy_dag, session
-    ):
-        """If a pod isn't found for a TI, reset the state to scheduled"""
-        mock_kube_client = mock.MagicMock()
-        mock_kube_dynamic_client.return_value = mock.MagicMock()
-        mock_pod_resource = mock.MagicMock()
-        mock_kube_dynamic_client.return_value.resources.get.return_value = mock_pod_resource
-        mock_kube_dynamic_client.return_value.get.return_value.items = []
-
-        # This is hack to use overridden conf vars as it seems executors loaded before conf override.
-        if hasattr(TaskInstance, "executor"):
-            import importlib
-
-            from airflow.executors import executor_loader
-
-            importlib.reload(executor_loader)
-        create_dummy_dag(dag_id="test_clear", task_id="task1", with_dagrun_type=None)
-        dag_run = dag_maker.create_dagrun()
-
-        ti = dag_run.task_instances[0]
-        ti.state = State.QUEUED
-        ti.queued_by_job_id = 1
-        session.flush()
-
-        executor = self.kubernetes_executor
-        executor.job_id = 1
-        executor.kube_client = mock_kube_client
-        executor.clear_not_launched_queued_tasks(session=session)
-
-        ti.refresh_from_db()
-        assert ti.state == State.SCHEDULED
-        assert mock_kube_dynamic_client.return_value.get.call_count == 1
-        mock_kube_dynamic_client.return_value.get.assert_called_with(
-            resource=mock_pod_resource,
-            namespace="default",
-            label_selector="airflow-worker=1",
-            header_params={"Accept": "application/json;as=PartialObjectMetadataList;v=v1;g=meta.k8s.io"},
-        )
-
-    @pytest.mark.db_test
-    @pytest.mark.parametrize(
-        "task_queue, kubernetes_queue",
-        [
-            pytest.param("default", None),
-            pytest.param("kubernetes", None),
-            pytest.param("kubernetes", "kubernetes"),
-        ],
-    )
-    @mock.patch("airflow.providers.cncf.kubernetes.executors.kubernetes_executor.DynamicClient")
-    @conf_vars({("core", "executor"): KUBERNETES_EXECUTOR})
-    def test_clear_not_launched_queued_tasks_launched(
-        self, mock_kube_dynamic_client, dag_maker, create_dummy_dag, session, task_queue, kubernetes_queue
-    ):
-        """Leave the state alone if a pod already exists"""
-        mock_kube_client = mock.MagicMock()
-        mock_kube_dynamic_client.return_value = mock.MagicMock()
-        mock_pod_resource = mock.MagicMock()
-        mock_kube_dynamic_client.return_value.resources.get.return_value = mock_pod_resource
-        mock_kube_dynamic_client.return_value.get.return_value = k8s.V1PodList(
-            items=[
-                k8s.V1Pod(
-                    metadata=k8s.V1ObjectMeta(
-                        annotations={
-                            "dag_id": "test_clear",
-                            "task_id": "task1",
-                            "run_id": "test",
-                        },
-                        labels={
-                            "role": "airflow-worker",
-                            "dag_id": "test_clear",
-                            "task_id": "task1",
-                            "airflow-worker": 1,
-                            "run_id": "test",
-                        },
-                    ),
-                    status=k8s.V1PodStatus(phase="Pending"),
-                )
-            ]
-        )
-
-        # This is hack to use overridden conf vars as it seems executors loaded before conf override.
-        if hasattr(TaskInstance, "executor"):
-            import importlib
-
-            from airflow.executors import executor_loader
-
-            importlib.reload(executor_loader)
-        create_dummy_dag(dag_id="test_clear", task_id="task1", with_dagrun_type=None)
-        dag_run = dag_maker.create_dagrun()
-
-        ti = dag_run.task_instances[0]
-        ti.state = State.QUEUED
-        ti.queued_by_job_id = 1
-        ti.queue = task_queue
-        session.flush()
-
-        executor = self.kubernetes_executor
-        executor.job_id = 1
-        executor.kubernetes_queue = kubernetes_queue
-        executor.kube_client = mock_kube_client
-        executor.clear_not_launched_queued_tasks(session=session)
-
-        ti.refresh_from_db()
-        assert ti.state == State.QUEUED
-        mock_kube_dynamic_client.return_value.get.assert_called_once_with(
-            resource=mock_pod_resource,
-            namespace="default",
-            label_selector="airflow-worker=1",
-            header_params={"Accept": "application/json;as=PartialObjectMetadataList;v=v1;g=meta.k8s.io"},
-        )
-
-    @pytest.mark.db_test
-    @mock.patch("airflow.providers.cncf.kubernetes.executors.kubernetes_executor.DynamicClient")
-    @conf_vars({("core", "executor"): KUBERNETES_EXECUTOR})
-    def test_clear_not_launched_queued_tasks_mapped_task(self, mock_kube_dynamic_client, dag_maker, session):
-        """One mapped task has a launched pod - other does not."""
-
-        def get(*args, **kwargs):
-            return k8s.V1PodList(
-                items=[
-                    k8s.V1Pod(
-                        metadata=k8s.V1ObjectMeta(
-                            annotations={
-                                "dag_id": "test_clear",
-                                "task_id": "bash",
-                                "run_id": "test",
-                                "map_index": 0,
-                            },
-                            labels={
-                                "role": "airflow-worker",
-                                "dag_id": "test_clear",
-                                "task_id": "bash",
-                                "airflow-worker": 1,
-                                "map_index": 0,
-                                "run_id": "test",
-                            },
-                        ),
-                        status=k8s.V1PodStatus(phase="Pending"),
-                    )
-                ]
-            )
-
-        mock_kube_client = mock.MagicMock()
-        mock_kube_dynamic_client.return_value = mock.MagicMock()
-        mock_pod_resource = mock.MagicMock()
-        mock_kube_dynamic_client.return_value.resources.get.return_value = mock_pod_resource
-        mock_kube_dynamic_client.return_value.get.side_effect = get
-
-        # This is hack to use overridden conf vars as it seems executors loaded before conf override.
-        if hasattr(TaskInstance, "executor"):
-            import importlib
-
-            from airflow.executors import executor_loader
-
-            importlib.reload(executor_loader)
-        with dag_maker(dag_id="test_clear"):
-            op = BashOperator.partial(task_id="bash").expand(bash_command=["echo 0", "echo 1"])
-
-        dag_run = dag_maker.create_dagrun()
-        ti0 = dag_run.get_task_instance(op.task_id, session, map_index=0)
-        ti0.state = State.QUEUED
-        ti0.queued_by_job_id = 1
-
-        ti1 = dag_run.get_task_instance(op.task_id, session, map_index=1)
-        ti1.state = State.QUEUED
-        ti1.queued_by_job_id = 1
-
-        session.flush()
-
-        executor = self.kubernetes_executor
-        executor.job_id = 1
-        executor.kube_client = mock_kube_client
-        executor.clear_not_launched_queued_tasks(session=session)
-
-        ti0.refresh_from_db()
-        ti1.refresh_from_db()
-        assert ti0.state == State.QUEUED
-        assert ti1.state == State.SCHEDULED
-
-        assert mock_kube_dynamic_client.return_value.get.call_count == 1
-        mock_kube_dynamic_client.return_value.get.assert_called_with(
-            resource=mock_pod_resource,
-            namespace="default",
-            label_selector="airflow-worker=1",
-            header_params={"Accept": "application/json;as=PartialObjectMetadataList;v=v1;g=meta.k8s.io"},
-        )
-
-    @pytest.mark.db_test
-    @conf_vars({("core", "executor"): CELERY_KUBERNETES_EXECUTOR})
-    def test_clear_not_launched_queued_tasks_not_launched_other_queue(
-        self, dag_maker, create_dummy_dag, session
-    ):
-        """Queued TI has no pod, but it is not queued for the k8s executor"""
-        mock_kube_client = mock.MagicMock()
-        mock_kube_client.list_namespaced_pod.return_value = k8s.V1PodList(items=[])
-
-        # This is hack to use overridden conf vars as it seems executors loaded before conf override.
-        if hasattr(TaskInstance, "executor"):
-            import importlib
-
-            from airflow.executors import executor_loader
-
-            importlib.reload(executor_loader)
-        create_dummy_dag(dag_id="test_clear", task_id="task1", with_dagrun_type=None)
-        dag_run = dag_maker.create_dagrun()
-
-        ti = dag_run.task_instances[0]
-        ti.state = State.QUEUED
-        ti.queued_by_job_id = 1
-        session.flush()
-
-        executor = self.kubernetes_executor
-        executor.job_id = 1
-        executor.kubernetes_queue = "kubernetes"
-
-        executor.kube_client = mock_kube_client
-        executor.clear_not_launched_queued_tasks(session=session)
-
-        ti.refresh_from_db()
-        assert ti.state == State.QUEUED
-        assert mock_kube_client.list_namespaced_pod.call_count == 0
-
-    @pytest.mark.db_test
-    @pytest.mark.skipif(
-        not hasattr(TaskInstance, "executor"), reason="Hybrid executor added in later version"
-    )
-    @mock.patch("airflow.providers.cncf.kubernetes.executors.kubernetes_executor.DynamicClient")
-    @conf_vars({("core", "executor"): KUBERNETES_EXECUTOR})
-    def test_clear_not_launched_queued_tasks_not_launched_other_executor(
-        self, mock_kube_dynamic_client, dag_maker, create_dummy_dag, session
-    ):
-        """Queued TI has no pod, but it is not queued for the k8s executor"""
-        mock_kube_client = mock.MagicMock()
-        mock_kube_dynamic_client.return_value = mock.MagicMock()
-        mock_pod_resource = mock.MagicMock()
-        mock_kube_dynamic_client.return_value.resources.get.return_value = mock_pod_resource
-        mock_kube_dynamic_client.return_value.get.return_value.items = []
-
-        # This is hack to use overridden conf vars as it seems executors loaded before conf override.
-        if hasattr(TaskInstance, "executor"):
-            import importlib
-
-            from airflow.executors import executor_loader
-
-            importlib.reload(executor_loader)
-        create_dummy_dag(dag_id="test_clear", task_id="task1", with_dagrun_type=None)
-        dag_run = dag_maker.create_dagrun()
-
-        ti = dag_run.task_instances[0]
-        ti.state = State.QUEUED
-        ti.queued_by_job_id = 1
-        ti.executor = "CeleryExecutor"
-        session.flush()
-
-        executor = self.kubernetes_executor
-        executor.job_id = 1
-
-        executor.kube_client = mock_kube_client
-        executor.clear_not_launched_queued_tasks(session=session)
-
-        ti.refresh_from_db()
-        assert ti.executor == "CeleryExecutor"
-        assert ti.state == State.QUEUED
-        assert mock_kube_client.list_namespaced_pod.call_count == 0
-
-    @pytest.mark.db_test
-    @pytest.mark.skipif(
-        not hasattr(TaskInstance, "executor"), reason="Hybrid executor added in later version"
-    )
-    @mock.patch("airflow.providers.cncf.kubernetes.executors.kubernetes_executor.DynamicClient")
-    @conf_vars({("core", "executor"): CELERY_EXECUTOR})
-    def test_clear_not_launched_queued_tasks_not_launched_other_default_executor(
-        self, mock_kube_dynamic_client, dag_maker, create_dummy_dag, session
-    ):
-        """Queued TI has no pod, but it is not queued for the k8s executor"""
-        mock_kube_client = mock.MagicMock()
-        mock_kube_dynamic_client.return_value = mock.MagicMock()
-        mock_pod_resource = mock.MagicMock()
-        mock_kube_dynamic_client.return_value.resources.get.return_value = mock_pod_resource
-        mock_kube_dynamic_client.return_value.get.return_value.items = []
-
-        # This is hack to use overridden conf vars as it seems executors loaded before conf override.
-        if hasattr(TaskInstance, "executor"):
-            import importlib
-
-            from airflow.executors import executor_loader
-
-            importlib.reload(executor_loader)
-        create_dummy_dag(dag_id="test_clear", task_id="task1", with_dagrun_type=None)
-        dag_run = dag_maker.create_dagrun()
-
-        ti = dag_run.task_instances[0]
-        ti.state = State.QUEUED
-        ti.queued_by_job_id = 1
-        session.flush()
-
-        executor = self.kubernetes_executor
-        executor.job_id = 1
-
-        executor.kube_client = mock_kube_client
-        executor.clear_not_launched_queued_tasks(session=session)
-
-        ti.refresh_from_db()
-        assert ti.state == State.QUEUED
-        assert mock_kube_client.list_namespaced_pod.call_count == 0
-
-    @pytest.mark.db_test
-    @pytest.mark.skipif(
-        not hasattr(TaskInstance, "executor"), reason="Hybrid executor added in later version"
-    )
-    @mock.patch("airflow.providers.cncf.kubernetes.executors.kubernetes_executor.DynamicClient")
-    @conf_vars({("core", "executor"): KUBERNETES_EXECUTOR})
-    def test_clear_not_launched_queued_tasks_launched_none_executor(
-        self, mock_kube_dynamic_client, dag_maker, create_dummy_dag, session
-    ):
-        """Queued TI has no pod, but it is not queued for the k8s executor"""
-        mock_kube_client = mock.MagicMock()
-        mock_kube_dynamic_client.return_value = mock.MagicMock()
-        mock_pod_resource = mock.MagicMock()
-        mock_kube_dynamic_client.return_value.resources.get.return_value = mock_pod_resource
-        mock_kube_dynamic_client.return_value.get.return_value.items = []
-
-        # This is hack to use overridden conf vars as it seems executors loaded before conf override.
-        if hasattr(TaskInstance, "executor"):
-            import importlib
-
-            from airflow.executors import executor_loader
-
-            importlib.reload(executor_loader)
-        create_dummy_dag(dag_id="test_clear", task_id="task1", with_dagrun_type=None)
-        dag_run = dag_maker.create_dagrun()
-
-        ti = dag_run.task_instances[0]
-        ti.state = State.QUEUED
-        ti.queued_by_job_id = 1
-        session.flush()
-
-        executor = self.kubernetes_executor
-        executor.job_id = 1
-
-        executor.kube_client = mock_kube_client
-        executor.clear_not_launched_queued_tasks(session=session)
-
-        ti.refresh_from_db()
-        assert ti.state == State.SCHEDULED
-        assert mock_kube_dynamic_client.return_value.get.call_count == 1
-
-    @pytest.mark.db_test
-    @pytest.mark.skipif(
-        not hasattr(TaskInstance, "executor"), reason="Hybrid executor added in later version"
-    )
-    @mock.patch("airflow.providers.cncf.kubernetes.executors.kubernetes_executor.DynamicClient")
-    @conf_vars({("core", "executor"): KUBERNETES_EXECUTOR})
-    def test_clear_not_launched_queued_tasks_launched_kubernetes_executor(
-        self, mock_kube_dynamic_client, dag_maker, create_dummy_dag, session
-    ):
-        """Queued TI has no pod, but it is not queued for the k8s executor"""
-        mock_kube_client = mock.MagicMock()
-        mock_kube_dynamic_client.return_value = mock.MagicMock()
-        mock_pod_resource = mock.MagicMock()
-        mock_kube_dynamic_client.return_value.resources.get.return_value = mock_pod_resource
-        mock_kube_dynamic_client.return_value.get.return_value.items = []
-
-        # This is hack to use overridden conf vars as it seems executors loaded before conf override.
-        if hasattr(TaskInstance, "executor"):
-            import importlib
-
-            from airflow.executors import executor_loader
-
-            importlib.reload(executor_loader)
-        create_dummy_dag(dag_id="test_clear", task_id="task1", with_dagrun_type=None)
-        dag_run = dag_maker.create_dagrun()
-
-        ti = dag_run.task_instances[0]
-        ti.state = State.QUEUED
-        ti.queued_by_job_id = 1
-        ti.executor = KUBERNETES_EXECUTOR
-        session.flush()
-
-        executor = self.kubernetes_executor
-        executor.job_id = 1
-
-        executor.kube_client = mock_kube_client
-        executor.clear_not_launched_queued_tasks(session=session)
-
-        ti.refresh_from_db()
-        assert ti.state == State.SCHEDULED
-        assert mock_kube_dynamic_client.return_value.get.call_count == 1
-
-    @pytest.mark.db_test
-    @mock.patch("airflow.providers.cncf.kubernetes.executors.kubernetes_executor.DynamicClient")
-    @conf_vars({("core", "executor"): KUBERNETES_EXECUTOR})
-    def test_clear_not_launched_queued_tasks_clear_only_by_job_id(
-        self, mock_kube_dynamic_client, dag_maker, create_dummy_dag, session
-    ):
-        """clear only not launched queued tasks which are queued by the same executor job"""
-        mock_kube_client = mock.MagicMock()
-        mock_kube_dynamic_client.return_value = mock.MagicMock()
-        mock_kube_dynamic_client.return_value.get.return_value = k8s.V1PodList(items=[])
-
-        # This is hack to use overridden conf vars as it seems executors loaded before conf override.
-        if hasattr(TaskInstance, "executor"):
-            import importlib
-
-            from airflow.executors import executor_loader
-
-            importlib.reload(executor_loader)
-        create_dummy_dag(dag_id="test_clear_0", task_id="task0", with_dagrun_type=None)
-        dag_run = dag_maker.create_dagrun()
-
-        ti0 = dag_run.task_instances[0]
-        ti0.state = State.QUEUED
-        ti0.queued_by_job_id = 1
-        session.flush()
-
-        create_dummy_dag(dag_id="test_clear_1", task_id="task1", with_dagrun_type=None)
-        dag_run = dag_maker.create_dagrun()
-
-        ti1 = dag_run.task_instances[0]
-        ti1.state = State.QUEUED
-        ti1.queued_by_job_id = 2
-        session.flush()
-
-        executor = self.kubernetes_executor
-        executor.job_id = 1
-        executor.kube_client = mock_kube_client
-        executor.clear_not_launched_queued_tasks(session=session)
-
-        ti0.refresh_from_db()
-        ti1.refresh_from_db()
-        assert ti0.state == State.SCHEDULED
-        assert ti1.state == State.QUEUED
-
     @pytest.mark.db_test
     @mock.patch("airflow.providers.cncf.kubernetes.kube_client.get_kube_client")
     def test_get_task_log(self, mock_get_kube_client, create_task_instance_of_operator):
@@ -2125,7 +1681,7 @@ def test_process_status_pending_deleted(self):
         self.pod.metadata.deletion_timestamp = timezone.utcnow()
 
         self._run()
-        self.assert_watcher_queue_called_once_with_state(State.FAILED)
+        self.watcher.watcher_queue.put.assert_not_called()
 
     def test_process_status_failed(self):
         self.pod.status.phase = "Failed"