Skip to content

Commit

Permalink
[k8s] Store the raw ti key info to pod annotations (#10568)
Browse files Browse the repository at this point in the history
The value of annotations can store the raw dag_id, task_id and
execution_date so that k8s executor can easily map pod event back
to the task instance
  • Loading branch information
pingzh authored Aug 26, 2020
1 parent 8a7c372 commit db378c0
Show file tree
Hide file tree
Showing 5 changed files with 114 additions and 143 deletions.
147 changes: 35 additions & 112 deletions airflow/executors/kubernetes_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
:ref:`executor:KubernetesExecutor`
"""
import base64
import datetime
import functools
import json
import multiprocessing
Expand All @@ -49,18 +48,16 @@
from airflow.models import KubeResourceVersion, KubeWorkerIdentifier, TaskInstance
from airflow.models.taskinstance import TaskInstanceKey
from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.session import create_session, provide_session
from airflow.utils.session import provide_session
from airflow.utils.state import State

MAX_LABEL_LEN = 63

# TaskInstance key, command, configuration
KubernetesJobType = Tuple[TaskInstanceKey, CommandType, Any]

# key, state, pod_id, namespace, resource_version
KubernetesResultsType = Tuple[TaskInstanceKey, Optional[str], str, str, str]

# pod_id, namespace, state, labels, resource_version
# pod_id, namespace, state, annotations, resource_version
KubernetesWatchType = Tuple[str, str, Optional[str], Dict[str, str], str]


Expand Down Expand Up @@ -359,11 +356,19 @@ def _run(self,
)
if event['type'] == 'ERROR':
return self.process_error(event)
annotations = task.metadata.annotations
task_instance_related_annotations = {
'dag_id': annotations['dag_id'],
'task_id': annotations['task_id'],
'execution_date': annotations['execution_date'],
'try_number': annotations['try_number'],
}

self.process_status(
pod_id=task.metadata.name,
namespace=task.metadata.namespace,
status=task.status.phase,
labels=task.metadata.labels,
annotations=task_instance_related_annotations,
resource_version=task.metadata.resource_version,
event=event,
)
Expand Down Expand Up @@ -393,28 +398,30 @@ def process_error(self, event: Any) -> str:
def process_status(self, pod_id: str,
namespace: str,
status: str,
labels: Dict[str, str],
annotations: Dict[str, str],
resource_version: str,
event: Any) -> None:
"""Process status response"""
if status == 'Pending':
if event['type'] == 'DELETED':
self.log.info('Event: Failed to start pod %s, will reschedule', pod_id)
self.watcher_queue.put((pod_id, namespace, State.UP_FOR_RESCHEDULE, labels, resource_version))
self.watcher_queue.put(
(pod_id, namespace, State.UP_FOR_RESCHEDULE, annotations, resource_version)
)
else:
self.log.info('Event: %s Pending', pod_id)
elif status == 'Failed':
self.log.error('Event: %s Failed', pod_id)
self.watcher_queue.put((pod_id, namespace, State.FAILED, labels, resource_version))
self.watcher_queue.put((pod_id, namespace, State.FAILED, annotations, resource_version))
elif status == 'Succeeded':
self.log.info('Event: %s Succeeded', pod_id)
self.watcher_queue.put((pod_id, namespace, None, labels, resource_version))
self.watcher_queue.put((pod_id, namespace, None, annotations, resource_version))
elif status == 'Running':
self.log.info('Event: %s is Running', pod_id)
else:
self.log.warning(
'Event: Invalid state: %s on pod: %s in namespace %s with labels: %s with '
'resource_version: %s', status, pod_id, namespace, labels, resource_version
'Event: Invalid state: %s on pod: %s in namespace %s with annotations: %s with '
'resource_version: %s', status, pod_id, namespace, annotations, resource_version
)


Expand Down Expand Up @@ -480,10 +487,10 @@ def run_next(self, next_job: KubernetesJobType) -> None:
namespace=self.namespace,
worker_uuid=self.worker_uuid,
pod_id=self._create_pod_id(dag_id, task_id),
dag_id=pod_generator.make_safe_label_value(dag_id),
task_id=pod_generator.make_safe_label_value(task_id),
dag_id=dag_id,
task_id=task_id,
try_number=try_number,
date=self._datetime_to_label_safe_datestring(execution_date),
date=execution_date,
command=command,
kube_executor_config=kube_executor_config,
worker_config=self.worker_configuration_pod
Expand Down Expand Up @@ -530,16 +537,24 @@ def sync(self) -> None:

def process_watcher_task(self, task: KubernetesWatchType) -> None:
"""Process the task by watcher."""
pod_id, namespace, state, labels, resource_version = task
pod_id, namespace, state, annotations, resource_version = task
self.log.info(
'Attempting to finish pod; pod_id: %s; state: %s; labels: %s',
pod_id, state, labels
'Attempting to finish pod; pod_id: %s; state: %s; annotations: %s',
pod_id, state, annotations
)
key = self._labels_to_key(labels=labels)
key = self._annotations_to_key(annotations=annotations)
if key:
self.log.debug('finishing job %s - %s (%s)', key, state, pod_id)
self.result_queue.put((key, state, pod_id, namespace, resource_version))

def _annotations_to_key(self, annotations: Dict[str, str]) -> Optional[TaskInstanceKey]:
dag_id = annotations['dag_id']
task_id = annotations['task_id']
try_number = int(annotations['try_number'])
execution_date = parser.parse(annotations['execution_date'])

return TaskInstanceKey(dag_id, task_id, execution_date, try_number)

@staticmethod
def _strip_unsafe_kubernetes_special_chars(string: str) -> str:
"""
Expand Down Expand Up @@ -581,98 +596,6 @@ def _create_pod_id(dag_id: str, task_id: str) -> str:
task_id)
return safe_dag_id + safe_task_id

