diff --git a/CHANGELOG.md b/CHANGELOG.md index 210cd33649d..6eec927cc37 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -66,6 +66,7 @@ Here is an overview of all new **experimental** features: - **General**: Add command-line flag in Adapter to allow override of gRPC Authority Header ([#5449](https://github.com/kedacore/keda/issues/5449)) - **General**: Add OPENTELEMETRY flag in e2e test YAML ([#5375](https://github.com/kedacore/keda/issues/5375)) - **General**: Add support for cross tenant/cloud authentication when using Azure Workload Identity for TriggerAuthentication ([#5441](https://github.com/kedacore/keda/issues/5441)) +- **Metrics API Scaler**: Add support for various formats: json, xml, yaml, prometheus ([#2633](https://github.com/kedacore/keda/issues/2633)) - **MongoDB Scaler**: Add scheme field support srv record ([#5544](https://github.com/kedacore/keda/issues/5544)) ### Fixes diff --git a/go.mod b/go.mod index 67241b8fe68..b641b9aa34a 100644 --- a/go.mod +++ b/go.mod @@ -348,7 +348,7 @@ require ( gopkg.in/inf.v0 v0.9.1 // indirect gopkg.in/natefinch/lumberjack.v2 v2.2.1 // indirect gopkg.in/yaml.v2 v2.4.0 // indirect - gopkg.in/yaml.v3 v3.0.1 // indirect + gopkg.in/yaml.v3 v3.0.1 k8s.io/apiextensions-apiserver v0.29.0 // indirect k8s.io/gengo v0.0.0-20230829151522-9cce18d56c01 // indirect k8s.io/kms v0.29.0 // indirect diff --git a/pkg/scalers/metrics_api_scaler.go b/pkg/scalers/metrics_api_scaler.go index d5e0820e3bc..b3a678b3cc5 100644 --- a/pkg/scalers/metrics_api_scaler.go +++ b/pkg/scalers/metrics_api_scaler.go @@ -1,7 +1,10 @@ package scalers import ( + "bufio" + "bytes" "context" + "encoding/xml" "errors" "fmt" "io" @@ -12,6 +15,7 @@ import ( "github.com/go-logr/logr" "github.com/tidwall/gjson" + "gopkg.in/yaml.v3" v2 "k8s.io/api/autoscaling/v2" "k8s.io/apimachinery/pkg/api/resource" "k8s.io/metrics/pkg/apis/external_metrics" @@ -32,6 +36,7 @@ type metricsAPIScalerMetadata struct { targetValue float64 activationTargetValue float64 url string + format APIFormat valueLocation string unsafeSsl bool @@ -62,7 +67,27 @@ type metricsAPIScalerMetadata struct { } const ( - methodValueQuery = "query" + methodValueQuery = "query" + valueLocationWrongErrorMsg = "valueLocation must point to value of type number or a string representing a Quantity got: '%s'" +) + +type APIFormat string + +// Options for APIFormat: +const ( + PrometheusFormat APIFormat = "prometheus" + JSONFormat APIFormat = "json" + XMLFormat APIFormat = "xml" + YAMLFormat APIFormat = "yaml" +) + +var ( + supportedFormats = []APIFormat{ + PrometheusFormat, + JSONFormat, + XMLFormat, + YAMLFormat, + } ) // NewMetricsAPIScaler creates a new HTTP scaler @@ -137,6 +162,16 @@ func parseMetricsAPIMetadata(config *scalersconfig.ScalerConfig) (*metricsAPISca return nil, fmt.Errorf("no url given in metadata") } + if val, ok := config.TriggerMetadata["format"]; ok { + meta.format = APIFormat(strings.TrimSpace(val)) + if !kedautil.Contains(supportedFormats, meta.format) { + return nil, fmt.Errorf("format %s not supported", meta.format) + } + } else { + // default format is JSON for backward compatibility + meta.format = JSONFormat + } + if val, ok := config.TriggerMetadata["valueLocation"]; ok { meta.valueLocation = val } else { @@ -211,23 +246,126 @@ func parseMetricsAPIMetadata(config *scalersconfig.ScalerConfig) (*metricsAPISca return &meta, nil } -// GetValueFromResponse uses provided valueLocation to access the numeric value in provided body -func GetValueFromResponse(body []byte, valueLocation string) (float64, error) { +// GetValueFromResponse uses provided valueLocation to access the numeric value in provided body using the format specified. +func GetValueFromResponse(body []byte, valueLocation string, format APIFormat) (float64, error) { + switch format { + case PrometheusFormat: + return getValueFromPrometheusResponse(body, valueLocation) + case JSONFormat: + return getValueFromJSONResponse(body, valueLocation) + case XMLFormat: + return getValueFromXMLResponse(body, valueLocation) + case YAMLFormat: + return getValueFromYAMLResponse(body, valueLocation) + } + + return 0, fmt.Errorf("format %s not supported", format) +} + +// getValueFromPrometheusResponse uses provided valueLocation to access the numeric value in provided body +func getValueFromPrometheusResponse(body []byte, valueLocation string) (float64, error) { + scanner := bufio.NewScanner(bytes.NewReader(body)) + for scanner.Scan() { + line := scanner.Text() + fields := strings.Fields(line) + if len(fields) == 0 || strings.HasPrefix(fields[0], "#") { + continue + } + if len(fields) == 2 && strings.HasPrefix(fields[0], valueLocation) { + value, err := strconv.ParseFloat(fields[1], 64) + if err != nil { + return 0, err + } + return value, nil + } + } + + if err := scanner.Err(); err != nil { + return 0, err + } + + return 0, fmt.Errorf("value %s not found", valueLocation) +} + +// getValueFromJSONResponse uses provided valueLocation to access the numeric value in provided body using GJSON +func getValueFromJSONResponse(body []byte, valueLocation string) (float64, error) { r := gjson.GetBytes(body, valueLocation) - errorMsg := "valueLocation must point to value of type number or a string representing a Quantity got: '%s'" if r.Type == gjson.String { v, err := resource.ParseQuantity(r.String()) if err != nil { - return 0, fmt.Errorf(errorMsg, r.String()) + return 0, fmt.Errorf(valueLocationWrongErrorMsg, r.String()) } return v.AsApproximateFloat64(), nil } if r.Type != gjson.Number { - return 0, fmt.Errorf(errorMsg, r.Type.String()) + return 0, fmt.Errorf(valueLocationWrongErrorMsg, r.Type.String()) } return r.Num, nil } +// getValueFromXMLResponse uses provided valueLocation to access the numeric value in provided body +func getValueFromXMLResponse(body []byte, valueLocation string) (float64, error) { + var xmlMap map[string]interface{} + err := xml.Unmarshal(body, &xmlMap) + if err != nil { + return 0, err + } + + path, err := kedautil.GetValueByPath(xmlMap, valueLocation) + if err != nil { + return 0, err + } + + switch v := path.(type) { + case int: + return float64(v), nil + case int64: + return float64(v), nil + case float64: + return v, nil + case string: + r, err := resource.ParseQuantity(v) + if err != nil { + return 0, fmt.Errorf(valueLocationWrongErrorMsg, v) + } + return r.AsApproximateFloat64(), nil + default: + return 0, fmt.Errorf(valueLocationWrongErrorMsg, v) + } +} + +// getValueFromYAMLResponse uses provided valueLocation to access the numeric value in provided body +// using generic ketautil.GetValueByPath +func getValueFromYAMLResponse(body []byte, valueLocation string) (float64, error) { + var yamlMap map[string]interface{} + err := yaml.Unmarshal(body, &yamlMap) + if err != nil { + return 0, err + } + + path, err := kedautil.GetValueByPath(yamlMap, valueLocation) + if err != nil { + return 0, err + } + + switch v := path.(type) { + case int: + return float64(v), nil + case int64: + return float64(v), nil + case float64: + return v, nil + case string: + r, err := resource.ParseQuantity(v) + if err != nil { + return 0, fmt.Errorf(valueLocationWrongErrorMsg, v) + } + return r.AsApproximateFloat64(), nil + default: + return 0, fmt.Errorf(valueLocationWrongErrorMsg, v) + } +} + func (s *metricsAPIScaler) getMetricValue(ctx context.Context) (float64, error) { request, err := getMetricAPIServerRequest(ctx, s.metadata) if err != nil { @@ -249,7 +387,7 @@ func (s *metricsAPIScaler) getMetricValue(ctx context.Context) (float64, error) if err != nil { return 0, err } - v, err := GetValueFromResponse(b, s.metadata.valueLocation) + v, err := GetValueFromResponse(b, s.metadata.valueLocation, s.metadata.format) if err != nil { return 0, err } diff --git a/pkg/scalers/metrics_api_scaler_test.go b/pkg/scalers/metrics_api_scaler_test.go index 7728d7d2fe0..cef44a7bfef 100644 --- a/pkg/scalers/metrics_api_scaler_test.go +++ b/pkg/scalers/metrics_api_scaler_test.go @@ -123,42 +123,38 @@ func TestMetricsAPIGetMetricSpecForScaling(t *testing.T) { } func TestGetValueFromResponse(t *testing.T) { - d := []byte(`{"components":[{"id": "82328e93e", "tasks": 32, "str": "64", "k":"1k","wrong":"NaN"}],"count":2.43}`) - v, err := GetValueFromResponse(d, "components.0.tasks") - if err != nil { - t.Error("Expected success but got error", err) - } - if v != 32 { - t.Errorf("Expected %d got %f", 32, v) - } - - v, err = GetValueFromResponse(d, "count") - if err != nil { - t.Error("Expected success but got error", err) - } - if v != 2.43 { - t.Errorf("Expected %d got %f", 2, v) - } - - v, err = GetValueFromResponse(d, "components.0.str") - if err != nil { - t.Error("Expected success but got error", err) - } - if v != 64 { - t.Errorf("Expected %d got %f", 64, v) - } - - v, err = GetValueFromResponse(d, "components.0.k") - if err != nil { - t.Error("Expected success but got error", err) - } - if v != 1000 { - t.Errorf("Expected %d got %f", 1000, v) - } + inputJSON := []byte(`{"components":[{"id": "82328e93e", "tasks": 32, "str": "64", "k":"1k","wrong":"NaN"}],"count":2.43}`) + inputYAML := []byte(`{components: [{id: 82328e93e, tasks: 32, str: '64', k: 1k, wrong: NaN}], count: 2.43}`) + + testCases := []struct { + name string + input []byte + key string + format APIFormat + expectVal float64 + expectErr bool + }{ + {name: "integer", input: inputJSON, key: "count", format: JSONFormat, expectVal: 2.43}, + {name: "string", input: inputJSON, key: "components.0.str", format: JSONFormat, expectVal: 64}, + {name: "{}.[].{}", input: inputJSON, key: "components.0.tasks", format: JSONFormat, expectVal: 32}, + {name: "invalid data", input: inputJSON, key: "components.0.wrong", format: JSONFormat, expectErr: true}, + + {name: "integer", input: inputYAML, key: "count", format: YAMLFormat, expectVal: 2.43}, + {name: "string", input: inputYAML, key: "components.0.str", format: YAMLFormat, expectVal: 64}, + {name: "{}.[].{}", input: inputYAML, key: "components.0.tasks", format: YAMLFormat, expectVal: 32}, + {name: "invalid data", input: inputYAML, key: "components.0.wrong", format: YAMLFormat, expectErr: true}, + } + + for _, tc := range testCases { + t.Run(string(tc.format)+": "+tc.name, func(t *testing.T) { + v, err := GetValueFromResponse(tc.input, tc.key, tc.format) + + if tc.expectErr { + assert.Error(t, err) + } - _, err = GetValueFromResponse(d, "components.0.wrong") - if err == nil { - t.Error("Expected error but got success", err) + assert.EqualValues(t, tc.expectVal, v) + }) } } diff --git a/pkg/util/value_by_path.go b/pkg/util/value_by_path.go new file mode 100644 index 00000000000..83438dc77c8 --- /dev/null +++ b/pkg/util/value_by_path.go @@ -0,0 +1,59 @@ +package util + +import ( + "fmt" + "strings" +) + +// GetValueByPath retrieves a value from a nested map using a dot-separated path +// It also supports .number syntax to access array elements. +// +// This is a helper function for niche use cases. +// Consider using https://pkg.go.dev/k8s.io/apimachinery@v0.29.3/pkg/apis/meta/v1/unstructured#NestedFieldNoCopy instead +// +// Examples: +// +// data := map[string]interface{}{ +// "a": map[string]interface{}{ +// "b": []interface{}{ +// map[string]interface{}{"c": 1}, +// map[string]interface{}{"c": 2}, +// }, +// }, +// } +// +// GetValueByPath(data, "a.b.0.c") // 1 +// GetValueByPath(data, "not.found") // error +func GetValueByPath(data map[string]interface{}, path string) (interface{}, error) { + keys := strings.Split(path, ".") + current := data + + for _, key := range keys { + val, ok := current[key] + if !ok { + return nil, fmt.Errorf("key '%s' not found in path '%s'", key, path) + } + + switch typedValue := val.(type) { + case map[interface{}]interface{}: + // Convert map[interface{}]interface{} to map[string]interface{} + current = make(map[string]interface{}) + for k, v := range typedValue { + current[fmt.Sprintf("%v", k)] = v + } + case []interface{}: + // Convert map[interface{}]interface{} to map[string]interface{} + current = make(map[string]interface{}) + for k, v := range typedValue { + current[fmt.Sprintf("%v", k)] = v + } + case map[string]interface{}: + current = typedValue + default: + // Reached the final value + return val, nil + } + } + + return nil, fmt.Errorf("path '%s' does not lead to a value", path) +} diff --git a/pkg/util/value_by_path_test.go b/pkg/util/value_by_path_test.go new file mode 100644 index 00000000000..319eb53f6af --- /dev/null +++ b/pkg/util/value_by_path_test.go @@ -0,0 +1,82 @@ +package util + +import ( + "reflect" + "testing" +) + +func TestGetValueByPath(t *testing.T) { + tests := []struct { + name string + input map[string]interface{} + path string + expected interface{} + wantErr bool + }{ + { + name: "Valid path - String value", + input: map[string]interface{}{ + "some": map[string]interface{}{ + "nested": map[string]interface{}{ + "key": "value", + }, + }, + }, + path: "some.nested.key", + expected: "value", + wantErr: false, + }, + { + name: "Valid path - Integer value", + input: map[string]interface{}{ + "another": map[string]interface{}{ + "nested": map[string]interface{}{ + "key": 42, + }, + }, + }, + path: "another.nested.key", + expected: 42, + wantErr: false, + }, + { + name: "Invalid path - Key not found", + input: map[string]interface{}{ + "some": map[string]interface{}{ + "nested": map[string]interface{}{ + "key": "value", + }, + }, + }, + path: "nonexistent.path", + expected: nil, + wantErr: true, + }, + { + name: "Interface slice", + input: map[string]interface{}{ + "some": []interface{}{ + 1, 2, 3, + }, + }, + path: "some.0", + expected: 1, + wantErr: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + actual, err := GetValueByPath(tt.input, tt.path) + + if (err != nil) != tt.wantErr { + t.Errorf("Unexpected error status. got %v, wantErr %v", err, tt.wantErr) + return + } + + if !reflect.DeepEqual(actual, tt.expected) { + t.Errorf("Mismatched result. got %v, want %v", actual, tt.expected) + } + }) + } +}