Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[k8s] Store the raw ti key info to pod annotations #10568

Merged
merged 1 commit into from
Aug 26, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
147 changes: 35 additions & 112 deletions airflow/executors/kubernetes_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
:ref:`executor:KubernetesExecutor`
"""
import base64
import datetime
import functools
import json
import multiprocessing
Expand All @@ -49,18 +48,16 @@
from airflow.models import KubeResourceVersion, KubeWorkerIdentifier, TaskInstance
from airflow.models.taskinstance import TaskInstanceKey
from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.session import create_session, provide_session
from airflow.utils.session import provide_session
from airflow.utils.state import State

MAX_LABEL_LEN = 63

# TaskInstance key, command, configuration
KubernetesJobType = Tuple[TaskInstanceKey, CommandType, Any]

# key, state, pod_id, namespace, resource_version
KubernetesResultsType = Tuple[TaskInstanceKey, Optional[str], str, str, str]

# pod_id, namespace, state, labels, resource_version
# pod_id, namespace, state, annotations, resource_version
KubernetesWatchType = Tuple[str, str, Optional[str], Dict[str, str], str]


Expand Down Expand Up @@ -359,11 +356,19 @@ def _run(self,
)
if event['type'] == 'ERROR':
return self.process_error(event)
annotations = task.metadata.annotations
task_instance_related_annotations = {
'dag_id': annotations['dag_id'],
'task_id': annotations['task_id'],
'execution_date': annotations['execution_date'],
'try_number': annotations['try_number'],
}

self.process_status(
pod_id=task.metadata.name,
namespace=task.metadata.namespace,
status=task.status.phase,
labels=task.metadata.labels,
annotations=task_instance_related_annotations,
resource_version=task.metadata.resource_version,
event=event,
)
Expand Down Expand Up @@ -393,28 +398,30 @@ def process_error(self, event: Any) -> str:
def process_status(self, pod_id: str,
namespace: str,
status: str,
labels: Dict[str, str],
annotations: Dict[str, str],
resource_version: str,
event: Any) -> None:
"""Process status response"""
if status == 'Pending':
if event['type'] == 'DELETED':
self.log.info('Event: Failed to start pod %s, will reschedule', pod_id)
self.watcher_queue.put((pod_id, namespace, State.UP_FOR_RESCHEDULE, labels, resource_version))
self.watcher_queue.put(
(pod_id, namespace, State.UP_FOR_RESCHEDULE, annotations, resource_version)
)
else:
self.log.info('Event: %s Pending', pod_id)
elif status == 'Failed':
self.log.error('Event: %s Failed', pod_id)
self.watcher_queue.put((pod_id, namespace, State.FAILED, labels, resource_version))
self.watcher_queue.put((pod_id, namespace, State.FAILED, annotations, resource_version))
elif status == 'Succeeded':
self.log.info('Event: %s Succeeded', pod_id)
self.watcher_queue.put((pod_id, namespace, None, labels, resource_version))
self.watcher_queue.put((pod_id, namespace, None, annotations, resource_version))
elif status == 'Running':
self.log.info('Event: %s is Running', pod_id)
else:
self.log.warning(
'Event: Invalid state: %s on pod: %s in namespace %s with labels: %s with '
'resource_version: %s', status, pod_id, namespace, labels, resource_version
'Event: Invalid state: %s on pod: %s in namespace %s with annotations: %s with '
'resource_version: %s', status, pod_id, namespace, annotations, resource_version
)


Expand Down Expand Up @@ -480,10 +487,10 @@ def run_next(self, next_job: KubernetesJobType) -> None:
namespace=self.namespace,
worker_uuid=self.worker_uuid,
pod_id=self._create_pod_id(dag_id, task_id),
dag_id=pod_generator.make_safe_label_value(dag_id),
task_id=pod_generator.make_safe_label_value(task_id),
dag_id=dag_id,
task_id=task_id,
try_number=try_number,
date=self._datetime_to_label_safe_datestring(execution_date),
date=execution_date,
command=command,
kube_executor_config=kube_executor_config,
worker_config=self.worker_configuration_pod
Expand Down Expand Up @@ -530,16 +537,24 @@ def sync(self) -> None:

def process_watcher_task(self, task: KubernetesWatchType) -> None:
"""Process the task by watcher."""
pod_id, namespace, state, labels, resource_version = task
pod_id, namespace, state, annotations, resource_version = task
self.log.info(
'Attempting to finish pod; pod_id: %s; state: %s; labels: %s',
pod_id, state, labels
'Attempting to finish pod; pod_id: %s; state: %s; annotations: %s',
pod_id, state, annotations
)
key = self._labels_to_key(labels=labels)
key = self._annotations_to_key(annotations=annotations)
if key:
self.log.debug('finishing job %s - %s (%s)', key, state, pod_id)
self.result_queue.put((key, state, pod_id, namespace, resource_version))

def _annotations_to_key(self, annotations: Dict[str, str]) -> Optional[TaskInstanceKey]:
dag_id = annotations['dag_id']
task_id = annotations['task_id']
try_number = int(annotations['try_number'])
execution_date = parser.parse(annotations['execution_date'])

return TaskInstanceKey(dag_id, task_id, execution_date, try_number)

@staticmethod
def _strip_unsafe_kubernetes_special_chars(string: str) -> str:
"""
Expand Down Expand Up @@ -581,98 +596,6 @@ def _create_pod_id(dag_id: str, task_id: str) -> str:
task_id)
return safe_dag_id + safe_task_id