@staticmethod
def _label_safe_datestring_to_datetime(string: str) -> datetime.datetime:
"""
Kubernetes doesn't permit ":" in labels. ISO datetime format uses ":" but not
"_", let's
replace ":" with "_"
:param string: str
:return: datetime.datetime object
"""
return parser.parse(string.replace('_plus_', '+').replace("_", ":"))

@staticmethod
def _datetime_to_label_safe_datestring(datetime_obj: datetime.datetime) -> str:
"""
Kubernetes doesn't like ":" in labels, since ISO datetime format uses ":" but
not "_" let's
replace ":" with "_"
:param datetime_obj: datetime.datetime object
:return: ISO-like string representing the datetime
"""
return datetime_obj.isoformat().replace(":", "_").replace('+', '_plus_')

def _labels_to_key(self, labels: Dict[str, str]) -> Optional[TaskInstanceKey]:
try_num = 1
try:
try_num = int(labels.get('try_number', '1'))
except ValueError:
self.log.warning("could not get try_number as an int: %s", labels.get('try_number', '1'))

try:
dag_id = labels['dag_id']
task_id = labels['task_id']
ex_time = self._label_safe_datestring_to_datetime(labels['execution_date'])
except Exception as e: # pylint: disable=broad-except
self.log.warning(
'Error while retrieving labels; labels: %s; exception: %s',
labels, e
)
return None

with create_session() as session:
task = (
session
.query(TaskInstance)
.filter_by(task_id=task_id, dag_id=dag_id, execution_date=ex_time)
.one_or_none()
)
if task:
self.log.info(
'Found matching task %s-%s (%s) with current state of %s',
task.dag_id, task.task_id, task.execution_date, task.state
)
return TaskInstanceKey(dag_id, task_id, ex_time, try_num)
else:
self.log.warning(
'task_id/dag_id are not safe to use as Kubernetes labels. This can cause '
'severe performance regressions. Please see '
'<https://kubernetes.io/docs/concepts/overview/working-with-objects'
'/labels/#syntax-and-character-set>. '
'Given dag_id: %s, task_id: %s', task_id, dag_id
)

tasks = (
session
.query(TaskInstance)
.filter_by(execution_date=ex_time).all()
)
self.log.info(
'Checking %s task instances.',
len(tasks)
)
for task in tasks:
if (
pod_generator.make_safe_label_value(task.dag_id) == dag_id and
pod_generator.make_safe_label_value(task.task_id) == task_id and
task.execution_date == ex_time
):
self.log.info(
'Found matching task %s-%s (%s) with current state of %s',
task.dag_id, task.task_id, task.execution_date, task.state
)
dag_id = task.dag_id
task_id = task.task_id
return TaskInstanceKey(dag_id, task_id, ex_time, try_num)
self.log.warning(
'Failed to find and match task details to a pod; labels: %s',
labels
)
return None

