Skip to content

Commit

Permalink
Support KEDA scaling for triggerer in the helm chart
Browse files Browse the repository at this point in the history
Signed-off-by: Hussein Awala <[email protected]>
  • Loading branch information
hussein-awala committed Jul 1, 2023
1 parent df4c883 commit 9afa248
Show file tree
Hide file tree
Showing 8 changed files with 321 additions and 2 deletions.
15 changes: 15 additions & 0 deletions chart/templates/_helpers.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -899,3 +899,18 @@ capabilities:
{{- end }}
{{- end }}
{{- end }}

{{- define "kedaNetworkPolicySelector" }}
{{- if .Values.workers.keda.enabled }}

{{- if .Values.workers.keda.namespaceLabels }}
- namespaceSelector:
matchLabels: {{- toYaml .Values.workers.keda.namespaceLabels | nindent 10 }}
podSelector:
{{- else }}
- podSelector:
{{- end }}
matchLabels:
app: keda-operator
{{- end }}
{{- end }}
11 changes: 9 additions & 2 deletions chart/templates/pgbouncer/pgbouncer-networkpolicy.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@
################################
## Pgbouncer NetworkPolicy
#################################
{{- $workersKedaEnabled := and .Values.workers.keda.enabled (has .Values.executor (list "CeleryExecutor" "CeleryKubernetesExecutor")) }}
{{- $triggererEnabled := and (semverCompare ">=2.2.0" .Values.airflowVersion) .Values.triggerer.enabled }}
{{- $triggererKedaEnabled := and $triggererEnabled .Values.triggerer.keda.enabled }}
{{- if and .Values.pgbouncer.enabled .Values.networkPolicies.enabled }}
apiVersion: networking.k8s.io/v1
kind: NetworkPolicy
Expand Down Expand Up @@ -48,11 +51,15 @@ spec:
matchLabels:
tier: airflow
release: {{ .Release.Name }}
{{- if .Values.workers.keda.enabled }}
{{- if .Values.workers.keda.namespaceLabels }}
{{- if or $workersKedaEnabled $triggererKedaEnabled }}
{{- if and $workersKedaEnabled .Values.workers.keda.namespaceLabels }}
- namespaceSelector:
matchLabels: {{- toYaml .Values.workers.keda.namespaceLabels | nindent 10 }}
podSelector:
{{- else if and $triggererEnabled .Values.triggerer.keda.namespaceLabels }}
- namespaceSelector:
matchLabels: {{- toYaml .Values.triggerer.keda.namespaceLabels | nindent 10 }}
podSelector:
{{- else }}
- podSelector:
{{- end }}
Expand Down
3 changes: 3 additions & 0 deletions chart/templates/triggerer/triggerer-deployment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
{{- if .Values.triggerer.enabled }}
{{- /* Airflow version 2.6.0 is when triggerer logs serve introduced */ -}}
{{- $persistence := and .Values.triggerer.persistence.enabled (semverCompare ">=2.6.0" .Values.airflowVersion) }}
{{- $keda := .Values.triggerer.keda.enabled }}
{{- $nodeSelector := or .Values.triggerer.nodeSelector .Values.nodeSelector }}
{{- $affinity := or .Values.triggerer.affinity .Values.affinity }}
{{- $tolerations := or .Values.triggerer.tolerations .Values.tolerations }}
Expand Down Expand Up @@ -53,7 +54,9 @@ spec:
{{- if $persistence }}
serviceName: {{ .Release.Name }}-triggerer
{{- end }}
{{- if not $keda }}
replicas: {{ .Values.triggerer.replicas }}
{{- end }}
{{- if $revisionHistoryLimit }}
revisionHistoryLimit: {{ $revisionHistoryLimit }}
{{- end }}
Expand Down
59 changes: 59 additions & 0 deletions chart/templates/triggerer/triggerer-kedaautoscaler.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
{{/*
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
*/}}