@staticmethod
def _label_safe_datestring_to_datetime(string: str) -> datetime.datetime:
"""
Kubernetes doesn't permit ":" in labels. ISO datetime format uses ":" but not
"_", let's
replace ":" with "_"
:param string: str
:return: datetime.datetime object
"""
return parser.parse(string.replace('_plus_', '+').replace("_", ":"))

@staticmethod
def _datetime_to_label_safe_datestring(datetime_obj: datetime.datetime) -> str:
"""
Kubernetes doesn't like ":" in labels, since ISO datetime format uses ":" but
not "_" let's
replace ":" with "_"
:param datetime_obj: datetime.datetime object
:return: ISO-like string representing the datetime
"""
return datetime_obj.isoformat().replace(":", "_").replace('+', '_plus_')

def _labels_to_key(self, labels: Dict[str, str]) -> Optional[TaskInstanceKey]:
try_num = 1
try:
try_num = int(labels.get('try_number', '1'))
except ValueError:
self.log.warning("could not get try_number as an int: %s", labels.get('try_number', '1'))

try:
dag_id = labels['dag_id']
task_id = labels['task_id']
ex_time = self._label_safe_datestring_to_datetime(labels['execution_date'])
except Exception as e: # pylint: disable=broad-except
self.log.warning(
'Error while retrieving labels; labels: %s; exception: %s',
labels, e
)
return None

with create_session() as session:
task = (
session
.query(TaskInstance)
.filter_by(task_id=task_id, dag_id=dag_id, execution_date=ex_time)
.one_or_none()
)
if task:
self.log.info(
'Found matching task %s-%s (%s) with current state of %s',
task.dag_id, task.task_id, task.execution_date, task.state
)
return TaskInstanceKey(dag_id, task_id, ex_time, try_num)
else:
self.log.warning(
'task_id/dag_id are not safe to use as Kubernetes labels. This can cause '
'severe performance regressions. Please see '
'<https://kubernetes.io/docs/concepts/overview/working-with-objects'
'/labels/#syntax-and-character-set>. '
'Given dag_id: %s, task_id: %s', task_id, dag_id
)

tasks = (
session
.query(TaskInstance)
.filter_by(execution_date=ex_time).all()
)
self.log.info(
'Checking %s task instances.',
len(tasks)
)
for task in tasks:
if (
pod_generator.make_safe_label_value(task.dag_id) == dag_id and
pod_generator.make_safe_label_value(task.task_id) == task_id and
task.execution_date == ex_time
):
self.log.info(
'Found matching task %s-%s (%s) with current state of %s',
task.dag_id, task.task_id, task.execution_date, task.state
)
dag_id = task.dag_id
task_id = task.task_id
return TaskInstanceKey(dag_id, task_id, ex_time, try_num)
self.log.warning(
'Failed to find and match task details to a pod; labels: %s',
labels
)
return None

