Skip to content

Commit

Permalink
fix: stop scale loop for pause Scaledbject (issue #4253) (#4550)
Browse files Browse the repository at this point in the history
Signed-off-by: Tobo Atchou <[email protected]>
  • Loading branch information
tobotg authored Jun 18, 2023
1 parent bce3375 commit 93851e7
Show file tree
Hide file tree
Showing 9 changed files with 334 additions and 41 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ New deprecation(s):
- **General**: Use default metrics provider from sigs.k8s.io/custom-metrics-apiserver ([#4473](https://github.com/kedacore/keda/pull/4473))
- **General**: Refactor several functions for Status & Conditions handling into pkg util functions ([#2906](https://github.com/kedacore/keda/pull/2906))
- **General**: Bump `kubernetes-sigs/controller-runtime` to v0.15.0 and code alignment ([#4582](https://github.com/kedacore/keda/pull/4582))
- **General**: Stop logging errors for paused ScaledObject (with "autoscaling.keda.sh/paused-replicas" annotation) by skipping reconciliation loop for the object (stop the scale loop and delete the HPA) ([#4253](https://github.com/kedacore/keda/pull/4253))

## v2.10.1

Expand Down
35 changes: 32 additions & 3 deletions apis/keda/v1alpha1/condition_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,19 @@ const (
ConditionActive ConditionType = "Active"
// ConditionFallback specifies that the resource has a fallback active.
ConditionFallback ConditionType = "Fallback"
// ConditionPaused specifies that the resource is paused.
ConditionPaused ConditionType = "Paused"
)

const (
// ScaledObjectConditionReadySucccesReason defines the default Reason for correct ScaledObject
ScaledObjectConditionReadySucccesReason = "ScaledObjectReady"
// ScaledObjectConditionReadySuccessMessage defines the default Message for correct ScaledObject
ScaledObjectConditionReadySuccessMessage = "ScaledObject is defined correctly and is ready for scaling"
// ScaledObjectConditionPausedReason defines the default Reason for paused ScaledObject
ScaledObjectConditionPausedReason = "ScaledObjectPaused"
// ScaledObjectConditionPausedMessage defines the default Message for paused ScaledObject
ScaledObjectConditionPausedMessage = "ScaledObject is paused"
)

// Condition to store the condition state
Expand Down Expand Up @@ -70,6 +76,7 @@ func (c *Conditions) AreInitialized() bool {
foundReady := false
foundActive := false
foundFallback := false
foundPaused := false
if *c != nil {
for _, condition := range *c {
if condition.Type == ConditionReady {
Expand All @@ -89,14 +96,20 @@ func (c *Conditions) AreInitialized() bool {
break
}
}
for _, condition := range *c {
if condition.Type == ConditionPaused {
foundPaused = true
break
}
}
}

return foundReady && foundActive && foundFallback
return foundReady && foundActive && foundFallback && foundPaused
}

// GetInitializedConditions returns Conditions initialized to the default -> Status: Unknown
func GetInitializedConditions() *Conditions {
return &Conditions{{Type: ConditionReady, Status: metav1.ConditionUnknown}, {Type: ConditionActive, Status: metav1.ConditionUnknown}, {Type: ConditionFallback, Status: metav1.ConditionUnknown}}
return &Conditions{{Type: ConditionReady, Status: metav1.ConditionUnknown}, {Type: ConditionActive, Status: metav1.ConditionUnknown}, {Type: ConditionFallback, Status: metav1.ConditionUnknown}, {Type: ConditionPaused, Status: metav1.ConditionUnknown}}
}

// IsTrue is true if the condition is True
Expand Down Expand Up @@ -147,6 +160,14 @@ func (c *Conditions) SetFallbackCondition(status metav1.ConditionStatus, reason
c.setCondition(ConditionFallback, status, reason, message)
}

// SetPausedCondition modifies Paused Condition according to input parameters
func (c *Conditions) SetPausedCondition(status metav1.ConditionStatus, reason string, message string) {
if *c == nil {
c = GetInitializedConditions()
}
c.setCondition(ConditionPaused, status, reason, message)
}

// GetActiveCondition returns Condition of type Active
func (c *Conditions) GetActiveCondition() Condition {
if *c == nil {
Expand All @@ -163,14 +184,22 @@ func (c *Conditions) GetReadyCondition() Condition {
return c.getCondition(ConditionReady)
}

// GetFallbackCondition returns Condition of type Ready
// GetFallbackCondition returns Condition of type Fallback
func (c *Conditions) GetFallbackCondition() Condition {
if *c == nil {
c = GetInitializedConditions()
}
return c.getCondition(ConditionFallback)
}

// GetPausedCondition returns Condition of type Paused
func (c *Conditions) GetPausedCondition() Condition {
if *c == nil {
c = GetInitializedConditions()
}
return c.getCondition(ConditionPaused)
}

func (c Conditions) getCondition(conditionType ConditionType) Condition {
for i := range c {
if c[i].Type == conditionType {
Expand Down
1 change: 1 addition & 0 deletions apis/keda/v1alpha1/scaledobject_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
// +kubebuilder:printcolumn:name="Ready",type="string",JSONPath=".status.conditions[?(@.type==\"Ready\")].status"
// +kubebuilder:printcolumn:name="Active",type="string",JSONPath=".status.conditions[?(@.type==\"Active\")].status"
// +kubebuilder:printcolumn:name="Fallback",type="string",JSONPath=".status.conditions[?(@.type==\"Fallback\")].status"
// +kubebuilder:printcolumn:name="Paused",type="string",JSONPath=".status.conditions[?(@.type==\"Paused\")].status"
// +kubebuilder:printcolumn:name="Age",type="date",JSONPath=".metadata.creationTimestamp"

// ScaledObject is a specification for a ScaledObject resource
Expand Down
3 changes: 3 additions & 0 deletions config/crd/bases/keda.sh_scaledobjects.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@ spec:
- jsonPath: .status.conditions[?(@.type=="Fallback")].status
name: Fallback
type: string
- jsonPath: .status.conditions[?(@.type=="Paused")].status
name: Paused
type: string
- jsonPath: .metadata.creationTimestamp
name: Age
type: date
Expand Down
12 changes: 10 additions & 2 deletions controllers/keda/hpa.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,13 +183,21 @@ func (r *ScaledObjectReconciler) updateHPAIfNeeded(ctx context.Context, logger l

// deleteAndCreateHpa delete old HPA and create new one
func (r *ScaledObjectReconciler) renameHPA(ctx context.Context, logger logr.Logger, scaledObject *kedav1alpha1.ScaledObject, foundHpa *autoscalingv2.HorizontalPodAutoscaler, gvkr *kedav1alpha1.GroupVersionKindResource) error {
logger.Info("Deleting old HPA", "HPA.Namespace", scaledObject.Namespace, "HPA.Name", foundHpa.Name)
if err := r.deleteHPA(ctx, logger, scaledObject, foundHpa); err != nil {
return err
}
return r.createAndDeployNewHPA(ctx, logger, scaledObject, gvkr)
}

// deleteHpa delete existing HPA
func (r *ScaledObjectReconciler) deleteHPA(ctx context.Context, logger logr.Logger, scaledObject *kedav1alpha1.ScaledObject, foundHpa *autoscalingv2.HorizontalPodAutoscaler) error {
logger.Info("Deleting existing HPA", "HPA.Namespace", scaledObject.Namespace, "HPA.Name", foundHpa.Name)
if err := r.Client.Delete(ctx, foundHpa); err != nil {
logger.Error(err, "Failed to delete old HPA", "HPA.Namespace", foundHpa.Namespace, "HPA.Name", foundHpa.Name)
return err
}

return r.createAndDeployNewHPA(ctx, logger, scaledObject, gvkr)
return nil
}

// getScaledObjectMetricSpecs returns MetricSpec for HPA, generater from Triggers defitinion in ScaledObject
Expand Down
94 changes: 72 additions & 22 deletions controllers/keda/scaledobject_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ func (r *ScaledObjectReconciler) Reconcile(ctx context.Context, req ctrl.Request
return ctrl.Result{}, nil
}
// Error reading the object - requeue the request.
reqLogger.Error(err, "Failed to get ScaledObject")
reqLogger.Error(err, "failed to get ScaledObject")
return ctrl.Result{}, err
}

Expand All @@ -172,9 +172,9 @@ func (r *ScaledObjectReconciler) Reconcile(ctx context.Context, req ctrl.Request
}
}

// reconcile ScaledObject and set status appropriately
msg, err := r.reconcileScaledObject(ctx, reqLogger, scaledObject)
conditions := scaledObject.Status.Conditions.DeepCopy()
// reconcile ScaledObject and set status appropriately
msg, err := r.reconcileScaledObject(ctx, reqLogger, scaledObject, &conditions)
if err != nil {
reqLogger.Error(err, msg)
conditions.SetReadyCondition(metav1.ConditionFalse, "ScaledObjectCheckFailed", msg)
Expand All @@ -197,7 +197,31 @@ func (r *ScaledObjectReconciler) Reconcile(ctx context.Context, req ctrl.Request
}

// reconcileScaledObject implements reconciler logic for ScaledObject
func (r *ScaledObjectReconciler) reconcileScaledObject(ctx context.Context, logger logr.Logger, scaledObject *kedav1alpha1.ScaledObject) (string, error) {
func (r *ScaledObjectReconciler) reconcileScaledObject(ctx context.Context, logger logr.Logger, scaledObject *kedav1alpha1.ScaledObject, conditions *kedav1alpha1.Conditions) (string, error) {
// Check the presence of "autoscaling.keda.sh/paused-replicas" annotation on the scaledObject (since the presence of this annotation will pause
// autoscaling no matter what number of replicas is provided), and if so, stop the scale loop and delete the HPA on the scaled object.
_, pausedAnnotationFound := scaledObject.GetAnnotations()[kedacontrollerutil.PausedReplicasAnnotation]
if pausedAnnotationFound {
if conditions.GetPausedCondition().Status == metav1.ConditionTrue {
return kedav1alpha1.ScaledObjectConditionReadySuccessMessage, nil
}
if scaledObject.Status.PausedReplicaCount != nil {
msg := kedav1alpha1.ScaledObjectConditionPausedMessage
if err := r.stopScaleLoop(ctx, logger, scaledObject); err != nil {
msg = "failed to stop the scale loop for paused ScaledObject"
return msg, err
}
if deleted, err := r.ensureHPAForScaledObjectIsDeleted(ctx, logger, scaledObject); !deleted {
msg = "failed to delete HPA for paused ScaledObject"
return msg, err
}
conditions.SetPausedCondition(metav1.ConditionTrue, kedav1alpha1.ScaledObjectConditionPausedReason, msg)
return msg, nil
}
} else if conditions.GetPausedCondition().Status == metav1.ConditionTrue {
conditions.SetPausedCondition(metav1.ConditionFalse, "ScaledObjectUnpaused", "pause annotation removed for ScaledObject")
}

// Check scale target Name is specified
if scaledObject.Spec.ScaleTargetRef.Name == "" {
err := fmt.Errorf("ScaledObject.spec.scaleTargetRef.name is missing")
Expand All @@ -207,7 +231,7 @@ func (r *ScaledObjectReconciler) reconcileScaledObject(ctx context.Context, logg
// Check the label needed for Metrics servers is present on ScaledObject
err := r.ensureScaledObjectLabel(ctx, logger, scaledObject)
if err != nil {
return "Failed to update ScaledObject with scaledObjectName label", err
return "failed to update ScaledObject with scaledObjectName label", err
}

// Check if resource targeted for scaling exists and exposes /scale subresource
Expand All @@ -229,7 +253,7 @@ func (r *ScaledObjectReconciler) reconcileScaledObject(ctx context.Context, logg
// Create a new HPA or update existing one according to ScaledObject
newHPACreated, err := r.ensureHPAForScaledObjectExists(ctx, logger, scaledObject, &gvkr)
if err != nil {
return "Failed to ensure HPA is correctly created for ScaledObject", err
return "failed to ensure HPA is correctly created for ScaledObject", err
}
scaleObjectSpecChanged := false
if !newHPACreated {
Expand All @@ -238,17 +262,21 @@ func (r *ScaledObjectReconciler) reconcileScaledObject(ctx context.Context, logg
// (we can omit this check if a new HPA was created, which fires new ScaleLoop anyway)
scaleObjectSpecChanged, err = r.scaledObjectGenerationChanged(logger, scaledObject)
if err != nil {
return "Failed to check whether ScaledObject's Generation was changed", err
return "failed to check whether ScaledObject's Generation was changed", err
}
}

// Notify ScaleHandler if a new HPA was created or if ScaledObject was updated
if newHPACreated || scaleObjectSpecChanged {
if r.requestScaleLoop(ctx, logger, scaledObject) != nil {
return "Failed to start a new scale loop with scaling logic", err
return "failed to start a new scale loop with scaling logic", err
}
logger.Info("Initializing Scaling logic according to ScaledObject Specification")
}

if pausedAnnotationFound && conditions.GetPausedCondition().Status != metav1.ConditionTrue {
return "ScaledObject paused replicas are being scaled", fmt.Errorf("ScaledObject paused replicas are being scaled")
}
return kedav1alpha1.ScaledObjectConditionReadySuccessMessage, nil
}

Expand All @@ -273,7 +301,7 @@ func (r *ScaledObjectReconciler) ensureScaledObjectLabel(ctx context.Context, lo
func (r *ScaledObjectReconciler) checkTargetResourceIsScalable(ctx context.Context, logger logr.Logger, scaledObject *kedav1alpha1.ScaledObject) (kedav1alpha1.GroupVersionKindResource, error) {
gvkr, err := kedav1alpha1.ParseGVKR(r.restMapper, scaledObject.Spec.ScaleTargetRef.APIVersion, scaledObject.Spec.ScaleTargetRef.Kind)
if err != nil {
logger.Error(err, "Failed to parse Group, Version, Kind, Resource", "apiVersion", scaledObject.Spec.ScaleTargetRef.APIVersion, "kind", scaledObject.Spec.ScaleTargetRef.Kind)
logger.Error(err, "failed to parse Group, Version, Kind, Resource", "apiVersion", scaledObject.Spec.ScaleTargetRef.APIVersion, "kind", scaledObject.Spec.ScaleTargetRef.Kind)
return gvkr, err
}
gvkString := gvkr.GVKString()
Expand All @@ -299,11 +327,11 @@ func (r *ScaledObjectReconciler) checkTargetResourceIsScalable(ctx context.Conte
unstruct.SetGroupVersionKind(gvkr.GroupVersionKind())
if err := r.Client.Get(ctx, client.ObjectKey{Namespace: scaledObject.Namespace, Name: scaledObject.Spec.ScaleTargetRef.Name}, unstruct); err != nil {
// resource doesn't exist
logger.Error(err, "Target resource doesn't exist", "resource", gvkString, "name", scaledObject.Spec.ScaleTargetRef.Name)
logger.Error(err, "target resource doesn't exist", "resource", gvkString, "name", scaledObject.Spec.ScaleTargetRef.Name)
return gvkr, err
}
// resource exist but doesn't expose /scale subresource
logger.Error(errScale, "Target resource doesn't expose /scale subresource", "resource", gvkString, "name", scaledObject.Spec.ScaleTargetRef.Name)
logger.Error(errScale, "target resource doesn't expose /scale subresource", "resource", gvkString, "name", scaledObject.Spec.ScaleTargetRef.Name)
return gvkr, errScale
}
isScalableCache.Store(gr.String(), true)
Expand Down Expand Up @@ -395,12 +423,7 @@ func (r *ScaledObjectReconciler) checkReplicaCountBoundsAreValid(scaledObject *k

// ensureHPAForScaledObjectExists ensures that in cluster exist up-to-date HPA for specified ScaledObject, returns true if a new HPA was created
func (r *ScaledObjectReconciler) ensureHPAForScaledObjectExists(ctx context.Context, logger logr.Logger, scaledObject *kedav1alpha1.ScaledObject, gvkr *kedav1alpha1.GroupVersionKindResource) (bool, error) {
var hpaName string
if scaledObject.Status.HpaName != "" {
hpaName = scaledObject.Status.HpaName
} else {
hpaName = getHPAName(scaledObject)
}
hpaName := getHPANameOnEnsure(scaledObject)
foundHpa := &autoscalingv2.HorizontalPodAutoscaler{}
// Check if HPA for this ScaledObject already exists
err := r.Client.Get(ctx, types.NamespacedName{Name: hpaName, Namespace: scaledObject.Namespace}, foundHpa)
Expand All @@ -414,7 +437,7 @@ func (r *ScaledObjectReconciler) ensureHPAForScaledObjectExists(ctx context.Cont
// new HPA created successfully -> notify Reconcile function so it could fire a new ScaleLoop
return true, nil
} else if err != nil {
logger.Error(err, "Failed to get HPA from cluster")
logger.Error(err, "failed to get HPA from cluster")
return false, err
}

Expand All @@ -431,13 +454,40 @@ func (r *ScaledObjectReconciler) ensureHPAForScaledObjectExists(ctx context.Cont
// HPA was found -> let's check if we need to update it
err = r.updateHPAIfNeeded(ctx, logger, scaledObject, foundHpa, gvkr)
if err != nil {
logger.Error(err, "Failed to check HPA for possible update")
logger.Error(err, "failed to check HPA for possible update")
return false, err
}

return false, nil
}

// ensureHPAForScaledObjectIsDeleted ensures that in cluster any HPA for specified ScaledObject is deleted, returns true if no HPA exists
func (r *ScaledObjectReconciler) ensureHPAForScaledObjectIsDeleted(ctx context.Context, logger logr.Logger, scaledObject *kedav1alpha1.ScaledObject) (bool, error) {
hpaName := getHPANameOnEnsure(scaledObject)
foundHpa := &autoscalingv2.HorizontalPodAutoscaler{}
// Check if HPA for this ScaledObject already exists
err := r.Client.Get(ctx, types.NamespacedName{Name: hpaName, Namespace: scaledObject.Namespace}, foundHpa)
if err != nil && errors.IsNotFound(err) {
return true, nil
} else if err != nil {
logger.Error(err, "failed to get HPA from cluster")
return false, err
}

if err := r.deleteHPA(ctx, logger, scaledObject, foundHpa); err != nil {
logger.Error(err, "failed to delete HPA from cluster")
return false, err
}
return true, nil
}

func getHPANameOnEnsure(scaledObject *kedav1alpha1.ScaledObject) string {
if scaledObject.Status.HpaName != "" {
return scaledObject.Status.HpaName
}
return getHPAName(scaledObject)
}

func isHpaRenamed(scaledObject *kedav1alpha1.ScaledObject, foundHpa *autoscalingv2.HorizontalPodAutoscaler) bool {
// if HPA name defined in SO -> check if equals to the found HPA
if scaledObject.Spec.Advanced != nil && scaledObject.Spec.Advanced.HorizontalPodAutoscalerConfig != nil && scaledObject.Spec.Advanced.HorizontalPodAutoscalerConfig.Name != "" {
Expand All @@ -453,7 +503,7 @@ func (r *ScaledObjectReconciler) requestScaleLoop(ctx context.Context, logger lo

key, err := cache.MetaNamespaceKeyFunc(scaledObject)
if err != nil {
logger.Error(err, "Error getting key for scaledObject")
logger.Error(err, "error getting key for scaledObject")
return err
}

Expand All @@ -471,7 +521,7 @@ func (r *ScaledObjectReconciler) requestScaleLoop(ctx context.Context, logger lo
func (r *ScaledObjectReconciler) stopScaleLoop(ctx context.Context, logger logr.Logger, scaledObject *kedav1alpha1.ScaledObject) error {
key, err := cache.MetaNamespaceKeyFunc(scaledObject)
if err != nil {
logger.Error(err, "Error getting key for scaledObject")
logger.Error(err, "error getting key for scaledObject")
return err
}

Expand All @@ -487,7 +537,7 @@ func (r *ScaledObjectReconciler) stopScaleLoop(ctx context.Context, logger logr.
func (r *ScaledObjectReconciler) scaledObjectGenerationChanged(logger logr.Logger, scaledObject *kedav1alpha1.ScaledObject) (bool, error) {
key, err := cache.MetaNamespaceKeyFunc(scaledObject)
if err != nil {
logger.Error(err, "Error getting key for scaledObject")
logger.Error(err, "error getting key for scaledObject")
return true, err
}

Expand Down
Loading

0 comments on commit 93851e7

Please sign in to comment.