Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Makes multi-namespace mode optional #9570

Merged
merged 1 commit into from
Aug 10, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions airflow/config_templates/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -1887,6 +1887,13 @@
type: string
example: ~
default: "default"
- name: multi_namespace_mode
description: |
Allows users to launch pods in multiple namespaces.
Will require creating a cluster-role for the scheduler
type: boolean
example: ~
default: "False"
- name: airflow_configmap
description: |
The name of the Kubernetes ConfigMap containing ``airflow.cfg`` file.
Expand Down
4 changes: 4 additions & 0 deletions airflow/config_templates/default_airflow.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -899,6 +899,10 @@ worker_pods_creation_batch_size = 1
# The Kubernetes namespace where airflow workers should be created. Defaults to ``default``
namespace = default

# Allows users to launch pods in multiple namespaces.
# Will require creating a cluster-role for the scheduler
multi_namespace_mode = False

# The name of the Kubernetes ConfigMap containing ``airflow.cfg`` file.
#
# For example:
Expand Down
19 changes: 18 additions & 1 deletion airflow/executors/kubernetes_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
"""
import base64
import datetime
import functools
import json
import multiprocessing
import time
Expand Down Expand Up @@ -188,6 +189,7 @@ def __init__(self): # pylint: disable=too-many-statements
# cluster has RBAC enabled, your scheduler may need service account permissions to
# create, watch, get, and delete pods in this namespace.
self.kube_namespace = conf.get(self.kubernetes_section, 'namespace')
self.multi_namespace_mode = conf.get(self.kubernetes_section, 'multi_namespace_mode')
# The Kubernetes Namespace in which pods will be created by the executor. Note
# that if your
# cluster has RBAC enabled, your workers may need service account permissions to
Expand Down Expand Up @@ -287,11 +289,15 @@ class KubernetesJobWatcher(multiprocessing.Process, LoggingMixin):
"""Watches for Kubernetes jobs"""

def __init__(self,
namespace: Optional[str],
mult_namespace_mode: bool,
watcher_queue: 'Queue[KubernetesWatchType]',
resource_version: Optional[str],
worker_uuid: Optional[str],
kube_config: Configuration):
super().__init__()
self.namespace = namespace
self.multi_namespace_mode = mult_namespace_mode
self.worker_uuid = worker_uuid
self.watcher_queue = watcher_queue
self.resource_version = resource_version
Expand Down Expand Up @@ -336,7 +342,16 @@ def _run(self,
kwargs[key] = value

last_resource_version: Optional[str] = None
for event in watcher.stream(kube_client.list_pod_for_all_namespaces, **kwargs):
if self.multi_namespace_mode:
list_worker_pods = functools.partial(watcher.stream,
kube_client.list_pod_for_all_namespaces,
**kwargs)
else:
list_worker_pods = functools.partial(watcher.stream,
kube_client.list_namespaced_pod,
self.namespace,
**kwargs)
for event in list_worker_pods():
task = event['object']
self.log.info(
'Event: %s had an event of type %s',
Expand Down Expand Up @@ -430,6 +445,8 @@ def __init__(self,
def _make_kube_watcher(self) -> KubernetesJobWatcher:
resource_version = KubeResourceVersion.get_current_resource_version()
watcher = KubernetesJobWatcher(watcher_queue=self.watcher_queue,
namespace=self.kube_config.kube_namespace,
mult_namespace_mode=self.kube_config.multi_namespace_mode,
resource_version=resource_version,
worker_uuid=self.worker_uuid,
kube_config=self.kube_config)
Expand Down