Skip to content

Commit

Permalink
Add defaultReplicaCount to spec. Scale to defaultReplicaCount when sc…
Browse files Browse the repository at this point in the history
…aler isActive returns an error.
  • Loading branch information
misha authored and misha committed Jun 7, 2021
1 parent 729c6cb commit 8aca785
Show file tree
Hide file tree
Showing 7 changed files with 38 additions and 6 deletions.
3 changes: 3 additions & 0 deletions api/v1alpha1/scaledobject_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
// +kubebuilder:printcolumn:name="ScaleTargetName",type="string",JSONPath=".spec.scaleTargetRef.name"
// +kubebuilder:printcolumn:name="Min",type="integer",JSONPath=".spec.minReplicaCount"
// +kubebuilder:printcolumn:name="Max",type="integer",JSONPath=".spec.maxReplicaCount"
// +kubebuilder:printcolumn:name="Default",type="integer",JSONPath=".spec.defaultReplicaCount"
// +kubebuilder:printcolumn:name="Triggers",type="string",JSONPath=".spec.triggers[*].type"
// +kubebuilder:printcolumn:name="Authentication",type="string",JSONPath=".spec.triggers[*].authenticationRef.name"
// +kubebuilder:printcolumn:name="Ready",type="string",JSONPath=".status.conditions[?(@.type==\"Ready\")].status"
Expand Down Expand Up @@ -41,6 +42,8 @@ type ScaledObjectSpec struct {
// +optional
MaxReplicaCount *int32 `json:"maxReplicaCount,omitempty"`
// +optional
DefaultReplicaCount *int32 `json:"defaultReplicaCount,omitempty"`
// +optional
Advanced *AdvancedConfig `json:"advanced,omitempty"`

Triggers []ScaleTriggers `json:"triggers"`
Expand Down
5 changes: 5 additions & 0 deletions api/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions config/crd/bases/keda.sh_scaledobjects.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@ spec:
- jsonPath: .spec.maxReplicaCount
name: Max
type: integer
- jsonPath: .spec.defaultReplicaCount
name: Default
type: integer
- jsonPath: .spec.triggers[*].type
name: Triggers
type: string
Expand Down Expand Up @@ -200,6 +203,9 @@ spec:
cooldownPeriod:
format: int32
type: integer
defaultReplicaCount:
format: int32
type: integer
maxReplicaCount:
format: int32
type: integer
Expand Down
2 changes: 1 addition & 1 deletion pkg/scaling/executor/scale_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ const (
// ScaleExecutor contains methods RequestJobScale and RequestScale
type ScaleExecutor interface {
RequestJobScale(ctx context.Context, scaledJob *kedav1alpha1.ScaledJob, isActive bool, scaleTo int64, maxScale int64)
RequestScale(ctx context.Context, scaledObject *kedav1alpha1.ScaledObject, isActive bool)
RequestScale(ctx context.Context, scaledObject *kedav1alpha1.ScaledObject, isActive bool, isError bool)
}

type scaleExecutor struct {
Expand Down
17 changes: 16 additions & 1 deletion pkg/scaling/executor/scale_scaledobjects.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import (
kedav1alpha1 "github.com/kedacore/keda/v2/api/v1alpha1"
)

func (e *scaleExecutor) RequestScale(ctx context.Context, scaledObject *kedav1alpha1.ScaledObject, isActive bool) {
func (e *scaleExecutor) RequestScale(ctx context.Context, scaledObject *kedav1alpha1.ScaledObject, isActive bool, isError bool) {
logger := e.logger.WithValues("scaledobject.Name", scaledObject.Name,
"scaledObject.Namespace", scaledObject.Namespace,
"scaleTarget.Name", scaledObject.Spec.ScaleTargetRef.Name)
Expand Down Expand Up @@ -59,6 +59,21 @@ func (e *scaleExecutor) RequestScale(ctx context.Context, scaledObject *kedav1al
// current replica count is 0, but there is an active trigger.
// scale the ScaleTarget up
e.scaleFromZero(ctx, logger, scaledObject, currentScale)
case !isActive &&
isError &&
scaledObject.Spec.DefaultReplicaCount != nil &&
*scaledObject.Spec.DefaultReplicaCount != 0:
// there are no active triggers, but a scaler responded with an error
// AND
// there is a default replica count defined

// Scale to the default replica count
_, err := e.updateScaleOnScaleTarget(ctx, scaledObject, currentScale, *scaledObject.Spec.DefaultReplicaCount)
if err == nil {
logger.Info("Successfully set ScaleTarget replicas count to ScaledObject defaultReplicaCount",
"Original Replicas Count", currentReplicas,
"New Replicas Count", *scaledObject.Spec.DefaultReplicaCount)
}
case !isActive &&
currentReplicas > 0 &&
(scaledObject.Spec.MinReplicaCount == nil || *scaledObject.Spec.MinReplicaCount == 0):
Expand Down
11 changes: 7 additions & 4 deletions pkg/scaling/scale_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ func (h *scaleHandler) startPushScalers(ctx context.Context, withTriggers *kedav
scalingMutex.Lock()
switch obj := scalableObject.(type) {
case *kedav1alpha1.ScaledObject:
h.scaleExecutor.RequestScale(ctx, obj, active)
h.scaleExecutor.RequestScale(ctx, obj, active, false)
case *kedav1alpha1.ScaledJob:
h.logger.Info("Warning: External Push Scaler does not support ScaledJob", "object", scalableObject)
}
Expand All @@ -214,22 +214,25 @@ func (h *scaleHandler) checkScalers(ctx context.Context, scalableObject interfac
defer scalingMutex.Unlock()
switch obj := scalableObject.(type) {
case *kedav1alpha1.ScaledObject:
h.scaleExecutor.RequestScale(ctx, obj, h.checkScaledObjectScalers(ctx, scalers, obj))
isActive, isError := h.checkScaledObjectScalers(ctx, scalers, obj)
h.scaleExecutor.RequestScale(ctx, obj, isActive, isError)
case *kedav1alpha1.ScaledJob:
scaledJob := scalableObject.(*kedav1alpha1.ScaledJob)
isActive, scaleTo, maxScale := h.checkScaledJobScalers(ctx, scalers, scaledJob)
h.scaleExecutor.RequestJobScale(ctx, obj, isActive, scaleTo, maxScale)
}
}

func (h *scaleHandler) checkScaledObjectScalers(ctx context.Context, scalers []scalers.Scaler, scaledObject *kedav1alpha1.ScaledObject) bool {
func (h *scaleHandler) checkScaledObjectScalers(ctx context.Context, scalers []scalers.Scaler, scaledObject *kedav1alpha1.ScaledObject) (bool, bool) {
isActive := false
isError := false
for i, scaler := range scalers {
isTriggerActive, err := scaler.IsActive(ctx)
scaler.Close()

if err != nil {
h.logger.V(1).Info("Error getting scale decision", "Error", err)
isError = true
h.recorder.Event(scaledObject, corev1.EventTypeWarning, eventreason.KEDAScalerFailed, err.Error())
continue
} else if isTriggerActive {
Expand All @@ -244,7 +247,7 @@ func (h *scaleHandler) checkScaledObjectScalers(ctx context.Context, scalers []s
break
}
}
return isActive
return isActive, isError
}

func (h *scaleHandler) checkScaledJobScalers(ctx context.Context, scalers []scalers.Scaler, scaledJob *kedav1alpha1.ScaledJob) (bool, int64, int64) {
Expand Down
File renamed without changes.

0 comments on commit 8aca785

Please sign in to comment.