def _flush_watcher_queue(self) -> None:
self.log.debug('Executor shutting down, watcher_queue approx. size=%d', self.watcher_queue.qsize())
while True:
Expand Down Expand Up @@ -744,7 +667,7 @@ def clear_not_launched_queued_tasks(self, session=None) -> None:
"dag_id={},task_id={},execution_date={},airflow-worker={}".format(
pod_generator.make_safe_label_value(task.dag_id),
pod_generator.make_safe_label_value(task.task_id),
AirflowKubernetesScheduler._datetime_to_label_safe_datestring( # noqa
pod_generator.datetime_to_label_safe_datestring(
task.execution_date
),
self.worker_uuid
Expand Down
40 changes: 36 additions & 4 deletions airflow/kubernetes/pod_generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
is supported and no serialization need be written.
"""
import copy
import datetime
import hashlib
import inspect
import os
Expand All @@ -30,6 +31,7 @@
from typing import Dict, List, Optional, Union

import yaml
from dateutil import parser
from kubernetes.client import models as k8s
from kubernetes.client.api_client import ApiClient

Expand Down Expand Up @@ -88,6 +90,30 @@ def make_safe_label_value(string):
return safe_label


def datetime_to_label_safe_datestring(datetime_obj: datetime.datetime) -> str:
"""
Kubernetes doesn't like ":" in labels, since ISO datetime format uses ":" but
not "_" let's
replace ":" with "_"
:param datetime_obj: datetime.datetime object
:return: ISO-like string representing the datetime
"""
return datetime_obj.isoformat().replace(":", "_").replace('+', '_plus_')


def label_safe_datestring_to_datetime(string: str) -> datetime.datetime:
"""
Kubernetes doesn't permit ":" in labels. ISO datetime format uses ":" but not
"_", let's
replace ":" with "_"
:param string: str
:return: datetime.datetime object
"""
return parser.parse(string.replace('_plus_', '+').replace("_", ":"))


class PodGenerator:
"""
Contains Kubernetes Airflow Worker configuration logic
Expand Down Expand Up @@ -448,7 +474,7 @@ def construct_pod(
task_id: str,
pod_id: str,
try_number: int,
date: str,
date: datetime.datetime,
command: List[str],
kube_executor_config: Optional[k8s.V1Pod],
worker_config: k8s.V1Pod,
Expand All @@ -465,13 +491,19 @@ def construct_pod(
namespace=namespace,
labels={
'airflow-worker': worker_uuid,
'dag_id': dag_id,
'task_id': task_id,
'execution_date': date,
'dag_id': make_safe_label_value(dag_id),
'task_id': make_safe_label_value(task_id),
'execution_date': datetime_to_label_safe_datestring(date),
'try_number': str(try_number),
'airflow_version': airflow_version.replace('+', '-'),
'kubernetes_executor': 'True',
},
annotations={
'dag_id': dag_id,
'task_id': task_id,
'execution_date': date.isoformat(),
'try_number': str(try_number),
},
cmds=command,
name=pod_id
).gen_pod()
Expand Down
6 changes: 2 additions & 4 deletions tests/executors/test_kubernetes_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,14 +107,12 @@ def test_make_safe_label_value(self):
pod_generator.make_safe_label_value(dag_id)
)

@unittest.skipIf(AirflowKubernetesScheduler is None,
"kubernetes python package is not installed")
def test_execution_date_serialize_deserialize(self):
datetime_obj = datetime.now()
serialized_datetime = \
AirflowKubernetesScheduler._datetime_to_label_safe_datestring(
pod_generator.datetime_to_label_safe_datestring(
datetime_obj)
new_datetime_obj = AirflowKubernetesScheduler._label_safe_datestring_to_datetime(
new_datetime_obj = pod_generator.label_safe_datestring_to_datetime(
serialized_datetime)

self.assertEqual(datetime_obj, new_datetime_obj)
Expand Down
Loading

0 comments on commit db378c0

Please sign in to comment.