From 553b2c7b0965c02734b00defc989e06e8581c8e1 Mon Sep 17 00:00:00 2001 From: Tomek Urbaszek Date: Mon, 24 Aug 2020 11:00:00 +0200 Subject: [PATCH] Add HTTPScaler Closes: #929 Signed-off-by: Tomek Urbaszek --- pkg/scalers/http_scaler.go | 145 ++++++++++++++++++++++++++++++++ pkg/scalers/http_scaler_test.go | 39 +++++++++ pkg/scaling/scale_handler.go | 2 + 3 files changed, 186 insertions(+) create mode 100644 pkg/scalers/http_scaler.go create mode 100644 pkg/scalers/http_scaler_test.go diff --git a/pkg/scalers/http_scaler.go b/pkg/scalers/http_scaler.go new file mode 100644 index 00000000000..95ce5784b9a --- /dev/null +++ b/pkg/scalers/http_scaler.go @@ -0,0 +1,145 @@ +package scalers + +import ( + "context" + "encoding/json" + "fmt" + "io/ioutil" + "k8s.io/api/autoscaling/v2beta1" + "k8s.io/apimachinery/pkg/api/resource" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/metrics/pkg/apis/external_metrics" + "net/http" + logf "sigs.k8s.io/controller-runtime/pkg/log" + "strconv" + "strings" +) + +const ( + httpMetricName = "HTTPMetricValue" +) + +type httpScaler struct { + metadata *httpScalerMetadata +} + +type httpScalerMetadata struct { + targetValue int + apiURL string + metricName string +} + +type metric struct { + Name string `json:"name"` + Value float64 `json:"value"` +} + +var httpLog = logf.Log.WithName("http_scaler") + +// NewHTTPScaler creates a new HTTP scaler +func NewHTTPScaler(resolvedEnv, metadata, authParams map[string]string) (Scaler, error) { + meta, err := parseHTTPMetadata(resolvedEnv, metadata, authParams) + if err != nil { + return nil, fmt.Errorf("error parsing HTTP metadata: %s", err) + } + return &httpScaler{metadata: meta}, nil +} + +func parseHTTPMetadata(resolvedEnv, metadata, authParams map[string]string) (*httpScalerMetadata, error) { + meta := httpScalerMetadata{} + + if val, ok := metadata["targetValue"]; ok { + targetValue, err := strconv.Atoi(val) + if err != nil { + return nil, fmt.Errorf("targetValue parsing error %s", err.Error()) + } + meta.targetValue = targetValue + } else { + return nil, fmt.Errorf("no targetValue given in metadata") + } + + if val, ok := metadata["apiURL"]; ok { + // remove ending / for better string formatting + meta.apiURL = strings.TrimSuffix(val, "/") + } else { + return nil, fmt.Errorf("no apiURL given in metadata") + } + + if val, ok := metadata["metricName"]; ok { + meta.metricName = val + } else { + return nil, fmt.Errorf("no metricName given in metadata") + } + + return &meta, nil +} + +func (s *httpScaler) checkHealth() error { + u := fmt.Sprintf("%s/health/", s.metadata.apiURL) + _, err := http.Get(u) + return err +} + +func (s *httpScaler) getMetricInfo() (*metric, error) { + var m *metric + u := fmt.Sprintf("%s/metrics/%s/", s.metadata.apiURL, s.metadata.metricName) + r, err := http.Get(u) + if err != nil { + return nil, err + } + defer r.Body.Close() + b, err := ioutil.ReadAll(r.Body) + if err != nil { + return nil, err + } + err = json.Unmarshal(b, &m) + if err != nil { + return nil, err + } + return m, nil +} + +// Close does nothing in case of httpScaler +func (s *httpScaler) Close() error { + return nil +} + +// IsActive returns true if there are pending messages to be processed +func (s *httpScaler) IsActive(ctx context.Context) (bool, error) { + err := s.checkHealth() + if err != nil { + httpLog.Error(err, fmt.Sprintf("Error when checking API health: %s", err)) + return false, err + } + return true, nil +} + +// GetMetricSpecForScaling returns the MetricSpec for the Horizontal Pod Autoscaler +func (s *httpScaler) GetMetricSpecForScaling() []v2beta1.MetricSpec { + targetQueryValue := resource.NewQuantity(int64(s.metadata.targetValue), resource.DecimalSI) + externalMetric := &v2beta1.ExternalMetricSource{ + MetricName: httpMetricName, + TargetAverageValue: targetQueryValue, + } + metricSpec := v2beta1.MetricSpec{ + External: externalMetric, Type: externalMetricType, + } + return []v2beta1.MetricSpec{metricSpec} +} + +// GetMetrics returns value for a supported metric and an error if there is a problem getting the metric +func (s *httpScaler) GetMetrics(ctx context.Context, metricName string, metricSelector labels.Selector) ([]external_metrics.ExternalMetricValue, error) { + m, err := s.getMetricInfo() + if err != nil { + return []external_metrics.ExternalMetricValue{}, fmt.Errorf("error requesting metrics endpoint: %s", err) + } + + metric := external_metrics.ExternalMetricValue{ + MetricName: metricName, + Value: *resource.NewQuantity(int64(m.Value), resource.DecimalSI), + Timestamp: metav1.Now(), + } + + return append([]external_metrics.ExternalMetricValue{}, metric), nil +} diff --git a/pkg/scalers/http_scaler_test.go b/pkg/scalers/http_scaler_test.go new file mode 100644 index 00000000000..b64ee4ed11b --- /dev/null +++ b/pkg/scalers/http_scaler_test.go @@ -0,0 +1,39 @@ +package scalers + +import ( + "testing" +) + +var testHTTPResolvedEnv = map[string]string{} + +type parseHTTPMetadataTestData struct { + metadata map[string]string + raisesError bool +} + +var testHTTPMetadata = []parseHTTPMetadataTestData{ + // No metadata + {metadata: map[string]string{}, raisesError: true}, + // OK + {metadata: map[string]string{"apiURL": "http://dummy:1230/api/v1/", "metricName": "metric", "targetValue": "42"}, raisesError: false}, + // Target not an int + {metadata: map[string]string{"apiURL": "http://dummy:1230/api/v1/", "metricName": "metric", "targetValue": "aa"}, raisesError: true}, + // Missing metric name + {metadata: map[string]string{"apiURL": "http://dummy:1230/api/v1/", "targetValue": "aa"}, raisesError: true}, + // Missing apiURL + {metadata: map[string]string{"metricName": "metric", "targetValue": "aa"}, raisesError: true}, + // Missing targetValue + {metadata: map[string]string{"apiURL": "http://dummy:1230/api/v1/", "metricName": "metric"}, raisesError: true}, +} + +func TestParseHTTPMetadata(t *testing.T) { + for _, testData := range testHTTPMetadata { + _, err := parseHTTPMetadata(testHTTPResolvedEnv, testData.metadata, map[string]string{}) + if err != nil && !testData.raisesError { + t.Error("Expected success but got error", err) + } + if err == nil && testData.raisesError { + t.Error("Expected error but got success") + } + } +} diff --git a/pkg/scaling/scale_handler.go b/pkg/scaling/scale_handler.go index 7127422852d..29aa8079f89 100644 --- a/pkg/scaling/scale_handler.go +++ b/pkg/scaling/scale_handler.go @@ -410,6 +410,8 @@ func buildScaler(name, namespace, triggerType string, resolvedEnv, triggerMetada return scalers.NewRedisStreamsScaler(resolvedEnv, triggerMetadata, authParams) case "stan": return scalers.NewStanScaler(resolvedEnv, triggerMetadata) + case "external-metric-source": + return scalers.NewHTTPScaler(resolvedEnv, triggerMetadata, authParams) default: return nil, fmt.Errorf("no scaler found for type: %s", triggerType) }