Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor Kubernetes workload scaler config #6226

Merged
merged 2 commits into from
Oct 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 47 additions & 46 deletions pkg/scalers/kubernetes_workload_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package scalers
import (
"context"
"fmt"
"strconv"

"github.com/go-logr/logr"
v2 "k8s.io/api/autoscaling/v2"
Expand All @@ -18,16 +17,13 @@ import (

type kubernetesWorkloadScaler struct {
metricType v2.MetricTargetType
metadata *kubernetesWorkloadMetadata
metadata kubernetesWorkloadMetadata
kubeClient client.Client
logger logr.Logger
}

const (
kubernetesWorkloadMetricType = "External"
podSelectorKey = "podSelector"
valueKey = "value"
activationValueKey = "activationValue"
)

var phasesCountedAsTerminated = []corev1.PodPhase{
Expand All @@ -36,11 +32,22 @@ var phasesCountedAsTerminated = []corev1.PodPhase{
}

type kubernetesWorkloadMetadata struct {
podSelector labels.Selector
namespace string
value float64
activationValue float64
triggerIndex int
PodSelector string `keda:"name=podSelector, order=triggerMetadata"`
Value float64 `keda:"name=value, order=triggerMetadata"`
ActivationValue float64 `keda:"name=activationValue, order=triggerMetadata, default=0"`

namespace string
triggerIndex int
podSelector labels.Selector
asMetricSource bool
}

func (m *kubernetesWorkloadMetadata) Validate() error {
if m.Value <= 0 && !m.asMetricSource {
return fmt.Errorf("value must be a float greater than 0")
}

return nil
}

// NewKubernetesWorkloadScaler creates a new kubernetesWorkloadScaler
Expand All @@ -50,9 +57,9 @@ func NewKubernetesWorkloadScaler(kubeClient client.Client, config *scalersconfig
return nil, fmt.Errorf("error getting scaler metric type: %w", err)
}

meta, parseErr := parseWorkloadMetadata(config)
if parseErr != nil {
return nil, fmt.Errorf("error parsing kubernetes workload metadata: %w", parseErr)
meta, err := parseKubernetesWorkloadMetadata(config)
if err != nil {
return nil, fmt.Errorf("error parsing kubernetes workload metadata: %w", err)
}

return &kubernetesWorkloadScaler{
Expand All @@ -63,50 +70,46 @@ func NewKubernetesWorkloadScaler(kubeClient client.Client, config *scalersconfig
}, nil
}

func parseWorkloadMetadata(config *scalersconfig.ScalerConfig) (*kubernetesWorkloadMetadata, error) {
meta := &kubernetesWorkloadMetadata{}
var err error
func parseKubernetesWorkloadMetadata(config *scalersconfig.ScalerConfig) (kubernetesWorkloadMetadata, error) {
meta := kubernetesWorkloadMetadata{}
err := config.TypedConfig(&meta)
if err != nil {
return meta, fmt.Errorf("error parsing kubernetes workload metadata: %w", err)
}

meta.namespace = config.ScalableObjectNamespace
podSelector, err := labels.Parse(config.TriggerMetadata[podSelectorKey])
if err != nil || podSelector.String() == "" {
return nil, fmt.Errorf("invalid pod selector")
meta.triggerIndex = config.TriggerIndex
meta.asMetricSource = config.AsMetricSource

if meta.asMetricSource {
meta.Value = 0
}
meta.podSelector = podSelector
value, err := strconv.ParseFloat(config.TriggerMetadata[valueKey], 64)
if err != nil || value == 0 {
if config.AsMetricSource {
value = 0
} else {
return nil, fmt.Errorf("value must be a float greater than 0")
}

selector, err := labels.Parse(meta.PodSelector)
if err != nil {
return meta, fmt.Errorf("error parsing pod selector: %w", err)
}
meta.value = value
meta.podSelector = selector

meta.activationValue = 0
if val, ok := config.TriggerMetadata[activationValueKey]; ok {
activationValue, err := strconv.ParseFloat(val, 64)
if err != nil {
return nil, fmt.Errorf("value must be a float")
}
meta.activationValue = activationValue
if err := meta.Validate(); err != nil {
return meta, err
}

meta.triggerIndex = config.TriggerIndex
return meta, nil
}

// Close no need for kubernetes workload scaler
func (s *kubernetesWorkloadScaler) Close(context.Context) error {
return nil
}

// GetMetricSpecForScaling returns the metric spec for the HPA
func (s *kubernetesWorkloadScaler) GetMetricSpecForScaling(context.Context) []v2.MetricSpec {
metricName := kedautil.NormalizeString(fmt.Sprintf("workload-%s", s.metadata.namespace))
externalMetric := &v2.ExternalMetricSource{
Metric: v2.MetricIdentifier{
Name: GenerateMetricNameWithIndex(s.metadata.triggerIndex, kedautil.NormalizeString(fmt.Sprintf("workload-%s", s.metadata.namespace))),
Name: GenerateMetricNameWithIndex(s.metadata.triggerIndex, metricName),
},
Target: GetMetricTargetMili(s.metricType, s.metadata.value),
Target: GetMetricTargetMili(s.metricType, s.metadata.Value),
}
metricSpec := v2.MetricSpec{External: externalMetric, Type: kubernetesWorkloadMetricType}
return []v2.MetricSpec{metricSpec}
Expand All @@ -121,19 +124,17 @@ func (s *kubernetesWorkloadScaler) GetMetricsAndActivity(ctx context.Context, me

metric := GenerateMetricInMili(metricName, float64(pods))

return []external_metrics.ExternalMetricValue{metric}, float64(pods) > s.metadata.activationValue, nil
return []external_metrics.ExternalMetricValue{metric}, float64(pods) > s.metadata.ActivationValue, nil
}

func (s *kubernetesWorkloadScaler) getMetricValue(ctx context.Context) (int64, error) {
podList := &corev1.PodList{}
listOptions := client.ListOptions{}
listOptions.LabelSelector = s.metadata.podSelector
listOptions.Namespace = s.metadata.namespace
opts := []client.ListOption{
&listOptions,
listOptions := client.ListOptions{
LabelSelector: s.metadata.podSelector,
Namespace: s.metadata.namespace,
}

err := s.kubeClient.List(ctx, podList, opts...)
err := s.kubeClient.List(ctx, podList, &listOptions)
if err != nil {
return 0, err
}
Expand Down
29 changes: 20 additions & 9 deletions pkg/scalers/kubernetes_workload_scaler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,13 @@ var parseWorkloadMetadataTestDataset = []workloadMetadataTestData{

func TestParseWorkloadMetadata(t *testing.T) {
for _, testData := range parseWorkloadMetadataTestDataset {
_, err := parseWorkloadMetadata(&scalersconfig.ScalerConfig{TriggerMetadata: testData.metadata, ScalableObjectNamespace: testData.namespace})
_, err := NewKubernetesWorkloadScaler(
fake.NewClientBuilder().Build(),
&scalersconfig.ScalerConfig{
TriggerMetadata: testData.metadata,
ScalableObjectNamespace: testData.namespace,
},
)
if err != nil && !testData.isError {
t.Error("Expected success but got error", err)
}
Expand Down Expand Up @@ -68,7 +74,7 @@ var isActiveWorkloadTestDataset = []workloadIsActiveTestData{

func TestWorkloadIsActive(t *testing.T) {
for _, testData := range isActiveWorkloadTestDataset {
s, _ := NewKubernetesWorkloadScaler(
s, err := NewKubernetesWorkloadScaler(
fake.NewClientBuilder().WithRuntimeObjects(createPodlist(testData.podCount)).Build(),
&scalersconfig.ScalerConfig{
TriggerMetadata: testData.metadata,
Expand All @@ -77,6 +83,10 @@ func TestWorkloadIsActive(t *testing.T) {
ScalableObjectNamespace: testData.namespace,
},
)
if err != nil {
t.Error("Error creating scaler", err)
continue
}
_, isActive, _ := s.GetMetricsAndActivity(context.TODO(), "Metric")
if testData.active && !isActive {
t.Error("Expected active but got inactive")
Expand Down Expand Up @@ -107,7 +117,7 @@ var getMetricSpecForScalingTestDataset = []workloadGetMetricSpecForScalingTestDa

func TestWorkloadGetMetricSpecForScaling(t *testing.T) {
for _, testData := range getMetricSpecForScalingTestDataset {
s, _ := NewKubernetesWorkloadScaler(
s, err := NewKubernetesWorkloadScaler(
fake.NewClientBuilder().Build(),
&scalersconfig.ScalerConfig{
TriggerMetadata: testData.metadata,
Expand All @@ -117,6 +127,10 @@ func TestWorkloadGetMetricSpecForScaling(t *testing.T) {
TriggerIndex: testData.triggerIndex,
},
)
if err != nil {
t.Error("Error creating scaler", err)
continue
}
metric := s.GetMetricSpecForScaling(context.Background())

if metric[0].External.Metric.Name != testData.name {
Expand Down Expand Up @@ -145,14 +159,11 @@ func createPodlist(count int) *v1.PodList {

func TestWorkloadPhase(t *testing.T) {
phases := map[v1.PodPhase]bool{
v1.PodRunning: true,
// succeeded and failed clearly count as terminated
v1.PodRunning: true,
v1.PodSucceeded: false,
v1.PodFailed: false,
// unknown could be for example a temporarily unresponsive node; count the pod
v1.PodUnknown: true,
// count pre-Running to avoid an additional delay on top of the poll interval
v1.PodPending: true,
v1.PodUnknown: true,
v1.PodPending: true,
}
for phase, active := range phases {
list := &v1.PodList{}
Expand Down
Loading