Skip to content

Commit

Permalink
Support KEDA scaling for triggerer in the helm chart (apache#32302)
Browse files Browse the repository at this point in the history
* Support KEDA scaling for triggerer in the helm chart

Signed-off-by: Hussein Awala <[email protected]>

* Fix triggerer ScaledObject condition

Signed-off-by: Hussein Awala <[email protected]>

* Add supports for custom query to be aligned with apache#32308

Signed-off-by: Hussein Awala <[email protected]>

---------

Signed-off-by: Hussein Awala <[email protected]>
Co-authored-by: Jed Cunningham <[email protected]>
  • Loading branch information
hussein-awala and jedcunningham authored Aug 19, 2023
1 parent f89af21 commit c98fd19
Show file tree
Hide file tree
Showing 8 changed files with 367 additions and 2 deletions.
15 changes: 15 additions & 0 deletions chart/templates/_helpers.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -968,3 +968,18 @@ capabilities:
{{- end }}
{{- end }}
{{- end }}

{{- define "kedaNetworkPolicySelector" }}
{{- if .Values.workers.keda.enabled }}

{{- if .Values.workers.keda.namespaceLabels }}
- namespaceSelector:
matchLabels: {{- toYaml .Values.workers.keda.namespaceLabels | nindent 10 }}
podSelector:
{{- else }}
- podSelector:
{{- end }}
matchLabels:
app: keda-operator
{{- end }}
{{- end }}
11 changes: 9 additions & 2 deletions chart/templates/pgbouncer/pgbouncer-networkpolicy.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@
################################
## Pgbouncer NetworkPolicy
#################################
{{- $workersKedaEnabled := and .Values.workers.keda.enabled (has .Values.executor (list "CeleryExecutor" "CeleryKubernetesExecutor")) }}
{{- $triggererEnabled := and (semverCompare ">=2.2.0" .Values.airflowVersion) .Values.triggerer.enabled }}
{{- $triggererKedaEnabled := and $triggererEnabled .Values.triggerer.keda.enabled }}
{{- if and .Values.pgbouncer.enabled .Values.networkPolicies.enabled }}
apiVersion: networking.k8s.io/v1
kind: NetworkPolicy
Expand Down Expand Up @@ -48,11 +51,15 @@ spec:
matchLabels:
tier: airflow
release: {{ .Release.Name }}
{{- if .Values.workers.keda.enabled }}
{{- if .Values.workers.keda.namespaceLabels }}
{{- if or $workersKedaEnabled $triggererKedaEnabled }}
{{- if and $workersKedaEnabled .Values.workers.keda.namespaceLabels }}
- namespaceSelector:
matchLabels: {{- toYaml .Values.workers.keda.namespaceLabels | nindent 10 }}
podSelector:
{{- else if and $triggererEnabled .Values.triggerer.keda.namespaceLabels }}
- namespaceSelector:
matchLabels: {{- toYaml .Values.triggerer.keda.namespaceLabels | nindent 10 }}
podSelector:
{{- else }}
- podSelector:
{{- end }}
Expand Down
3 changes: 3 additions & 0 deletions chart/templates/triggerer/triggerer-deployment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
{{- if .Values.triggerer.enabled }}
{{- /* Airflow version 2.6.0 is when triggerer logs serve introduced */ -}}
{{- $persistence := and .Values.triggerer.persistence.enabled (semverCompare ">=2.6.0" .Values.airflowVersion) }}
{{- $keda := .Values.triggerer.keda.enabled }}
{{- $nodeSelector := or .Values.triggerer.nodeSelector .Values.nodeSelector }}
{{- $affinity := or .Values.triggerer.affinity .Values.affinity }}
{{- $tolerations := or .Values.triggerer.tolerations .Values.tolerations }}
Expand Down Expand Up @@ -56,7 +57,9 @@ spec:
{{- if $persistence }}
serviceName: {{ .Release.Name }}-triggerer
{{- end }}
{{- if not $keda }}
replicas: {{ .Values.triggerer.replicas }}
{{- end }}
{{- if $revisionHistoryLimit }}
revisionHistoryLimit: {{ $revisionHistoryLimit }}
{{- end }}
Expand Down
57 changes: 57 additions & 0 deletions chart/templates/triggerer/triggerer-kedaautoscaler.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
{{/*
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
*/}}

################################
## Airflow Triggerer KEDA Scaler
#################################
{{- if semverCompare ">=2.2.0" .Values.airflowVersion }}
{{- if and .Values.triggerer.enabled .Values.triggerer.keda.enabled }}
apiVersion: keda.sh/v1alpha1
kind: ScaledObject
metadata:
name: {{ .Release.Name }}-triggerer
labels:
tier: airflow
component: triggerer-horizontalpodautoscaler
release: {{ .Release.Name }}
chart: "{{ .Chart.Name }}-{{ .Chart.Version }}"
heritage: {{ .Release.Service }}
deploymentName: {{ .Release.Name }}-triggerer
{{- if or (.Values.labels) (.Values.triggerer.labels) }}
{{- mustMerge .Values.triggerer.labels .Values.labels | toYaml | nindent 4 }}
{{- end }}
spec:
scaleTargetRef:
kind: {{ ternary "StatefulSet" "Deployment" .Values.triggerer.persistence.enabled }}
name: {{ .Release.Name }}-triggerer
pollingInterval: {{ .Values.triggerer.keda.pollingInterval }}
cooldownPeriod: {{ .Values.triggerer.keda.cooldownPeriod }}
minReplicaCount: {{ .Values.triggerer.keda.minReplicaCount }}
maxReplicaCount: {{ .Values.triggerer.keda.maxReplicaCount }}
{{- if .Values.triggerer.keda.advanced }}
advanced: {{- toYaml .Values.triggerer.keda.advanced | nindent 4 }}
{{- end }}
triggers:
- type: postgresql
metadata:
targetQueryValue: "1"
connectionFromEnv: AIRFLOW_CONN_AIRFLOW_DB
query: {{ tpl .Values.triggerer.keda.query . | quote }}
{{- end }}
{{- end }}
66 changes: 66 additions & 0 deletions chart/values.schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -3078,6 +3078,72 @@
],
"additionalProperties": false
}
},
"keda": {
"description": "KEDA configuration.",
"type": "object",
"additionalProperties": false,
"properties": {
"enabled": {
"description": "Allow KEDA autoscaling.",
"type": "boolean",
"default": false
},
"namespaceLabels": {
"description": "Labels used in `matchLabels` for namespace in the PgBouncer NetworkPolicy.",
"type": "object",
"default": {},
"additionalProperties": {
"type": "string"
}
},
"pollingInterval": {
"description": "How often KEDA polls the airflow DB to report new scale requests to the HPA.",
"type": "integer",
"default": 5
},
"cooldownPeriod": {
"description": "How many seconds KEDA will wait before scaling to zero.",
"type": "integer",
"default": 30
},
"minReplicaCount": {
"description": "Minimum number of triggerers created by KEDA.",
"type": "integer",
"default": 0
},
"maxReplicaCount": {
"description": "Maximum number of triggerers created by KEDA.",
"type": "integer",
"default": 10
},
"advanced": {
"description": "Advanced KEDA configuration.",
"type": "object",
"default": {},
"additionalProperties": false,
"properties": {
"horizontalPodAutoscalerConfig": {
"description": "HorizontalPodAutoscalerConfig specifies horizontal scale config.",
"type": "object",
"default": {},
"properties": {
"behavior": {
"description": "HorizontalPodAutoscalerBehavior configures the scaling behavior of the target.",
"type": "object",
"default": {},
"$ref": "#/definitions/io.k8s.api.autoscaling.v2beta2.HorizontalPodAutoscalerBehavior"
}
}
}
}
},
"query": {
"description": "Query to use for KEDA autoscaling. Must return a single integer.",
"type": "string",
"default": "SELECT ceil(COUNT(*)::decimal / {{ .Values.config.triggerer.default_capacity }}) FROM trigger"
}
}
}
}
},
Expand Down
36 changes: 36 additions & 0 deletions chart/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1473,6 +1473,40 @@ triggerer:

env: []

# Allow KEDA autoscaling.
keda:
enabled: false
namespaceLabels: {}

# How often KEDA polls the airflow DB to report new scale requests to the HPA
pollingInterval: 5

# How many seconds KEDA will wait before scaling to zero.
# Note that HPA has a separate cooldown period for scale-downs
cooldownPeriod: 30

# Minimum number of triggerers created by keda
minReplicaCount: 0

# Maximum number of triggerers created by keda
maxReplicaCount: 10

# Specify HPA related options
advanced: {}
# horizontalPodAutoscalerConfig:
# behavior:
# scaleDown:
# stabilizationWindowSeconds: 300
# policies:
# - type: Percent
# value: 100
# periodSeconds: 15

# Query to use for KEDA autoscaling. Must return a single integer.
query: >-
SELECT ceil(COUNT(*)::decimal / {{ .Values.config.triggerer.default_capacity }})
FROM trigger
# Airflow Dag Processor Config
dagProcessor:
enabled: false
Expand Down Expand Up @@ -2280,6 +2314,8 @@ config:
worker_container_repository: '{{ .Values.images.airflow.repository | default .Values.defaultAirflowRepository }}'
worker_container_tag: '{{ .Values.images.airflow.tag | default .Values.defaultAirflowTag }}'
multi_namespace_mode: '{{ ternary "True" "False" .Values.multiNamespaceMode }}'
triggerer:
default_capacity: 1000
# yamllint enable rule:line-length

