Skip to content

Commit

Permalink
Metrics API Scaler support different format
Browse files Browse the repository at this point in the history
Signed-off-by: Friedrich Albert Kyuri <[email protected]>
  • Loading branch information
Friedrich Albert Kyuri committed Feb 14, 2024
1 parent 4da3a5b commit 724b63e
Show file tree
Hide file tree
Showing 4 changed files with 283 additions and 41 deletions.
153 changes: 146 additions & 7 deletions pkg/scalers/metrics_api_scaler.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
package scalers

import (
"bufio"
"bytes"
"context"
"encoding/xml"
"errors"
"fmt"
"io"
Expand All @@ -10,6 +13,8 @@ import (
"strconv"
"strings"

"gopkg.in/yaml.v3"

"github.com/go-logr/logr"
"github.com/tidwall/gjson"
v2 "k8s.io/api/autoscaling/v2"
Expand All @@ -32,6 +37,7 @@ type metricsAPIScalerMetadata struct {
targetValue float64
activationTargetValue float64
url string
format APIFormat
valueLocation string
unsafeSsl bool

Expand Down Expand Up @@ -62,7 +68,27 @@ type metricsAPIScalerMetadata struct {
}

const (
methodValueQuery = "query"
methodValueQuery = "query"
valueLocationWrongErrorMsg = "valueLocation must point to value of type number or a string representing a Quantity got: '%s'"
)

type APIFormat string

// Enum for APIFormat:
const (
PrometheusFormat APIFormat = "prometheus"
JSONFormat APIFormat = "json"
XMLFormat APIFormat = "xml"
YAMLFormat APIFormat = "yaml"
)

var (
supportedFormats = []APIFormat{
PrometheusFormat,
JSONFormat,
XMLFormat,
YAMLFormat,
}
)

// NewMetricsAPIScaler creates a new HTTP scaler
Expand Down Expand Up @@ -137,6 +163,16 @@ func parseMetricsAPIMetadata(config *scalersconfig.ScalerConfig) (*metricsAPISca
return nil, fmt.Errorf("no url given in metadata")
}

if val, ok := config.TriggerMetadata["format"]; ok {
meta.format = APIFormat(strings.TrimSpace(val))
if !kedautil.Contains(supportedFormats, meta.format) {
return nil, fmt.Errorf("format %s not supported", meta.format)
}
} else {
// default format is JSON for backward compatibility
meta.format = JSONFormat
}

if val, ok := config.TriggerMetadata["valueLocation"]; ok {
meta.valueLocation = val
} else {
Expand Down Expand Up @@ -211,23 +247,126 @@ func parseMetricsAPIMetadata(config *scalersconfig.ScalerConfig) (*metricsAPISca
return &meta, nil
}

// GetValueFromResponse uses provided valueLocation to access the numeric value in provided body
func GetValueFromResponse(body []byte, valueLocation string) (float64, error) {
// GetValueFromResponse uses provided valueLocation to access the numeric value in provided body using the format specified.
func GetValueFromResponse(body []byte, valueLocation string, format APIFormat) (float64, error) {
switch format {
case PrometheusFormat:
return getValueFromPrometheusResponse(body, valueLocation)
case JSONFormat:
return getValueFromJSONResponse(body, valueLocation)
case XMLFormat:
return getValueFromXMLResponse(body, valueLocation)
case YAMLFormat:
return getValueFromYAMLResponse(body, valueLocation)
}

return 0, fmt.Errorf("format %s not supported", format)
}

// getValueFromPrometheusResponse uses provided valueLocation to access the numeric value in provided body
func getValueFromPrometheusResponse(body []byte, valueLocation string) (float64, error) {
scanner := bufio.NewScanner(bytes.NewReader(body))
for scanner.Scan() {
line := scanner.Text()
fields := strings.Fields(line)
if len(fields) == 0 || strings.HasPrefix(fields[0], "#") {
continue
}
if len(fields) == 2 && strings.HasPrefix(fields[0], valueLocation) {
value, err := strconv.ParseFloat(fields[1], 64)
if err != nil {
return 0, err
}
return value, nil
}
}

if err := scanner.Err(); err != nil {
return 0, err
}

return 0, fmt.Errorf("Value %s not found", valueLocation)
}

// getValueFromJSONResponse uses provided valueLocation to access the numeric value in provided body using GSON
func getValueFromJSONResponse(body []byte, valueLocation string) (float64, error) {
r := gjson.GetBytes(body, valueLocation)
errorMsg := "valueLocation must point to value of type number or a string representing a Quantity got: '%s'"
if r.Type == gjson.String {
v, err := resource.ParseQuantity(r.String())
if err != nil {
return 0, fmt.Errorf(errorMsg, r.String())
return 0, fmt.Errorf(valueLocationWrongErrorMsg, r.String())
}
return v.AsApproximateFloat64(), nil
}
if r.Type != gjson.Number {
return 0, fmt.Errorf(errorMsg, r.Type.String())
return 0, fmt.Errorf(valueLocationWrongErrorMsg, r.Type.String())
}
return r.Num, nil
}

// getValueFromXMLResponse uses provided valueLocation to access the numeric value in provided body
func getValueFromXMLResponse(body []byte, valueLocation string) (float64, error) {
var xmlMap map[string]interface{}
err := xml.Unmarshal(body, &xmlMap)
if err != nil {
return 0, err
}

path, err := kedautil.GetValueByPath(xmlMap, valueLocation)
if err != nil {
return 0, err
}

switch v := path.(type) {
case int:
return float64(v), nil
case int64:
return float64(v), nil
case float64:
return v, nil
case string:
r, err := resource.ParseQuantity(v)
if err != nil {
return 0, fmt.Errorf(valueLocationWrongErrorMsg, v)
}
return r.AsApproximateFloat64(), nil
default:
return 0, fmt.Errorf("valueLocation must point to value of type number or a string representing a Quantity got: '%s'", v)
}
}

// getValueFromYAMLResponse uses provided valueLocation to access the numeric value in provided body
// using generic ketautil.GetValueByPath
func getValueFromYAMLResponse(body []byte, valueLocation string) (float64, error) {
var yamlMap map[string]interface{}
err := yaml.Unmarshal(body, &yamlMap)
if err != nil {
return 0, err
}

path, err := kedautil.GetValueByPath(yamlMap, valueLocation)
if err != nil {
return 0, err
}

switch v := path.(type) {
case int:
return float64(v), nil
case int64:
return float64(v), nil
case float64:
return v, nil
case string:
r, err := resource.ParseQuantity(v)
if err != nil {
return 0, fmt.Errorf(valueLocationWrongErrorMsg, v)
}
return r.AsApproximateFloat64(), nil
default:
return 0, fmt.Errorf("valueLocation must point to value of type number or a string representing a Quantity got: '%s'", v)
}
}

func (s *metricsAPIScaler) getMetricValue(ctx context.Context) (float64, error) {
request, err := getMetricAPIServerRequest(ctx, s.metadata)
if err != nil {
Expand All @@ -249,7 +388,7 @@ func (s *metricsAPIScaler) getMetricValue(ctx context.Context) (float64, error)
if err != nil {
return 0, err
}
v, err := GetValueFromResponse(b, s.metadata.valueLocation)
v, err := GetValueFromResponse(b, s.metadata.valueLocation, s.metadata.format)
if err != nil {
return 0, err
}
Expand Down
65 changes: 31 additions & 34 deletions pkg/scalers/metrics_api_scaler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,42 +123,39 @@ func TestMetricsAPIGetMetricSpecForScaling(t *testing.T) {
}

func TestGetValueFromResponse(t *testing.T) {
d := []byte(`{"components":[{"id": "82328e93e", "tasks": 32, "str": "64", "k":"1k","wrong":"NaN"}],"count":2.43}`)
v, err := GetValueFromResponse(d, "components.0.tasks")
if err != nil {
t.Error("Expected success but got error", err)
}
if v != 32 {
t.Errorf("Expected %d got %f", 32, v)
}

v, err = GetValueFromResponse(d, "count")
if err != nil {
t.Error("Expected success but got error", err)
}
if v != 2.43 {
t.Errorf("Expected %d got %f", 2, v)
}

v, err = GetValueFromResponse(d, "components.0.str")
if err != nil {
t.Error("Expected success but got error", err)
}
if v != 64 {
t.Errorf("Expected %d got %f", 64, v)
}

v, err = GetValueFromResponse(d, "components.0.k")
if err != nil {
t.Error("Expected success but got error", err)
}
if v != 1000 {
t.Errorf("Expected %d got %f", 1000, v)
}
testCases := []struct {
name string
input []byte
key string
format APIFormat
expectVal interface{}
expectErr bool
}{
{name: "integer", input: []byte(`{"components":[{"id": "82328e93e", "tasks": 32, "str": "64", "k":"1k","wrong":"NaN"}],"count":2.43}`), key: "count", format: JSONFormat, expectVal: 2.43},
{name: "string", input: []byte(`{"components":[{"id": "82328e93e", "tasks": 32, "str": "64", "k":"1k","wrong":"NaN"}],"count":2.43}`), key: "components.0.str", format: JSONFormat, expectVal: 64},
{name: "{}.[].{}", input: []byte(`{"components":[{"id": "82328e93e", "tasks": 32, "str": "64", "k":"1k","wrong":"NaN"}],"count":2.43}`), key: "components.0.tasks", format: JSONFormat, expectVal: 32},
{name: "invalid data", input: []byte(`{"components":[{"id": "82328e93e", "tasks": 32, "str": "64", "k":"1k","wrong":"NaN"}],"count":2.43}`), key: "components.0.wrong", format: JSONFormat, expectErr: true},
{name: "integer", input: []byte(`components: [{id: "82328e93e", tasks: 32, str: "64", k: "1k", wrong: "NaN"}] count: 2.43`), key: "count", format: YAMLFormat, expectVal: 2.43},
{name: "string", input: []byte(`components: [{id: "82328e93e", tasks: 32, str: "64", k: "1k", wrong: "NaN"}] count: 2.43`), key: "components.0.str", format: YAMLFormat, expectVal: 64},
{name: "{}.[].{}", input: []byte(`components: [{id: "82328e93e", tasks: 32, str: "64", k: "1k", wrong: "NaN"}] count: 2.43`), key: "components.0.tasks", format: YAMLFormat, expectVal: 32},
{name: "invalid data", input: []byte(`components: [{id: "82328e93e", tasks: 32, str: "64", k: "1k", wrong: "NaN"}] count: 2.43`), key: "components.0.wrong", format: YAMLFormat, expectErr: true},
}

for _, tc := range testCases {
t.Run(string(tc.format)+": "+tc.name, func(t *testing.T) {
v, err := GetValueFromResponse(tc.input, tc.key, tc.format)

if tc.expectErr && err == nil {
t.Error("Expected error but got success")
} else if !tc.expectErr && err != nil {
t.Error("Expected success but got error:", err)
}

_, err = GetValueFromResponse(d, "components.0.wrong")
if err == nil {
t.Error("Expected error but got success", err)
if !tc.expectErr && v != tc.expectVal {
t.Errorf("Expected %v, got %v", tc.expectVal, v)
}
})
}
}

Expand Down
35 changes: 35 additions & 0 deletions pkg/util/value_by_path.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package util

import (
"fmt"
"strings"
)

// GetValueByPath retrieves a value from a nested map using a dot-separated path
func GetValueByPath(data map[string]interface{}, path string) (interface{}, error) {
keys := strings.Split(path, ".")
current := data

for _, key := range keys {
val, ok := current[key]
if !ok {
return nil, fmt.Errorf("key '%s' not found in path '%s'", key, path)
}

switch v := val.(type) {
case map[interface{}]interface{}:
// Convert map[interface{}]interface{} to map[string]interface{}
current = make(map[string]interface{})
for k, v := range v {
current[fmt.Sprintf("%v", k)] = v
}
case map[string]interface{}:
current = v
default:
// Reached the final value
return val, nil
}
}

return nil, fmt.Errorf("path '%s' does not lead to a value", path)
}
71 changes: 71 additions & 0 deletions pkg/util/value_by_path_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package util

import (
"reflect"
"testing"
)

func TestGetValueByPath(t *testing.T) {
tests := []struct {
name string
input map[string]interface{}
path string
expected interface{}
wantErr bool
}{
{
name: "Valid path - String value",
input: map[string]interface{}{
"some": map[string]interface{}{
"nested": map[string]interface{}{
"key": "value",
},
},
},
path: "some.nested.key",
expected: "value",
wantErr: false,
},
{
name: "Valid path - Integer value",
input: map[string]interface{}{
"another": map[string]interface{}{
"nested": map[string]interface{}{
"key": 42,
},
},
},
path: "another.nested.key",
expected: 42,
wantErr: false,
},
{
name: "Invalid path - Key not found",
input: map[string]interface{}{
"some": map[string]interface{}{
"nested": map[string]interface{}{
"key": "value",
},
},
},
path: "nonexistent.path",
expected: nil,
wantErr: true,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
actual, err := GetValueByPath(tt.input, tt.path)

if (err != nil) != tt.wantErr {
t.Errorf("Unexpected error status. got %v, wantErr %v", err, tt.wantErr)
return
}

if !reflect.DeepEqual(actual, tt.expected) {
t.Errorf("Mismatched result. got %v, want %v", actual, tt.expected)
}
})
}
}

0 comments on commit 724b63e

Please sign in to comment.