################################
## Airflow Triggerer KEDA Scaler
#################################
{{- if semverCompare ">=2.2.0" .Values.airflowVersion }}
{{- if .Values.triggerer.enabled }}
apiVersion: keda.sh/v1alpha1
kind: ScaledObject
metadata:
name: {{ .Release.Name }}-triggerer
labels:
tier: airflow
component: triggerer-horizontalpodautoscaler
release: {{ .Release.Name }}
chart: "{{ .Chart.Name }}-{{ .Chart.Version }}"
heritage: {{ .Release.Service }}
deploymentName: {{ .Release.Name }}-triggerer
{{- if or (.Values.labels) (.Values.triggerer.labels) }}
{{- mustMerge .Values.triggerer.labels .Values.labels | toYaml | nindent 4 }}
{{- end }}
spec:
scaleTargetRef:
kind: {{ ternary "StatefulSet" "Deployment" .Values.triggerer.persistence.enabled }}
name: {{ .Release.Name }}-triggerer
pollingInterval: {{ .Values.triggerer.keda.pollingInterval }}
cooldownPeriod: {{ .Values.triggerer.keda.cooldownPeriod }}
minReplicaCount: {{ .Values.triggerer.keda.minReplicaCount }}
maxReplicaCount: {{ .Values.triggerer.keda.maxReplicaCount }}
{{- if .Values.triggerer.keda.advanced }}
advanced: {{- toYaml .Values.triggerer.keda.advanced | nindent 4 }}
{{- end }}
triggers:
- type: postgresql
metadata:
targetQueryValue: "1"
connectionFromEnv: AIRFLOW_CONN_AIRFLOW_DB
query: >-
SELECT ceil(COUNT(*)::decimal / {{ .Values.config.triggerer.default_capacity }})
FROM trigger
{{- end }}
{{- end }}
61 changes: 61 additions & 0 deletions chart/values.schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -2707,6 +2707,67 @@
],
"additionalProperties": false
}
},
"keda": {
"description": "KEDA configuration.",
"type": "object",
"additionalProperties": false,
"properties": {
"enabled": {
"description": "Allow KEDA autoscaling.",
"type": "boolean",
"default": false
},
"namespaceLabels": {
"description": "Labels used in `matchLabels` for namespace in the PgBouncer NetworkPolicy.",
"type": "object",
"default": {},
"additionalProperties": {
"type": "string"
}
},
"pollingInterval": {
"description": "How often KEDA polls the airflow DB to report new scale requests to the HPA.",
"type": "integer",
"default": 5
},
"cooldownPeriod": {
"description": "How many seconds KEDA will wait before scaling to zero.",
"type": "integer",
"default": 30
},
"minReplicaCount": {
"description": "Minimum number of triggerers created by KEDA.",
"type": "integer",
"default": 0
},
"maxReplicaCount": {
"description": "Maximum number of triggerers created by KEDA.",
"type": "integer",
"default": 10
},
"advanced": {
"description": "Advanced KEDA configuration.",
"type": "object",
"default": {},
"additionalProperties": false,
"properties": {
"horizontalPodAutoscalerConfig": {
"description": "HorizontalPodAutoscalerConfig specifies horizontal scale config.",
"type": "object",
"default": {},
"properties": {
"behavior": {
"description": "HorizontalPodAutoscalerBehavior configures the scaling behavior of the target.",
"type": "object",
"default": {},
"$ref": "#/definitions/io.k8s.api.autoscaling.v2beta2.HorizontalPodAutoscalerBehavior"
}
}
}
}
}
}
}
}
},
Expand Down
31 changes: 31 additions & 0 deletions chart/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1354,6 +1354,35 @@ triggerer:

env: []

# Allow KEDA autoscaling.
keda:
enabled: false
namespaceLabels: {}

# How often KEDA polls the airflow DB to report new scale requests to the HPA
pollingInterval: 5

# How many seconds KEDA will wait before scaling to zero.
# Note that HPA has a separate cooldown period for scale-downs
cooldownPeriod: 30

# Minimum number of triggerers created by keda
minReplicaCount: 0

# Maximum number of triggerers created by keda
maxReplicaCount: 10

# Specify HPA related options
advanced: {}
# horizontalPodAutoscalerConfig:
# behavior:
# scaleDown:
# stabilizationWindowSeconds: 300
# policies:
# - type: Percent
# value: 100
# periodSeconds: 15

# Airflow Dag Processor Config
dagProcessor:
enabled: false
Expand Down Expand Up @@ -2087,6 +2116,8 @@ config:
worker_container_repository: '{{ .Values.images.airflow.repository | default .Values.defaultAirflowRepository }}'
worker_container_tag: '{{ .Values.images.airflow.tag | default .Values.defaultAirflowTag }}'
multi_namespace_mode: '{{ ternary "True" "False" .Values.multiNamespaceMode }}'
triggerer:
default_capacity: 1000
# yamllint enable rule:line-length

# Whether Airflow can launch workers and/or pods in multiple namespaces
Expand Down
28 changes: 28 additions & 0 deletions tests/charts/airflow_core/test_triggerer.py
Original file line number Diff line number Diff line change
Expand Up @@ -574,3 +574,31 @@ def test_should_add_component_specific_labels(self):
class TestTriggererLogGroomer(LogGroomerTestBase):
obj_name = "triggerer"
folder = "triggerer"


class TestTriggererKedaAutoScaler:
def test_should_add_component_specific_labels(self):
docs = render_chart(
values={
"triggerer": {
"keda": {"enabled": True},
"labels": {"test_label": "test_label_value"},
},
},
show_only=["templates/triggerer/triggerer-kedaautoscaler.yaml"],
)

