From 9bd6e7e0d3b19d4f8d6821d3d269c3c449352018 Mon Sep 17 00:00:00 2001 From: Tsuyoshi Ushio Date: Thu, 8 Oct 2020 00:31:24 -0700 Subject: [PATCH] Adding Scaling Strategy for ScaledJob (#1227) Signed-off-by: Tsuyoshi Ushio --- api/v1alpha1/scaledjob_types.go | 15 +++- api/v1alpha1/zz_generated.deepcopy.go | 21 ++++++ config/crd/bases/keda.sh_scaledjobs.yaml | 11 +++ pkg/scaling/executor/scale_jobs.go | 73 ++++++++++++++++-- pkg/scaling/executor/scale_jobs_test.go | 95 ++++++++++++++++++++++++ 5 files changed, 208 insertions(+), 7 deletions(-) diff --git a/api/v1alpha1/scaledjob_types.go b/api/v1alpha1/scaledjob_types.go index 94a94a7b0f1..4659567bf95 100644 --- a/api/v1alpha1/scaledjob_types.go +++ b/api/v1alpha1/scaledjob_types.go @@ -36,7 +36,9 @@ type ScaledJobSpec struct { // +optional EnvSourceContainerName string `json:"envSourceContainerName,omitempty"` // +optional - MaxReplicaCount *int32 `json:"maxReplicaCount,omitempty"` + MaxReplicaCount *int32 `json:"maxReplicaCount,omitempty"` + // +optional + ScalingStrategy ScalingStrategy `json:"scalingStrategy,omitempty"` Triggers []ScaleTriggers `json:"triggers"` } @@ -57,6 +59,17 @@ type ScaledJobList struct { Items []ScaledJob `json:"items"` } +// ScalingStrategy defines the strategy of Scaling +// +optional +type ScalingStrategy struct { + // +optional + Strategy string `json:"strategy,omitempty"` + // +optional + CustomScalingQueueLengthDeduction *int32 `json:"customScalingQueueLengthDeduction,omitempty"` + // +optional + CustomScalingRunningJobPercentage string `json:"customScalingRunningJobPercentage,omitempty"` +} + func init() { SchemeBuilder.Register(&ScaledJob{}, &ScaledJobList{}) } diff --git a/api/v1alpha1/zz_generated.deepcopy.go b/api/v1alpha1/zz_generated.deepcopy.go index 594e1eef9a4..3ca8841e234 100644 --- a/api/v1alpha1/zz_generated.deepcopy.go +++ b/api/v1alpha1/zz_generated.deepcopy.go @@ -329,6 +329,7 @@ func (in *ScaledJobSpec) DeepCopyInto(out *ScaledJobSpec) { *out = new(int32) **out = **in 
} + in.ScalingStrategy.DeepCopyInto(&out.ScalingStrategy) if in.Triggers != nil { in, out := &in.Triggers, &out.Triggers *out = make([]ScaleTriggers, len(*in)) @@ -542,6 +543,26 @@ func (in *ScaledObjectStatus) DeepCopy() *ScaledObjectStatus { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *ScalingStrategy) DeepCopyInto(out *ScalingStrategy) { + *out = *in + if in.CustomScalingQueueLengthDeduction != nil { + in, out := &in.CustomScalingQueueLengthDeduction, &out.CustomScalingQueueLengthDeduction + *out = new(int32) + **out = **in + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ScalingStrategy. +func (in *ScalingStrategy) DeepCopy() *ScalingStrategy { + if in == nil { + return nil + } + out := new(ScalingStrategy) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. 
func (in *TriggerAuthentication) DeepCopyInto(out *TriggerAuthentication) { *out = *in diff --git a/config/crd/bases/keda.sh_scaledjobs.yaml b/config/crd/bases/keda.sh_scaledjobs.yaml index 2db4cb63474..4db5592cdc4 100644 --- a/config/crd/bases/keda.sh_scaledjobs.yaml +++ b/config/crd/bases/keda.sh_scaledjobs.yaml @@ -6343,6 +6343,17 @@ spec: pollingInterval: format: int32 type: integer + scalingStrategy: + description: ScalingStrategy defines the strategy of Scaling + properties: + customScalingQueueLengthDeduction: + format: int32 + type: integer + customScalingRunningJobPercentage: + type: string + strategy: + type: string + type: object successfulJobsHistoryLimit: format: int32 type: integer diff --git a/pkg/scaling/executor/scale_jobs.go b/pkg/scaling/executor/scale_jobs.go index 93ce53c69aa..68d480e931c 100644 --- a/pkg/scaling/executor/scale_jobs.go +++ b/pkg/scaling/executor/scale_jobs.go @@ -3,6 +3,7 @@ package executor import ( "context" "sort" + "strconv" "github.com/go-logr/logr" batchv1 "k8s.io/api/batch/v1" @@ -12,6 +13,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" + "github.com/kedacore/keda/api/v1alpha1" kedav1alpha1 "github.com/kedacore/keda/api/v1alpha1" version "github.com/kedacore/keda/version" ) @@ -27,12 +29,7 @@ func (e *scaleExecutor) RequestJobScale(ctx context.Context, scaledJob *kedav1al runningJobCount := e.getRunningJobCount(scaledJob) logger.Info("Scaling Jobs", "Number of running Jobs", runningJobCount) - var effectiveMaxScale int64 - if (maxScale + runningJobCount) > scaledJob.MaxReplicaCount() { - effectiveMaxScale = scaledJob.MaxReplicaCount() - runningJobCount - } else { - effectiveMaxScale = maxScale - } + effectiveMaxScale := NewScalingStrategy(logger, scaledJob).GetEffectiveMaxScale(maxScale, runningJobCount, scaledJob.MaxReplicaCount()) if effectiveMaxScale < 0 { effectiveMaxScale = 0 @@ -227,3 +224,67 @@ func (e *scaleExecutor) 
getFinishedJobConditionType(j *batchv1.Job) batchv1.JobC } return "" } + +// NewScalingStrategy returns the ScalingStrategy selected by scaledJob.Spec.ScalingStrategy.Strategy; unknown values and an unparsable custom percentage fall back to the default strategy +func NewScalingStrategy(logger logr.Logger, scaledJob *v1alpha1.ScaledJob) ScalingStrategy { + switch scaledJob.Spec.ScalingStrategy.Strategy { + case "custom": + logger.V(1).Info("Selecting Scale Strategy", "specified", scaledJob.Spec.ScalingStrategy.Strategy, "selected:", "custom", "customScalingQueueLength", scaledJob.Spec.ScalingStrategy.CustomScalingQueueLengthDeduction, "customScallingRunningJobPercentage", scaledJob.Spec.ScalingStrategy.CustomScalingRunningJobPercentage) + percentage, err := strconv.ParseFloat(scaledJob.Spec.ScalingStrategy.CustomScalingRunningJobPercentage, 64) // err stays in case scope (no if-shadowing) so the fallback log below reports it + if err == nil { + return customScalingStrategy{ + CustomScalingQueueLengthDeduction: scaledJob.Spec.ScalingStrategy.CustomScalingQueueLengthDeduction, + CustomScalingRunningJobPercentage: &percentage, + } + } + + logger.V(1).Info("Fail to convert CustomScalingRunningJobPercentage into float", "error", err, "CustomScalingRunningJobPercentage", scaledJob.Spec.ScalingStrategy.CustomScalingRunningJobPercentage) + logger.V(1).Info("Selecting Scale has been changed", "selected", "default") + return defaultScalingStrategy{} + + case "accurate": + logger.V(1).Info("Selecting Scale Strategy", "specified", scaledJob.Spec.ScalingStrategy.Strategy, "selected", "accurate") + return accurateScalingStrategy{} + default: + logger.V(1).Info("Selecting Scale Strategy", "specified", scaledJob.Spec.ScalingStrategy.Strategy, "selected", "default") + return defaultScalingStrategy{} + } +} + +// ScalingStrategy is an interface for switching scaling algorithm +type ScalingStrategy interface { + GetEffectiveMaxScale(maxScale, runningJobCount, maxReplicaCount int64) int64 +} + +type defaultScalingStrategy struct { +} + +func (s defaultScalingStrategy) GetEffectiveMaxScale(maxScale, runningJobCount, maxReplicaCount int64) int64 { + return maxScale - runningJobCount 
+} + +type customScalingStrategy struct { + CustomScalingQueueLengthDeduction *int32 + CustomScalingRunningJobPercentage *float64 +} + +func (s customScalingStrategy) GetEffectiveMaxScale(maxScale, runningJobCount, maxReplicaCount int64) int64 { + return min(maxScale-int64(*s.CustomScalingQueueLengthDeduction)-int64(float64(runningJobCount)*(*s.CustomScalingRunningJobPercentage)), maxReplicaCount) +} + +type accurateScalingStrategy struct { +} + +func (s accurateScalingStrategy) GetEffectiveMaxScale(maxScale, runningJobCount, maxReplicaCount int64) int64 { + if (maxScale + runningJobCount) > maxReplicaCount { + return maxReplicaCount - runningJobCount + } + return maxScale +} + +func min(x, y int64) int64 { + if x > y { + return y + } + return x +} diff --git a/pkg/scaling/executor/scale_jobs_test.go b/pkg/scaling/executor/scale_jobs_test.go index 11f71ffc570..14906ba80ab 100644 --- a/pkg/scaling/executor/scale_jobs_test.go +++ b/pkg/scaling/executor/scale_jobs_test.go @@ -45,6 +45,67 @@ func TestCleanUpNormalCase(t *testing.T) { assert.True(t, ok) } +func TestNewNewScalingStrategy(t *testing.T) { + logger := logf.Log.WithName("ScaledJobTest") + strategy := NewScalingStrategy(logger, getMockScaledJobWithStrategy("custom", "custom", int32(10), "0")) + assert.Equal(t, "executor.customScalingStrategy", fmt.Sprintf("%T", strategy)) + strategy = NewScalingStrategy(logger, getMockScaledJobWithStrategy("accurate", "accurate", int32(0), "0")) + assert.Equal(t, "executor.accurateScalingStrategy", fmt.Sprintf("%T", strategy)) + strategy = NewScalingStrategy(logger, getMockScaledJobWithDefaultStrategy("default")) + assert.Equal(t, "executor.defaultScalingStrategy", fmt.Sprintf("%T", strategy)) + strategy = NewScalingStrategy(logger, getMockScaledJobWithStrategy("default", "default", int32(0), "0")) + assert.Equal(t, "executor.defaultScalingStrategy", fmt.Sprintf("%T", strategy)) +} + +func TestDefaultScalingStrategy(t *testing.T) { + logger := 
logf.Log.WithName("ScaledJobTest") + strategy := NewScalingStrategy(logger, getMockScaledJobWithDefaultStrategy("default")) + // maxScale doesn't exceed MaxReplicaCount. You can ignore it in this scenario + assert.Equal(t, int64(1), strategy.GetEffectiveMaxScale(3, 2, 5)) + assert.Equal(t, int64(2), strategy.GetEffectiveMaxScale(2, 0, 5)) +} + +func TestCustomScalingStrategy(t *testing.T) { + logger := logf.Log.WithName("ScaledJobTest") + customScalingQueueLengthDeduction := int32(1) + customScalingRunningJobPercentage := "0.5" + strategy := NewScalingStrategy(logger, getMockScaledJobWithStrategy("custom", "custom", customScalingQueueLengthDeduction, customScalingRunningJobPercentage)) + // maxScale doesn't exceed MaxReplicaCount. You can ignore it in this scenario + assert.Equal(t, int64(1), strategy.GetEffectiveMaxScale(3, 2, 5)) + assert.Equal(t, int64(9), strategy.GetEffectiveMaxScale(10, 0, 10)) + strategy = NewScalingStrategy(logger, getMockScaledJobWithCustomStrategyWithNilParameter("custom", "custom")) + + // If the two custom parameters are not set, the behavior is the same as DefaultStrategy + assert.Equal(t, int64(1), strategy.GetEffectiveMaxScale(3, 2, 5)) + assert.Equal(t, int64(2), strategy.GetEffectiveMaxScale(2, 0, 5)) + + // Empty String will be DefaultStrategy + customScalingQueueLengthDeduction = int32(1) + customScalingRunningJobPercentage = "" + strategy = NewScalingStrategy(logger, getMockScaledJobWithStrategy("custom", "custom", customScalingQueueLengthDeduction, customScalingRunningJobPercentage)) + assert.Equal(t, "executor.defaultScalingStrategy", fmt.Sprintf("%T", strategy)) + + // Set 0 as customScalingRunningJobPercentage + customScalingQueueLengthDeduction = int32(2) + customScalingRunningJobPercentage = "0" + strategy = NewScalingStrategy(logger, getMockScaledJobWithStrategy("custom", "custom", customScalingQueueLengthDeduction, customScalingRunningJobPercentage)) + assert.Equal(t, int64(1), strategy.GetEffectiveMaxScale(3, 2, 5)) + + // Exceed the 
MaxReplicaCount + customScalingQueueLengthDeduction = int32(-2) + customScalingRunningJobPercentage = "0" + strategy = NewScalingStrategy(logger, getMockScaledJobWithStrategy("custom", "custom", customScalingQueueLengthDeduction, customScalingRunningJobPercentage)) + assert.Equal(t, int64(4), strategy.GetEffectiveMaxScale(3, 2, 4)) +} + +func TestAccurateScalingStrategy(t *testing.T) { + logger := logf.Log.WithName("ScaledJobTest") + strategy := NewScalingStrategy(logger, getMockScaledJobWithStrategy("accurate", "accurate", 0, "0")) + // maxScale doesn't exceed MaxReplicaCount. You can ignore it in this scenario + assert.Equal(t, int64(3), strategy.GetEffectiveMaxScale(3, 2, 5)) + assert.Equal(t, int64(3), strategy.GetEffectiveMaxScale(5, 2, 5)) +} + +func TestCleanUpMixedCaseWithSortByTime(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() @@ -151,6 +212,40 @@ func getMockScaledJobWithDefault() *kedav1alpha1.ScaledJob { return scaledJob } +func getMockScaledJobWithStrategy(name, scalingStrategy string, customScalingQueueLengthDeduction int32, customScalingRunningJobPercentage string) *kedav1alpha1.ScaledJob { + scaledJob := &kedav1alpha1.ScaledJob{ + Spec: kedav1alpha1.ScaledJobSpec{ + ScalingStrategy: kedav1alpha1.ScalingStrategy{ + Strategy: scalingStrategy, + CustomScalingQueueLengthDeduction: &customScalingQueueLengthDeduction, + CustomScalingRunningJobPercentage: customScalingRunningJobPercentage, + }, + }, + } + scaledJob.ObjectMeta.Name = name + return scaledJob +} + +func getMockScaledJobWithCustomStrategyWithNilParameter(name, scalingStrategy string) *kedav1alpha1.ScaledJob { + scaledJob := &kedav1alpha1.ScaledJob{ + Spec: kedav1alpha1.ScaledJobSpec{ + ScalingStrategy: kedav1alpha1.ScalingStrategy{ + Strategy: scalingStrategy, + }, + }, + } + scaledJob.ObjectMeta.Name = name + return scaledJob +} + +func getMockScaledJobWithDefaultStrategy(name string) *kedav1alpha1.ScaledJob { + scaledJob := &kedav1alpha1.ScaledJob{ + Spec: 
kedav1alpha1.ScaledJobSpec{}, + } + scaledJob.ObjectMeta.Name = name + return scaledJob +} + func getMockClient(t *testing.T, ctrl *gomock.Controller, jobs *[]mockJobParameter, deletedJobName *map[string]string) *mock_client.MockClient { client := mock_client.NewMockClient(ctrl) client.EXPECT().