From bd0503f9f97e17f478b9e0a6507a3939910b05c9 Mon Sep 17 00:00:00 2001 From: Zbynek Roubalik <726523+zroubalik@users.noreply.github.com> Date: Thu, 27 Jan 2022 09:35:21 +0100 Subject: [PATCH] Clear scalers cache correctly both in Operator and Metrics Server (#2564) Signed-off-by: Zbynek Roubalik Signed-off-by: Mark Rzasa --- CHANGELOG.md | 2 +- .../keda/metrics_adapter_controller.go | 26 +++++++++++++------ pkg/mock/mock_scaling/mock_interface.go | 10 ++++--- pkg/provider/provider.go | 24 ++++++++++++----- pkg/scaling/scale_handler.go | 25 +++++++++++++----- 5 files changed, 61 insertions(+), 26 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 1f120681ffa..873dfc4d97f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -32,7 +32,7 @@ ### Improvements - **General:** `keda-operator` Cluster Role: add `list` and `watch` access to service accounts ([#2406](https://github.com/kedacore/keda/pull/2406))|([#2410](https://github.com/kedacore/keda/pull/2410)) -- **General:** Delete the cache entry when a ScaledObject is deleted ([#2408](https://github.com/kedacore/keda/pull/2408)) +- **General:** Delete the cache entry when a ScaledObject is deleted ([#2564](https://github.com/kedacore/keda/pull/2564)) - **General:** Sign KEDA images published on GitHub Container Registry ([#2501](https://github.com/kedacore/keda/pull/2501))|([#2502](https://github.com/kedacore/keda/pull/2502))|([#2504](https://github.com/kedacore/keda/pull/2504)) - **Azure Pipelines Scaler:** support `poolName` or `poolID` validation ([#2370](https://github.com/kedacore/keda/pull/2370)) - **Graphite Scaler:** use the latest datapoint returned, not the earliest ([#2365](https://github.com/kedacore/keda/pull/2365)) diff --git a/controllers/keda/metrics_adapter_controller.go b/controllers/keda/metrics_adapter_controller.go index 514273ee06f..69664b06f96 100644 --- a/controllers/keda/metrics_adapter_controller.go +++ b/controllers/keda/metrics_adapter_controller.go @@ -57,9 +57,12 @@ func (r *MetricsScaledObjectReconciler) Reconcile(ctx context.Context, req ctrl. // Request object not found, could have been deleted after reconcile request. // Owned objects are automatically garbage collected. For additional cleanup logic use finalizers. // Return and don't requeue - - r.removeFromCache(req.NamespacedName.String()) - return ctrl.Result{}, nil + err := r.ScaleHandler.ClearScalersCache(ctx, scaledObject) + if err != nil { + reqLogger.Error(err, "error clearing scalers cache") + } + r.removeFromMetricsCache(req.NamespacedName.String()) + return ctrl.Result{}, err } // Error reading the object - requeue the request. reqLogger.Error(err, "Failed to get ScaledObject") @@ -70,8 +73,12 @@ func (r *MetricsScaledObjectReconciler) Reconcile(ctx context.Context, req ctrl. // indicated by the deletion timestamp being set. // This depends on the preexisting finalizer setup in ScaledObjectController. if scaledObject.GetDeletionTimestamp() != nil { - r.removeFromCache(req.NamespacedName.String()) - return ctrl.Result{}, nil + err := r.ScaleHandler.ClearScalersCache(ctx, scaledObject) + if err != nil { + reqLogger.Error(err, "error clearing scalers cache") + } + r.removeFromMetricsCache(req.NamespacedName.String()) + return ctrl.Result{}, err } reqLogger.V(1).Info("Reconciling ScaledObject", "externalMetricNames", scaledObject.Status.ExternalMetricNames) @@ -82,8 +89,11 @@ func (r *MetricsScaledObjectReconciler) Reconcile(ctx context.Context, req ctrl. } r.addToMetricsCache(req.NamespacedName.String(), scaledObject.Status.ExternalMetricNames) - r.ScaleHandler.ClearScalersCache(ctx, req.Name, req.Namespace) - return ctrl.Result{}, nil + err = r.ScaleHandler.ClearScalersCache(ctx, scaledObject) + if err != nil { + reqLogger.Error(err, "error clearing scalers cache") + } + return ctrl.Result{}, err } func (r *MetricsScaledObjectReconciler) SetupWithManager(mgr ctrl.Manager, options controller.Options) error { @@ -105,7 +115,7 @@ func (r *MetricsScaledObjectReconciler) addToMetricsCache(namespacedName string, (*r.ExternalMetricsInfo) = extMetrics } -func (r *MetricsScaledObjectReconciler) removeFromCache(namespacedName string) { +func (r *MetricsScaledObjectReconciler) removeFromMetricsCache(namespacedName string) { scaledObjectsMetricsLock.Lock() defer scaledObjectsMetricsLock.Unlock() delete(scaledObjectsMetrics, namespacedName) diff --git a/pkg/mock/mock_scaling/mock_interface.go b/pkg/mock/mock_scaling/mock_interface.go index 7913f2e67fc..3b46f343d62 100644 --- a/pkg/mock/mock_scaling/mock_interface.go +++ b/pkg/mock/mock_scaling/mock_interface.go @@ -36,15 +36,17 @@ func (m *MockScaleHandler) EXPECT() *MockScaleHandlerMockRecorder { } // ClearScalersCache mocks base method. -func (m *MockScaleHandler) ClearScalersCache(ctx context.Context, name, namespace string) { +func (m *MockScaleHandler) ClearScalersCache(ctx context.Context, scalableObject interface{}) error { m.ctrl.T.Helper() - m.ctrl.Call(m, "ClearScalersCache", ctx, name, namespace) + ret := m.ctrl.Call(m, "ClearScalersCache", ctx, scalableObject) + ret0, _ := ret[0].(error) + return ret0 } // ClearScalersCache indicates an expected call of ClearScalersCache. -func (mr *MockScaleHandlerMockRecorder) ClearScalersCache(ctx, name, namespace interface{}) *gomock.Call { +func (mr *MockScaleHandlerMockRecorder) ClearScalersCache(ctx, scalableObject interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ClearScalersCache", reflect.TypeOf((*MockScaleHandler)(nil).ClearScalersCache), ctx, name, namespace) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ClearScalersCache", reflect.TypeOf((*MockScaleHandler)(nil).ClearScalersCache), ctx, scalableObject) } // DeleteScalableObject mocks base method. diff --git a/pkg/provider/provider.go b/pkg/provider/provider.go index 196aafb515b..e4ab4410b54 100644 --- a/pkg/provider/provider.go +++ b/pkg/provider/provider.go @@ -77,7 +77,7 @@ func (p *KedaProvider) GetExternalMetric(ctx context.Context, namespace string, logger.V(1).Info("KEDA Metrics Server received request for external metrics", "namespace", namespace, "metric name", info.Metric, "metricSelector", metricSelector.String()) selector, err := labels.ConvertSelectorToLabelsMap(metricSelector.String()) if err != nil { - logger.Error(err, "Error converting Selector to Labels Map") + logger.Error(err, "error converting Selector to Labels Map") return nil, err } @@ -96,16 +96,15 @@ func (p *KedaProvider) GetExternalMetric(ctx context.Context, namespace string, scaledObject := &scaledObjects.Items[0] var matchingMetrics []external_metrics.ExternalMetricValue - cache, err := p.scaleHandler.GetScalersCache(ctx, scaledObject) - if err != nil { - return nil, err - } + cache, err := p.scaleHandler.GetScalersCache(ctx, scaledObject) metricsServer.RecordScalerObjectError(scaledObject.Namespace, scaledObject.Name, err) if err != nil { return nil, fmt.Errorf("error when getting scalers %s", err) } + scalerError := false + for scalerIndex, scaler := range cache.GetScalers() { metricSpecs := scaler.GetMetricSpecForScaling(ctx) scalerName := strings.Replace(fmt.Sprintf("%T", scaler), "*scalers.", "", 1) @@ -121,6 +120,7 @@ func (p *KedaProvider) GetExternalMetric(ctx context.Context, namespace string, metrics, err = p.getMetricsWithFallback(ctx, metrics, err, info.Metric, scaledObject, metricSpec) if err != nil { + scalerError = true logger.Error(err, "error getting metric for scaler", "scaledObject.Namespace", scaledObject.Namespace, "scaledObject.Name", scaledObject.Name, "scaler", scaler) } else { for _, metric := range metrics { @@ -134,8 +134,18 @@ func (p *KedaProvider) GetExternalMetric(ctx context.Context, namespace string, } } + // invalidate the cache for the ScaledObject, if we hit an error in any scaler + // in this case we try to build all scalers (and resolve all secrets/creds) again in the next call + if scalerError { + err := p.scaleHandler.ClearScalersCache(ctx, scaledObject) + if err != nil { + logger.Error(err, "error clearing scalers cache") + } + logger.V(1).Info("scaler error encountered, clearing scaler cache") + } + if len(matchingMetrics) == 0 { - return nil, fmt.Errorf("No matching metrics found for " + info.Metric) + return nil, fmt.Errorf("no matching metrics found for " + info.Metric) } return &external_metrics.ExternalMetricValueList{ @@ -164,7 +174,7 @@ func (p *KedaProvider) GetMetricByName(ctx context.Context, name types.Namespace // GetMetricBySelector fetches a particular metric for a set of objects matching // the given label selector. The namespace will be empty if the metric is root-scoped. func (p *KedaProvider) GetMetricBySelector(ctx context.Context, namespace string, selector labels.Selector, info provider.CustomMetricInfo, metricSelector labels.Selector) (*custom_metrics.MetricValueList, error) { - logger.V(0).Info("Received request for custom metric", "groupresource", info.GroupResource.String(), "namespace", namespace, "metric name", info.Metric, "selector", selector.String()) + logger.V(0).Info("Received request for custom metric, which is not supported by this adapter", "groupresource", info.GroupResource.String(), "namespace", namespace, "metric name", info.Metric, "selector", selector.String()) return nil, apiErrors.NewServiceUnavailable("not implemented yet") } diff --git a/pkg/scaling/scale_handler.go b/pkg/scaling/scale_handler.go index 5532c6dabae..e677166932b 100644 --- a/pkg/scaling/scale_handler.go +++ b/pkg/scaling/scale_handler.go @@ -19,7 +19,6 @@ package scaling import ( "context" "fmt" - "strings" "sync" "time" @@ -46,7 +45,7 @@ type ScaleHandler interface { HandleScalableObject(ctx context.Context, scalableObject interface{}) error DeleteScalableObject(ctx context.Context, scalableObject interface{}) error GetScalersCache(ctx context.Context, scalableObject interface{}) (*cache.ScalersCache, error) - ClearScalersCache(ctx context.Context, name, namespace string) + ClearScalersCache(ctx context.Context, scalableObject interface{}) error } type scaleHandler struct { @@ -126,7 +125,10 @@ func (h *scaleHandler) DeleteScalableObject(ctx context.Context, scalableObject cancel() } h.scaleLoopContexts.Delete(key) - delete(h.scalerCaches, key) + err := h.ClearScalersCache(ctx, scalableObject) + if err != nil { + h.logger.Error(err, "error clearing scalers cache") + } h.recorder.Event(withTriggers, corev1.EventTypeNormal, eventreason.KEDAScalersStopped, "Stopped scalers watch") } else { h.logger.V(1).Info("ScaleObject was not found in controller cache", "key", key) @@ -151,7 +153,10 @@ func (h *scaleHandler) startScaleLoop(ctx context.Context, withTriggers *kedav1a tmr.Stop() case <-ctx.Done(): logger.V(1).Info("Context canceled") - h.ClearScalersCache(ctx, withTriggers.Name, withTriggers.Namespace) + err := h.ClearScalersCache(ctx, scalableObject) + if err != nil { + logger.Error(err, "error clearing scalers cache") + } tmr.Stop() return } @@ -201,15 +206,23 @@ func (h *scaleHandler) GetScalersCache(ctx context.Context, scalableObject inter return h.scalerCaches[key], nil } -func (h *scaleHandler) ClearScalersCache(ctx context.Context, name, namespace string) { +func (h *scaleHandler) ClearScalersCache(ctx context.Context, scalableObject interface{}) error { + withTriggers, err := asDuckWithTriggers(scalableObject) + if err != nil { + return err + } + + key := withTriggers.GenerateIdenitifier() + h.lock.Lock() defer h.lock.Unlock() - key := strings.ToLower(fmt.Sprintf("%s.%s", name, namespace)) if cache, ok := h.scalerCaches[key]; ok { cache.Close(ctx) delete(h.scalerCaches, key) } + + return nil } func (h *scaleHandler) startPushScalers(ctx context.Context, withTriggers *kedav1alpha1.WithTriggers, scalableObject interface{}, scalingMutex sync.Locker) {