assert "test_label" in jmespath.search("metadata.labels", docs[0])
assert jmespath.search("metadata.labels", docs[0])["test_label"] == "test_label_value"

def test_should_remove_replicas_field(self):
docs = render_chart(
values={
"triggerer": {
"keda": {"enabled": True},
},
},
show_only=["templates/triggerer/triggerer-deployment.yaml"],
)

assert "replicas" not in jmespath.search("spec", docs[0])
115 changes: 115 additions & 0 deletions tests/charts/other/test_pgbouncer.py
Original file line number Diff line number Diff line change
Expand Up @@ -561,3 +561,118 @@ def test_exporter_secret_with_overrides(self):
"postgresql://username%40123123:password%40%21%40%23$%5E&%2A%28%[email protected]:1111"
"/pgbouncer?sslmode=require" == connection
)


class TestPgbouncerNetworkPolicy:
def test_should_create_pgbouncer_network_policy(self):
docs = render_chart(
values={"pgbouncer": {"enabled": True}, "networkPolicies": {"enabled": True}},
show_only=["templates/pgbouncer/pgbouncer-networkpolicy.yaml"],
)

assert "NetworkPolicy" == jmespath.search("kind", docs[0])
assert "release-name-pgbouncer-policy" == jmespath.search("metadata.name", docs[0])

@pytest.mark.parametrize(
"conf, expected_selector",
[
# test with workers.keda enabled without namespace labels
(
{"executor": "CeleryExecutor", "workers": {"keda": {"enabled": True}}},
[{"podSelector": {"matchLabels": {"app": "keda-operator"}}}],
),
# test with triggerer.keda enabled without namespace labels
(
{"triggerer": {"keda": {"enabled": True}}},
[{"podSelector": {"matchLabels": {"app": "keda-operator"}}}],
),
# test with workers.keda and triggerer.keda both enabled without namespace labels
(
{
"executor": "CeleryExecutor",
"workers": {"keda": {"enabled": True}},
"triggerer": {"keda": {"enabled": True}},
},
[{"podSelector": {"matchLabels": {"app": "keda-operator"}}}],
),
# test with workers.keda enabled with namespace labels
(
{
"executor": "CeleryExecutor",
"workers": {"keda": {"enabled": True, "namespaceLabels": {"app": "airflow"}}},
},
[
{
"namespaceSelector": {"matchLabels": {"app": "airflow"}},
"podSelector": {"matchLabels": {"app": "keda-operator"}},
}
],
),
# test with triggerer.keda enabled with namespace labels
(
{"triggerer": {"keda": {"enabled": True, "namespaceLabels": {"app": "airflow"}}}},
[
{
"namespaceSelector": {"matchLabels": {"app": "airflow"}},
"podSelector": {"matchLabels": {"app": "keda-operator"}},
}
],
),
# test with workers.keda and triggerer.keda both enabled with namespace labels
(
{
"executor": "CeleryExecutor",
"workers": {"keda": {"enabled": True, "namespaceLabels": {"app": "airflow"}}},
"triggerer": {"keda": {"enabled": True, "namespaceLabels": {"app": "airflow"}}},
},
[
{
"namespaceSelector": {"matchLabels": {"app": "airflow"}},
"podSelector": {"matchLabels": {"app": "keda-operator"}},
}
],
),
# test with workers.keda and triggerer.keda both enabled workers with namespace labels
# and triggerer without namespace labels
(
{
"executor": "CeleryExecutor",
"workers": {"keda": {"enabled": True, "namespaceLabels": {"app": "airflow"}}},
"triggerer": {"keda": {"enabled": True}},
},
[
{
"namespaceSelector": {"matchLabels": {"app": "airflow"}},
"podSelector": {"matchLabels": {"app": "keda-operator"}},
}
],
),
# test with workers.keda and triggerer.keda both enabled workers without namespace labels
# and triggerer with namespace labels
(
{
"executor": "CeleryExecutor",
"workers": {"keda": {"enabled": True}},
"triggerer": {"keda": {"enabled": True, "namespaceLabels": {"app": "airflow"}}},
},
[
{
"namespaceSelector": {"matchLabels": {"app": "airflow"}},
"podSelector": {"matchLabels": {"app": "keda-operator"}},
}
],
),
],
)
def test_pgbouncer_network_policy_with_keda(self, conf, expected_selector):

docs = render_chart(
values={
"pgbouncer": {"enabled": True},
"networkPolicies": {"enabled": True},
**conf,
},
show_only=["templates/pgbouncer/pgbouncer-networkpolicy.yaml"],
)

assert expected_selector == jmespath.search("spec.ingress[0].from[1:]", docs[0])

0 comments on commit 9afa248

Please sign in to comment.