def _flush_watcher_queue(self) -> None:
self.log.debug('Executor shutting down, watcher_queue approx. size=%d', self.watcher_queue.qsize())
while True:
Expand Down Expand Up @@ -744,7 +667,7 @@ def clear_not_launched_queued_tasks(self, session=None) -> None:
"dag_id={},task_id={},execution_date={},airflow-worker={}".format(
pod_generator.make_safe_label_value(task.dag_id),
pod_generator.make_safe_label_value(task.task_id),
AirflowKubernetesScheduler._datetime_to_label_safe_datestring( # noqa
pod_generator.datetime_to_label_safe_datestring(
task.execution_date
),
self.worker_uuid
Expand Down
40 changes: 36 additions & 4 deletions airflow/kubernetes/pod_generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
is supported and no serialization need be written.
"""
import copy
import datetime
import hashlib
import inspect
import os
Expand All @@ -30,6 +31,7 @@
from typing import Dict, List, Optional, Union

import yaml
from dateutil import parser
from kubernetes.client import models as k8s
from kubernetes.client.api_client import ApiClient

Expand Down Expand Up @@ -88,6 +90,30 @@ def make_safe_label_value(string):
return safe_label


def datetime_to_label_safe_datestring(datetime_obj: datetime.datetime) -> str:
"""
Kubernetes doesn't like ":" in labels, since ISO datetime format uses ":" but
not "_" let's
replace ":" with "_"
:param datetime_obj: datetime.datetime object
:return: ISO-like string representing the datetime
"""
return datetime_obj.isoformat().replace(":", "_").replace('+', '_plus_')


def label_safe_datestring_to_datetime(string: str) -> datetime.datetime:
"""
Kubernetes doesn't permit ":" in labels. ISO datetime format uses ":" but not
"_", let's
replace ":" with "_"
:param string: str
:return: datetime.datetime object
"""
return parser.parse(string.replace('_plus_', '+').replace("_", ":"))


class PodGenerator:
"""
Contains Kubernetes Airflow Worker configuration logic
Expand Down Expand Up @@ -448,7 +474,7 @@ def construct_pod(
task_id: str,
pod_id: str,
try_number: int,
date: str,
date: datetime.datetime,
command: List[str],
kube_executor_config: Optional[k8s.V1Pod],
worker_config: k8s.V1Pod,
Expand All @@ -465,13 +491,19 @@ def construct_pod(
namespace=namespace,
labels={
'airflow-worker': worker_uuid,
'dag_id': dag_id,
'task_id': task_id,
'execution_date': date,
'dag_id': make_safe_label_value(dag_id),
'task_id': make_safe_label_value(task_id),
'execution_date': datetime_to_label_safe_datestring(date),
'try_number': str(try_number),
'airflow_version': airflow_version.replace('+', '-'),
'kubernetes_executor': 'True',
},
annotations={
'dag_id': dag_id,
'task_id': task_id,
'execution_date': date.isoformat(),
'try_number': str(try_number),
},
cmds=command,
name=pod_id
).gen_pod()
Expand Down
6 changes: 2 additions & 4 deletions tests/executors/test_kubernetes_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,14 +107,12 @@ def test_make_safe_label_value(self):
pod_generator.make_safe_label_value(dag_id)
)

@unittest.skipIf(AirflowKubernetesScheduler is None,
"kubernetes python package is not installed")
def test_execution_date_serialize_deserialize(self):
datetime_obj = datetime.now()
serialized_datetime = \
AirflowKubernetesScheduler._datetime_to_label_safe_datestring(
pod_generator.datetime_to_label_safe_datestring(
datetime_obj)
new_datetime_obj = AirflowKubernetesScheduler._label_safe_datestring_to_datetime(
new_datetime_obj = pod_generator.label_safe_datestring_to_datetime(
serialized_datetime)

self.assertEqual(datetime_obj, new_datetime_obj)
Expand Down
Loading