Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Metrics API Scaler support different formats #5347

Merged
merged 15 commits into from
Mar 24, 2024
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ Here is an overview of all new **experimental** features:
- **General**: Add command-line flag in Adapter to allow override of gRPC Authority Header ([#5449](https://github.com/kedacore/keda/issues/5449))
- **General**: Add OPENTELEMETRY flag in e2e test YAML ([#5375](https://github.com/kedacore/keda/issues/5375))
- **General**: Add support for cross tenant/cloud authentication when using Azure Workload Identity for TriggerAuthentication ([#5441](https://github.com/kedacore/keda/issues/5441))
- **Metrics API Scaler**: Add support for various formats: json, xml, yaml, prometheus ([#2633](https://github.com/kedacore/keda/issues/2633))

### Fixes

Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -348,7 +348,7 @@ require (
gopkg.in/inf.v0 v0.9.1 // indirect
gopkg.in/natefinch/lumberjack.v2 v2.2.1 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
gopkg.in/yaml.v3 v3.0.1
k8s.io/apiextensions-apiserver v0.29.0 // indirect
k8s.io/gengo v0.0.0-20230829151522-9cce18d56c01 // indirect
k8s.io/kms v0.29.0 // indirect
Expand Down
152 changes: 145 additions & 7 deletions pkg/scalers/metrics_api_scaler.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
package scalers

import (
"bufio"
"bytes"
"context"
"encoding/xml"
"errors"
"fmt"
"io"
Expand All @@ -12,6 +15,7 @@ import (

"github.com/go-logr/logr"
"github.com/tidwall/gjson"
"gopkg.in/yaml.v3"
v2 "k8s.io/api/autoscaling/v2"
"k8s.io/apimachinery/pkg/api/resource"
"k8s.io/metrics/pkg/apis/external_metrics"
Expand All @@ -32,6 +36,7 @@ type metricsAPIScalerMetadata struct {
targetValue float64
activationTargetValue float64
url string
format APIFormat
valueLocation string
unsafeSsl bool

Expand Down Expand Up @@ -62,7 +67,27 @@ type metricsAPIScalerMetadata struct {
}

const (
methodValueQuery = "query"
methodValueQuery = "query"
valueLocationWrongErrorMsg = "valueLocation must point to value of type number or a string representing a Quantity got: '%s'"
)

type APIFormat string

// Options for APIFormat:
const (
PrometheusFormat APIFormat = "prometheus"
JSONFormat APIFormat = "json"
XMLFormat APIFormat = "xml"
YAMLFormat APIFormat = "yaml"
)

var (
supportedFormats = []APIFormat{
PrometheusFormat,
JSONFormat,
XMLFormat,
YAMLFormat,
}
)

// NewMetricsAPIScaler creates a new HTTP scaler
Expand Down Expand Up @@ -137,6 +162,16 @@ func parseMetricsAPIMetadata(config *scalersconfig.ScalerConfig) (*metricsAPISca
return nil, fmt.Errorf("no url given in metadata")
}

if val, ok := config.TriggerMetadata["format"]; ok {
meta.format = APIFormat(strings.TrimSpace(val))
if !kedautil.Contains(supportedFormats, meta.format) {
return nil, fmt.Errorf("format %s not supported", meta.format)
}
} else {
// default format is JSON for backward compatibility
meta.format = JSONFormat
}

if val, ok := config.TriggerMetadata["valueLocation"]; ok {
meta.valueLocation = val
} else {
Expand Down Expand Up @@ -211,23 +246,126 @@ func parseMetricsAPIMetadata(config *scalersconfig.ScalerConfig) (*metricsAPISca
return &meta, nil
}

// GetValueFromResponse uses provided valueLocation to access the numeric value in provided body
func GetValueFromResponse(body []byte, valueLocation string) (float64, error) {
// GetValueFromResponse uses provided valueLocation to access the numeric value in provided body using the format specified.
func GetValueFromResponse(body []byte, valueLocation string, format APIFormat) (float64, error) {
switch format {
case PrometheusFormat:
return getValueFromPrometheusResponse(body, valueLocation)
case JSONFormat:
return getValueFromJSONResponse(body, valueLocation)
case XMLFormat:
return getValueFromXMLResponse(body, valueLocation)
case YAMLFormat:
return getValueFromYAMLResponse(body, valueLocation)
}

return 0, fmt.Errorf("format %s not supported", format)
}

// getValueFromPrometheusResponse uses provided valueLocation to access the numeric value in provided body
func getValueFromPrometheusResponse(body []byte, valueLocation string) (float64, error) {
scanner := bufio.NewScanner(bytes.NewReader(body))
for scanner.Scan() {
line := scanner.Text()
fields := strings.Fields(line)
if len(fields) == 0 || strings.HasPrefix(fields[0], "#") {
continue
}
if len(fields) == 2 && strings.HasPrefix(fields[0], valueLocation) {
value, err := strconv.ParseFloat(fields[1], 64)
if err != nil {
return 0, err
}
return value, nil
}
}

if err := scanner.Err(); err != nil {
return 0, err
}

return 0, fmt.Errorf("value %s not found", valueLocation)
}

// getValueFromJSONResponse uses provided valueLocation to access the numeric value in provided body using GSON
fira42073 marked this conversation as resolved.
Show resolved Hide resolved
func getValueFromJSONResponse(body []byte, valueLocation string) (float64, error) {
r := gjson.GetBytes(body, valueLocation)
errorMsg := "valueLocation must point to value of type number or a string representing a Quantity got: '%s'"
if r.Type == gjson.String {
v, err := resource.ParseQuantity(r.String())
if err != nil {
return 0, fmt.Errorf(errorMsg, r.String())
return 0, fmt.Errorf(valueLocationWrongErrorMsg, r.String())
}
return v.AsApproximateFloat64(), nil
}
if r.Type != gjson.Number {
return 0, fmt.Errorf(errorMsg, r.Type.String())
return 0, fmt.Errorf(valueLocationWrongErrorMsg, r.Type.String())
}
return r.Num, nil
}

// getValueFromXMLResponse uses provided valueLocation to access the numeric value in provided body
func getValueFromXMLResponse(body []byte, valueLocation string) (float64, error) {
var xmlMap map[string]interface{}
err := xml.Unmarshal(body, &xmlMap)
if err != nil {
return 0, err
}

path, err := kedautil.GetValueByPath(xmlMap, valueLocation)
if err != nil {
return 0, err
}

switch v := path.(type) {
case int:
return float64(v), nil
case int64:
return float64(v), nil
case float64:
return v, nil
case string:
r, err := resource.ParseQuantity(v)
if err != nil {
return 0, fmt.Errorf(valueLocationWrongErrorMsg, v)
}
return r.AsApproximateFloat64(), nil
default:
return 0, fmt.Errorf("valueLocation must point to value of type number or a string representing a Quantity got: '%s'", v)
fira42073 marked this conversation as resolved.
Show resolved Hide resolved
}
}

// getValueFromYAMLResponse uses provided valueLocation to access the numeric value in provided body
// using generic ketautil.GetValueByPath
func getValueFromYAMLResponse(body []byte, valueLocation string) (float64, error) {
var yamlMap map[string]interface{}
err := yaml.Unmarshal(body, &yamlMap)
if err != nil {
return 0, err
}

path, err := kedautil.GetValueByPath(yamlMap, valueLocation)
if err != nil {
return 0, err
}

switch v := path.(type) {
case int:
return float64(v), nil
case int64:
return float64(v), nil
case float64:
return v, nil
case string:
r, err := resource.ParseQuantity(v)
if err != nil {
return 0, fmt.Errorf(valueLocationWrongErrorMsg, v)
}
return r.AsApproximateFloat64(), nil
default:
return 0, fmt.Errorf("valueLocation must point to value of type number or a string representing a Quantity got: '%s'", v)
fira42073 marked this conversation as resolved.
Show resolved Hide resolved
}
}

func (s *metricsAPIScaler) getMetricValue(ctx context.Context) (float64, error) {
request, err := getMetricAPIServerRequest(ctx, s.metadata)
if err != nil {
Expand All @@ -249,7 +387,7 @@ func (s *metricsAPIScaler) getMetricValue(ctx context.Context) (float64, error)
if err != nil {
return 0, err
}
v, err := GetValueFromResponse(b, s.metadata.valueLocation)
v, err := GetValueFromResponse(b, s.metadata.valueLocation, s.metadata.format)
if err != nil {
return 0, err
}
Expand Down
62 changes: 27 additions & 35 deletions pkg/scalers/metrics_api_scaler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,42 +123,34 @@ func TestMetricsAPIGetMetricSpecForScaling(t *testing.T) {
}

func TestGetValueFromResponse(t *testing.T) {
d := []byte(`{"components":[{"id": "82328e93e", "tasks": 32, "str": "64", "k":"1k","wrong":"NaN"}],"count":2.43}`)
v, err := GetValueFromResponse(d, "components.0.tasks")
if err != nil {
t.Error("Expected success but got error", err)
}
if v != 32 {
t.Errorf("Expected %d got %f", 32, v)
}

v, err = GetValueFromResponse(d, "count")
if err != nil {
t.Error("Expected success but got error", err)
}
if v != 2.43 {
t.Errorf("Expected %d got %f", 2, v)
}

v, err = GetValueFromResponse(d, "components.0.str")
if err != nil {
t.Error("Expected success but got error", err)
}
if v != 64 {
t.Errorf("Expected %d got %f", 64, v)
}

v, err = GetValueFromResponse(d, "components.0.k")
if err != nil {
t.Error("Expected success but got error", err)
}
if v != 1000 {
t.Errorf("Expected %d got %f", 1000, v)
}
testCases := []struct {
name string
input []byte
key string
format APIFormat
expectVal float64
expectErr bool
}{
{name: "integer", input: []byte(`{"components":[{"id": "82328e93e", "tasks": 32, "str": "64", "k":"1k","wrong":"NaN"}],"count":2.43}`), key: "count", format: JSONFormat, expectVal: 2.43},
fira42073 marked this conversation as resolved.
Show resolved Hide resolved
{name: "string", input: []byte(`{"components":[{"id": "82328e93e", "tasks": 32, "str": "64", "k":"1k","wrong":"NaN"}],"count":2.43}`), key: "components.0.str", format: JSONFormat, expectVal: 64},
{name: "{}.[].{}", input: []byte(`{"components":[{"id": "82328e93e", "tasks": 32, "str": "64", "k":"1k","wrong":"NaN"}],"count":2.43}`), key: "components.0.tasks", format: JSONFormat, expectVal: 32},
{name: "invalid data", input: []byte(`{"components":[{"id": "82328e93e", "tasks": 32, "str": "64", "k":"1k","wrong":"NaN"}],"count":2.43}`), key: "components.0.wrong", format: JSONFormat, expectErr: true},
{name: "integer", input: []byte(`{components: [{id: 82328e93e, tasks: 32, str: '64', k: 1k, wrong: NaN}], count: 2.43}`), key: "count", format: YAMLFormat, expectVal: 2.43},
{name: "string", input: []byte(`{components: [{id: 82328e93e, tasks: 32, str: '64', k: 1k, wrong: NaN}], count: 2.43}`), key: "components.0.str", format: YAMLFormat, expectVal: 64},
fira42073 marked this conversation as resolved.
Show resolved Hide resolved
{name: "{}.[].{}", input: []byte(`{components: [{id: 82328e93e, tasks: 32, str: '64', k: 1k, wrong: NaN}], count: 2.43}`), key: "components.0.tasks", format: YAMLFormat, expectVal: 32},
{name: "invalid data", input: []byte(`{components: [{id: 82328e93e, tasks: 32, str: '64', k: 1k, wrong: NaN}], count: 2.43}`), key: "components.0.wrong", format: YAMLFormat, expectErr: true},
}

for _, tc := range testCases {
t.Run(string(tc.format)+": "+tc.name, func(t *testing.T) {
v, err := GetValueFromResponse(tc.input, tc.key, tc.format)

if tc.expectErr {
assert.Error(t, err)
}

_, err = GetValueFromResponse(d, "components.0.wrong")
if err == nil {
t.Error("Expected error but got success", err)
assert.EqualValues(t, tc.expectVal, v)
})
}
}

Expand Down
41 changes: 41 additions & 0 deletions pkg/util/value_by_path.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package util

import (
"fmt"
"strings"
)

// GetValueByPath retrieves a value from a nested map using a dot-separated path
func GetValueByPath(data map[string]interface{}, path string) (interface{}, error) {
fira42073 marked this conversation as resolved.
Show resolved Hide resolved
keys := strings.Split(path, ".")
current := data

for _, key := range keys {
val, ok := current[key]
if !ok {
return nil, fmt.Errorf("key '%s' not found in path '%s'", key, path)
}

switch typedValue := val.(type) {
case map[interface{}]interface{}:
// Convert map[interface{}]interface{} to map[string]interface{}
current = make(map[string]interface{})
for k, v := range typedValue {
current[fmt.Sprintf("%v", k)] = v
}
case []interface{}:
// Convert map[interface{}]interface{} to map[string]interface{}
current = make(map[string]interface{})
for k, v := range typedValue {
current[fmt.Sprintf("%v", k)] = v
}
case map[string]interface{}:
current = typedValue
default:
// Reached the final value
return val, nil
}
}

return nil, fmt.Errorf("path '%s' does not lead to a value", path)
}
Loading
Loading