# Whether Airflow can launch workers and/or pods in multiple namespaces
Expand Down
64 changes: 64 additions & 0 deletions helm_tests/airflow_core/test_triggerer.py
Original file line number Diff line number Diff line change
Expand Up @@ -620,3 +620,67 @@ class TestTriggererLogGroomer(LogGroomerTestBase):

obj_name = "triggerer"
folder = "triggerer"


class TestTriggererKedaAutoScaler:
"""Tests triggerer keda autoscaler."""

def test_should_add_component_specific_labels(self):
docs = render_chart(
values={
"triggerer": {
"keda": {"enabled": True},
"labels": {"test_label": "test_label_value"},
},
},
show_only=["templates/triggerer/triggerer-kedaautoscaler.yaml"],
)

assert "test_label" in jmespath.search("metadata.labels", docs[0])
assert jmespath.search("metadata.labels", docs[0])["test_label"] == "test_label_value"

def test_should_remove_replicas_field(self):
docs = render_chart(
values={
"triggerer": {
"keda": {"enabled": True},
},
},
show_only=["templates/triggerer/triggerer-deployment.yaml"],
)

assert "replicas" not in jmespath.search("spec", docs[0])

@pytest.mark.parametrize(
"query, expected_query",
[
# default query
(
None,
"SELECT ceil(COUNT(*)::decimal / 1000) FROM trigger",
),
# test custom static query
(
"SELECT ceil(COUNT(*)::decimal / 2000) FROM trigger",
"SELECT ceil(COUNT(*)::decimal / 2000) FROM trigger",
),
# test custom template query
(
"SELECT ceil(COUNT(*)::decimal / {{ mul .Values.config.triggerer.default_capacity 2 }})"
" FROM trigger",
"SELECT ceil(COUNT(*)::decimal / 2000) FROM trigger",
),
],
)
def test_should_use_keda_query(self, query, expected_query):

docs = render_chart(
values={
"triggerer": {
"enabled": True,
"keda": {"enabled": True, **({"query": query} if query else {})},
},
},
show_only=["templates/triggerer/triggerer-kedaautoscaler.yaml"],
)
assert expected_query == jmespath.search("spec.triggers[0].metadata.query", docs[0])
Loading

0 comments on commit c98fd19

Please sign in to comment.