diff --git a/airflow/executors/kubernetes_executor.py b/airflow/executors/kubernetes_executor.py
index 6437e3bf1ea2f..741bc08adae25 100644
--- a/airflow/executors/kubernetes_executor.py
+++ b/airflow/executors/kubernetes_executor.py
@@ -22,7 +22,6 @@
 :ref:`executor:KubernetesExecutor`
 """
 import base64
-import datetime
 import functools
 import json
 import multiprocessing
@@ -49,18 +48,16 @@
 from airflow.models import KubeResourceVersion, KubeWorkerIdentifier, TaskInstance
 from airflow.models.taskinstance import TaskInstanceKey
 from airflow.utils.log.logging_mixin import LoggingMixin
-from airflow.utils.session import create_session, provide_session
+from airflow.utils.session import provide_session
 from airflow.utils.state import State

-MAX_LABEL_LEN = 63
-
 # TaskInstance key, command, configuration
 KubernetesJobType = Tuple[TaskInstanceKey, CommandType, Any]

 # key, state, pod_id, namespace, resource_version
 KubernetesResultsType = Tuple[TaskInstanceKey, Optional[str], str, str, str]

-# pod_id, namespace, state, labels, resource_version
+# pod_id, namespace, state, annotations, resource_version
 KubernetesWatchType = Tuple[str, str, Optional[str], Dict[str, str], str]

@@ -359,11 +356,19 @@ def _run(self,
                 )
                 if event['type'] == 'ERROR':
                     return self.process_error(event)
+                annotations = task.metadata.annotations
+                task_instance_related_annotations = {
+                    'dag_id': annotations['dag_id'],
+                    'task_id': annotations['task_id'],
+                    'execution_date': annotations['execution_date'],
+                    'try_number': annotations['try_number'],
+                }
+
                 self.process_status(
                     pod_id=task.metadata.name,
                     namespace=task.metadata.namespace,
                     status=task.status.phase,
-                    labels=task.metadata.labels,
+                    annotations=task_instance_related_annotations,
                     resource_version=task.metadata.resource_version,
                     event=event,
                 )
@@ -393,28 +398,30 @@ def process_error(self, event: Any) -> str:

     def process_status(self, pod_id: str,
                        namespace: str,
                        status: str,
-                       labels: Dict[str, str],
+                       annotations: Dict[str, str],
                        resource_version: str,
                        event: Any) -> None:
         """Process status response"""
         if status == 'Pending':
             if event['type'] == 'DELETED':
                 self.log.info('Event: Failed to start pod %s, will reschedule', pod_id)
-                self.watcher_queue.put((pod_id, namespace, State.UP_FOR_RESCHEDULE, labels, resource_version))
+                self.watcher_queue.put(
+                    (pod_id, namespace, State.UP_FOR_RESCHEDULE, annotations, resource_version)
+                )
             else:
                 self.log.info('Event: %s Pending', pod_id)
         elif status == 'Failed':
             self.log.error('Event: %s Failed', pod_id)
-            self.watcher_queue.put((pod_id, namespace, State.FAILED, labels, resource_version))
+            self.watcher_queue.put((pod_id, namespace, State.FAILED, annotations, resource_version))
         elif status == 'Succeeded':
             self.log.info('Event: %s Succeeded', pod_id)
-            self.watcher_queue.put((pod_id, namespace, None, labels, resource_version))
+            self.watcher_queue.put((pod_id, namespace, None, annotations, resource_version))
         elif status == 'Running':
             self.log.info('Event: %s is Running', pod_id)
         else:
             self.log.warning(
-                'Event: Invalid state: %s on pod: %s in namespace %s with labels: %s with '
-                'resource_version: %s', status, pod_id, namespace, labels, resource_version
+                'Event: Invalid state: %s on pod: %s in namespace %s with annotations: %s with '
+                'resource_version: %s', status, pod_id, namespace, annotations, resource_version
             )

@@ -480,10 +487,10 @@ def run_next(self, next_job: KubernetesJobType) -> None:
             namespace=self.namespace,
             worker_uuid=self.worker_uuid,
             pod_id=self._create_pod_id(dag_id, task_id),
-            dag_id=pod_generator.make_safe_label_value(dag_id),
-            task_id=pod_generator.make_safe_label_value(task_id),
+            dag_id=dag_id,
+            task_id=task_id,
             try_number=try_number,
-            date=self._datetime_to_label_safe_datestring(execution_date),
+            date=execution_date,
             command=command,
             kube_executor_config=kube_executor_config,
             worker_config=self.worker_configuration_pod
@@ -530,16 +537,24 @@ def sync(self) -> None:

     def process_watcher_task(self, task: KubernetesWatchType) -> None:
         """Process the task by watcher."""
-        pod_id, namespace, state, labels, resource_version = task
+        pod_id, namespace, state, annotations, resource_version = task
         self.log.info(
-            'Attempting to finish pod; pod_id: %s; state: %s; labels: %s',
-            pod_id, state, labels
+            'Attempting to finish pod; pod_id: %s; state: %s; annotations: %s',
+            pod_id, state, annotations
         )
-        key = self._labels_to_key(labels=labels)
+        key = self._annotations_to_key(annotations=annotations)
         if key:
             self.log.debug('finishing job %s - %s (%s)', key, state, pod_id)
             self.result_queue.put((key, state, pod_id, namespace, resource_version))

+    def _annotations_to_key(self, annotations: Dict[str, str]) -> Optional[TaskInstanceKey]:
+        dag_id = annotations['dag_id']
+        task_id = annotations['task_id']
+        try_number = int(annotations['try_number'])
+        execution_date = parser.parse(annotations['execution_date'])
+
+        return TaskInstanceKey(dag_id, task_id, execution_date, try_number)
+
     @staticmethod
     def _strip_unsafe_kubernetes_special_chars(string: str) -> str:
         """
@@ -581,98 +596,6 @@ def _create_pod_id(dag_id: str, task_id: str) -> str:
             task_id)
         return safe_dag_id + safe_task_id

-    @staticmethod
-    def _label_safe_datestring_to_datetime(string: str) -> datetime.datetime:
-        """
-        Kubernetes doesn't permit ":" in labels. ISO datetime format uses ":" but not
-        "_", let's
-        replace ":" with "_"
-
-        :param string: str
-        :return: datetime.datetime object
-        """
-        return parser.parse(string.replace('_plus_', '+').replace("_", ":"))
-
-    @staticmethod
-    def _datetime_to_label_safe_datestring(datetime_obj: datetime.datetime) -> str:
-        """
-        Kubernetes doesn't like ":" in labels, since ISO datetime format uses ":" but
-        not "_" let's
-        replace ":" with "_"
-
-        :param datetime_obj: datetime.datetime object
-        :return: ISO-like string representing the datetime
-        """
-        return datetime_obj.isoformat().replace(":", "_").replace('+', '_plus_')
-
-    def _labels_to_key(self, labels: Dict[str, str]) -> Optional[TaskInstanceKey]:
-        try_num = 1
-        try:
-            try_num = int(labels.get('try_number', '1'))
-        except ValueError:
-            self.log.warning("could not get try_number as an int: %s", labels.get('try_number', '1'))
-
-        try:
-            dag_id = labels['dag_id']
-            task_id = labels['task_id']
-            ex_time = self._label_safe_datestring_to_datetime(labels['execution_date'])
-        except Exception as e:  # pylint: disable=broad-except
-            self.log.warning(
-                'Error while retrieving labels; labels: %s; exception: %s',
-                labels, e
-            )
-            return None
-
-        with create_session() as session:
-            task = (
-                session
-                .query(TaskInstance)
-                .filter_by(task_id=task_id, dag_id=dag_id, execution_date=ex_time)
-                .one_or_none()
-            )
-            if task:
-                self.log.info(
-                    'Found matching task %s-%s (%s) with current state of %s',
-                    task.dag_id, task.task_id, task.execution_date, task.state
-                )
-                return TaskInstanceKey(dag_id, task_id, ex_time, try_num)
-            else:
-                self.log.warning(
-                    'task_id/dag_id are not safe to use as Kubernetes labels. This can cause '
-                    'severe performance regressions. Please see '
-                    '. '
-                    'Given dag_id: %s, task_id: %s', task_id, dag_id
-                )
-
-            tasks = (
-                session
-                .query(TaskInstance)
-                .filter_by(execution_date=ex_time).all()
-            )
-            self.log.info(
-                'Checking %s task instances.',
-                len(tasks)
-            )
-            for task in tasks:
-                if (
-                    pod_generator.make_safe_label_value(task.dag_id) == dag_id and
-                    pod_generator.make_safe_label_value(task.task_id) == task_id and
-                    task.execution_date == ex_time
-                ):
-                    self.log.info(
-                        'Found matching task %s-%s (%s) with current state of %s',
-                        task.dag_id, task.task_id, task.execution_date, task.state
-                    )
-                    dag_id = task.dag_id
-                    task_id = task.task_id
-                    return TaskInstanceKey(dag_id, task_id, ex_time, try_num)
-        self.log.warning(
-            'Failed to find and match task details to a pod; labels: %s',
-            labels
-        )
-        return None
-
     def _flush_watcher_queue(self) -> None:
         self.log.debug('Executor shutting down, watcher_queue approx. size=%d', self.watcher_queue.qsize())
         while True:
@@ -744,7 +667,7 @@ def clear_not_launched_queued_tasks(self, session=None) -> None:
                 "dag_id={},task_id={},execution_date={},airflow-worker={}".format(
                     pod_generator.make_safe_label_value(task.dag_id),
                     pod_generator.make_safe_label_value(task.task_id),
-                    AirflowKubernetesScheduler._datetime_to_label_safe_datestring(  # noqa
+                    pod_generator.datetime_to_label_safe_datestring(
                         task.execution_date
                     ),
                     self.worker_uuid
diff --git a/airflow/kubernetes/pod_generator.py b/airflow/kubernetes/pod_generator.py
index 13b6ebef23e99..02a31ab04d7e1 100644
--- a/airflow/kubernetes/pod_generator.py
+++ b/airflow/kubernetes/pod_generator.py
@@ -21,6 +21,7 @@
 is supported and no serialization need be written.
 """
 import copy
+import datetime
 import hashlib
 import inspect
 import os
@@ -30,6 +31,7 @@
 from typing import Dict, List, Optional, Union

 import yaml
+from dateutil import parser
 from kubernetes.client import models as k8s
 from kubernetes.client.api_client import ApiClient

@@ -88,6 +90,30 @@ def make_safe_label_value(string):
     return safe_label


+def datetime_to_label_safe_datestring(datetime_obj: datetime.datetime) -> str:
+    """
+    Kubernetes doesn't like ":" in labels, since ISO datetime format uses ":" but
+    not "_" let's
+    replace ":" with "_"
+
+    :param datetime_obj: datetime.datetime object
+    :return: ISO-like string representing the datetime
+    """
+    return datetime_obj.isoformat().replace(":", "_").replace('+', '_plus_')
+
+
+def label_safe_datestring_to_datetime(string: str) -> datetime.datetime:
+    """
+    Kubernetes doesn't permit ":" in labels. ISO datetime format uses ":" but not
+    "_", let's
+    replace ":" with "_"
+
+    :param string: str
+    :return: datetime.datetime object
+    """
+    return parser.parse(string.replace('_plus_', '+').replace("_", ":"))
+
+
 class PodGenerator:
     """
     Contains Kubernetes Airflow Worker configuration logic
@@ -448,7 +474,7 @@ def construct_pod(
             task_id: str,
             pod_id: str,
             try_number: int,
-            date: str,
+            date: datetime.datetime,
             command: List[str],
             kube_executor_config: Optional[k8s.V1Pod],
             worker_config: k8s.V1Pod,
@@ -465,13 +491,19 @@
             namespace=namespace,
             labels={
                 'airflow-worker': worker_uuid,
-                'dag_id': dag_id,
-                'task_id': task_id,
-                'execution_date': date,
+                'dag_id': make_safe_label_value(dag_id),
+                'task_id': make_safe_label_value(task_id),
+                'execution_date': datetime_to_label_safe_datestring(date),
                 'try_number': str(try_number),
                 'airflow_version': airflow_version.replace('+', '-'),
                 'kubernetes_executor': 'True',
             },
+            annotations={
+                'dag_id': dag_id,
+                'task_id': task_id,
+                'execution_date': date.isoformat(),
+                'try_number': str(try_number),
+            },
             cmds=command,
             name=pod_id
         ).gen_pod()
diff --git a/tests/executors/test_kubernetes_executor.py b/tests/executors/test_kubernetes_executor.py
index 60a39ae19ef52..9cf2465074e12 100644
--- a/tests/executors/test_kubernetes_executor.py
+++ b/tests/executors/test_kubernetes_executor.py
@@ -107,14 +107,12 @@ def test_make_safe_label_value(self):
             pod_generator.make_safe_label_value(dag_id)
         )

-    @unittest.skipIf(AirflowKubernetesScheduler is None,
-                     "kubernetes python package is not installed")
     def test_execution_date_serialize_deserialize(self):
         datetime_obj = datetime.now()
         serialized_datetime = \
-            AirflowKubernetesScheduler._datetime_to_label_safe_datestring(
+            pod_generator.datetime_to_label_safe_datestring(
                 datetime_obj)
-        new_datetime_obj = AirflowKubernetesScheduler._label_safe_datestring_to_datetime(
+        new_datetime_obj = pod_generator.label_safe_datestring_to_datetime(
             serialized_datetime)

         self.assertEqual(datetime_obj, new_datetime_obj)
diff --git a/tests/kubernetes/test_pod_generator.py b/tests/kubernetes/test_pod_generator.py
index 01453296cb231..770cb58160830 100644
--- a/tests/kubernetes/test_pod_generator.py
+++ b/tests/kubernetes/test_pod_generator.py
@@ -19,12 +19,15 @@
 import uuid
 from unittest import mock

+from dateutil import parser
 from kubernetes.client import ApiClient, models as k8s

 from airflow.exceptions import AirflowConfigException
 from airflow.kubernetes.k8s_model import append_to_pod
 from airflow.kubernetes.pod import Resources
-from airflow.kubernetes.pod_generator import PodDefaults, PodGenerator, extend_object_field, merge_objects
+from airflow.kubernetes.pod_generator import (
+    PodDefaults, PodGenerator, datetime_to_label_safe_datestring, extend_object_field, merge_objects,
+)
 from airflow.kubernetes.secret import Secret

@@ -63,19 +66,31 @@ def setUp(self):
             Secret('env', 'TARGET', 'secret_b', 'source_b'),
         ]
+        self.execution_date = parser.parse('2020-08-24 00:00:00.000000')
+        self.execution_date_label = datetime_to_label_safe_datestring(self.execution_date)
+        self.dag_id = 'dag_id'
+        self.task_id = 'task_id'
+        self.try_number = 3
         self.labels = {
             'airflow-worker': 'uuid',
-            'dag_id': 'dag_id',
-            'execution_date': 'date',
-            'task_id': 'task_id',
-            'try_number': '3',
+            'dag_id': self.dag_id,
+            'execution_date': self.execution_date_label,
+            'task_id': self.task_id,
+            'try_number': str(self.try_number),
             'airflow_version': mock.ANY,
             'kubernetes_executor': 'True'
         }
+        self.annotations = {
+            'dag_id': self.dag_id,
+            'task_id': self.task_id,
+            'execution_date': self.execution_date.isoformat(),
+            'try_number': str(self.try_number),
+        }
         self.metadata = {
             'labels': self.labels,
             'name': 'pod_id-' + self.static_uuid.hex,
-            'namespace': 'namespace'
+            'namespace': 'namespace',
+            'annotations': self.annotations,
         }

         self.resources = Resources('1Gi', 1, '2Gi', '2Gi', 2, 1, '4Gi')
@@ -415,11 +430,11 @@ def test_construct_pod_empty_worker_config(self, mock_uuid):
         worker_config = k8s.V1Pod()

         result = PodGenerator.construct_pod(
-            'dag_id',
-            'task_id',
+            self.dag_id,
+            self.task_id,
             'pod_id',
-            3,
-            'date',
+            self.try_number,
+            self.execution_date,
             ['command'],
             executor_config,
             worker_config,
@@ -475,11 +490,11 @@ def test_construct_pod_empty_execuctor_config(self, mock_uuid):
         executor_config = None

         result = PodGenerator.construct_pod(
-            'dag_id',
-            'task_id',
+            self.dag_id,
+            self.task_id,
             'pod_id',
-            3,
-            'date',
+            self.try_number,
+            self.execution_date,
             ['command'],
             executor_config,
             worker_config,
@@ -558,11 +573,11 @@ def test_construct_pod(self, mock_uuid):
         )

         result = PodGenerator.construct_pod(
-            'dag_id',
-            'task_id',
+            self.dag_id,
+            self.task_id,
             'pod_id',
-            3,
-            'date',
+            self.try_number,
+            self.execution_date,
             ['command'],
             executor_config,
             worker_config,
@@ -571,12 +586,13 @@ def test_construct_pod(self, mock_uuid):
         )

         sanitized_result = self.k8s_client.sanitize_for_serialization(result)
-        self.metadata.update({'annotations': {'should': 'stay'}})
+        expected_metadata = dict(self.metadata)
+        expected_metadata['annotations'].update({'should': 'stay'})

         self.assertEqual({
             'apiVersion': 'v1',
             'kind': 'Pod',
-            'metadata': self.metadata,
+            'metadata': expected_metadata,
             'spec': {
                 'containers': [{
                     'args': [],
diff --git a/tests/kubernetes/test_worker_configuration.py b/tests/kubernetes/test_worker_configuration.py
index 780f34630ae90..32f121e5e9168 100644
--- a/tests/kubernetes/test_worker_configuration.py
+++ b/tests/kubernetes/test_worker_configuration.py
@@ -20,6 +20,7 @@
 from unittest.mock import ANY

 import mock
+from dateutil import parser
 from parameterized import parameterized

 from tests.test_utils.config import conf_vars
@@ -30,7 +31,7 @@
 from airflow.exceptions import AirflowConfigException

 from airflow.executors.kubernetes_executor import AirflowKubernetesScheduler, KubeConfig
-from airflow.kubernetes.pod_generator import PodGenerator
+from airflow.kubernetes.pod_generator import PodGenerator, datetime_to_label_safe_datestring
 from airflow.kubernetes.secret import Secret
 from airflow.kubernetes.worker_configuration import WorkerConfiguration
 from airflow.version import version as airflow_version
@@ -381,12 +382,13 @@ def test_make_pod_assert_labels(self):
         self.kube_config.dags_folder = 'dags'

         worker_config = WorkerConfiguration(self.kube_config)
+        execution_date = parser.parse('2019-11-21 11:08:22.920875')
         pod = PodGenerator.construct_pod(
             "test_dag_id",
             "test_task_id",
             "test_pod_id",
             1,
-            "2019-11-21 11:08:22.920875",
+            execution_date,
             ["bash -c 'ls /'"],
             None,
             worker_config.as_pod(),
@@ -397,7 +399,7 @@
             'airflow-worker': 'sample-uuid',
             'airflow_version': airflow_version.replace('+', '-'),
             'dag_id': 'test_dag_id',
-            'execution_date': '2019-11-21 11:08:22.920875',
+            'execution_date': datetime_to_label_safe_datestring(execution_date),
             'kubernetes_executor': 'True',
             'my_label': 'label_id',
             'task_id': 'test_task_id',
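
Reviewer note (not part of the patch): a minimal, standalone sketch of the round-trip this change relies on. It reimplements the label-safe datestring helpers with the same string transforms shown in the diff and mirrors the new _annotations_to_key mapping with plain dicts and tuples instead of Airflow objects, so it runs without an Airflow install; the function and variable names below are illustrative, not Airflow APIs.

# sketch_annotation_roundtrip.py -- assumes only python-dateutil is installed
from datetime import datetime, timezone

from dateutil import parser


def datetime_to_label_safe_datestring(dt: datetime) -> str:
    # Same transform as the diff's pod_generator helper: Kubernetes label
    # values may not contain ":" or "+", so both are replaced.
    return dt.isoformat().replace(':', '_').replace('+', '_plus_')


def label_safe_datestring_to_datetime(s: str) -> datetime:
    # Inverse transform, as in the diff.
    return parser.parse(s.replace('_plus_', '+').replace('_', ':'))


def annotations_to_key(annotations: dict) -> tuple:
    # Mirrors AirflowKubernetesScheduler._annotations_to_key, but returns a
    # plain (dag_id, task_id, execution_date, try_number) tuple instead of
    # TaskInstanceKey.
    return (
        annotations['dag_id'],
        annotations['task_id'],
        parser.parse(annotations['execution_date']),
        int(annotations['try_number']),
    )


if __name__ == '__main__':
    execution_date = datetime(2020, 8, 24, tzinfo=timezone.utc)

    # Labels still carry the sanitized datestring (no ":" or "+")...
    label_value = datetime_to_label_safe_datestring(execution_date)
    assert label_safe_datestring_to_datetime(label_value) == execution_date

    # ...while annotations carry the raw ISO string, so the watcher can
    # rebuild the task key directly, without the database lookup the removed
    # _labels_to_key needed.
    annotations = {
        'dag_id': 'example_dag',
        'task_id': 'example_task',
        'execution_date': execution_date.isoformat(),
        'try_number': '1',
    }
    assert annotations_to_key(annotations) == (
        'example_dag', 'example_task', execution_date, 1,
    )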