Skip to content

Commit

Permalink
Adding Scaling Strategy for ScaledJob (kedacore#1227)
Browse files Browse the repository at this point in the history
Signed-off-by: Tsuyoshi Ushio <[email protected]>
  • Loading branch information
TsuyoshiUshio authored and silenceper committed Oct 10, 2020
1 parent 02a901d commit 9bd6e7e
Show file tree
Hide file tree
Showing 5 changed files with 208 additions and 7 deletions.
15 changes: 14 additions & 1 deletion api/v1alpha1/scaledjob_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,9 @@ type ScaledJobSpec struct {
// +optional
EnvSourceContainerName string `json:"envSourceContainerName,omitempty"`
// +optional
MaxReplicaCount *int32 `json:"maxReplicaCount,omitempty"`
MaxReplicaCount *int32 `json:"maxReplicaCount,omitempty"`
// +optional
ScalingStrategy ScalingStrategy `json:"scalingStrategy,omitempty"`
Triggers []ScaleTriggers `json:"triggers"`
}

Expand All @@ -57,6 +59,17 @@ type ScaledJobList struct {
Items []ScaledJob `json:"items"`
}

// ScalingStrategy defines the strategy of Scaling
// +optional
type ScalingStrategy struct {
// +optional
Strategy string `json:"strategy,omitempty"`
// +optional
CustomScalingQueueLengthDeduction *int32 `json:"customScalingQueueLengthDeduction,omitempty"`
// +optional
CustomScalingRunningJobPercentage string `json:"customScalingRunningJobPercentage,omitempty"`
}

func init() {
SchemeBuilder.Register(&ScaledJob{}, &ScaledJobList{})
}
Expand Down
21 changes: 21 additions & 0 deletions api/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 11 additions & 0 deletions config/crd/bases/keda.sh_scaledjobs.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6343,6 +6343,17 @@ spec:
pollingInterval:
format: int32
type: integer
scalingStrategy:
description: ScalingStrategy defines the strategy of Scaling
properties:
customScalingQueueLengthDeduction:
format: int32
type: integer
customScalingRunningJobPercentage:
type: string
strategy:
type: string
type: object
successfulJobsHistoryLimit:
format: int32
type: integer
Expand Down
73 changes: 67 additions & 6 deletions pkg/scaling/executor/scale_jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package executor
import (
"context"
"sort"
"strconv"

"github.com/go-logr/logr"
batchv1 "k8s.io/api/batch/v1"
Expand All @@ -12,6 +13,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"

"github.com/kedacore/keda/api/v1alpha1"
kedav1alpha1 "github.com/kedacore/keda/api/v1alpha1"
version "github.com/kedacore/keda/version"
)
Expand All @@ -27,12 +29,7 @@ func (e *scaleExecutor) RequestJobScale(ctx context.Context, scaledJob *kedav1al
runningJobCount := e.getRunningJobCount(scaledJob)
logger.Info("Scaling Jobs", "Number of running Jobs", runningJobCount)

var effectiveMaxScale int64
if (maxScale + runningJobCount) > scaledJob.MaxReplicaCount() {
effectiveMaxScale = scaledJob.MaxReplicaCount() - runningJobCount
} else {
effectiveMaxScale = maxScale
}
effectiveMaxScale := NewScalingStrategy(logger, scaledJob).GetEffectiveMaxScale(maxScale, runningJobCount, scaledJob.MaxReplicaCount())

if effectiveMaxScale < 0 {
effectiveMaxScale = 0
Expand Down Expand Up @@ -227,3 +224,67 @@ func (e *scaleExecutor) getFinishedJobConditionType(j *batchv1.Job) batchv1.JobC
}
return ""
}

// NewScalingStrategy returns ScalingStrategy instance
func NewScalingStrategy(logger logr.Logger, scaledJob *v1alpha1.ScaledJob) ScalingStrategy {
switch scaledJob.Spec.ScalingStrategy.Strategy {
case "custom":
logger.V(1).Info("Selecting Scale Strategy", "specified", scaledJob.Spec.ScalingStrategy.Strategy, "selected:", "custom", "customScalingQueueLength", scaledJob.Spec.ScalingStrategy.CustomScalingQueueLengthDeduction, "customScallingRunningJobPercentage", scaledJob.Spec.ScalingStrategy.CustomScalingRunningJobPercentage)
var err error
if percentage, err := strconv.ParseFloat(scaledJob.Spec.ScalingStrategy.CustomScalingRunningJobPercentage, 64); err == nil {
return customScalingStrategy{
CustomScalingQueueLengthDeduction: scaledJob.Spec.ScalingStrategy.CustomScalingQueueLengthDeduction,
CustomScalingRunningJobPercentage: &percentage,
}
}

logger.V(1).Info("Fail to convert CustomScalingRunningJobPercentage into float", "error", err, "CustomScalingRunningJobPercentage", scaledJob.Spec.ScalingStrategy.CustomScalingRunningJobPercentage)
logger.V(1).Info("Selecting Scale has been changed", "selected", "default")
return defaultScalingStrategy{}

case "accurate":
logger.V(1).Info("Selecting Scale Strategy", "specified", scaledJob.Spec.ScalingStrategy.Strategy, "selected", "accurate")
return accurateScalingStrategy{}
default:
logger.V(1).Info("Selecting Scale Strategy", "specified", scaledJob.Spec.ScalingStrategy.Strategy, "selected", "default")
return defaultScalingStrategy{}
}
}

// ScalingStrategy is an interface for switching scaling algorithm
type ScalingStrategy interface {
GetEffectiveMaxScale(maxScale, runningJobCount, maxReplicaCount int64) int64
}

type defaultScalingStrategy struct {
}

func (s defaultScalingStrategy) GetEffectiveMaxScale(maxScale, runningJobCount, maxReplicaCount int64) int64 {
return maxScale - runningJobCount
}

type customScalingStrategy struct {
CustomScalingQueueLengthDeduction *int32
CustomScalingRunningJobPercentage *float64
}

func (s customScalingStrategy) GetEffectiveMaxScale(maxScale, runningJobCount, maxReplicaCount int64) int64 {
return min(maxScale-int64(*s.CustomScalingQueueLengthDeduction)-int64(float64(runningJobCount)*(*s.CustomScalingRunningJobPercentage)), maxReplicaCount)
}

type accurateScalingStrategy struct {
}

func (s accurateScalingStrategy) GetEffectiveMaxScale(maxScale, runningJobCount, maxReplicaCount int64) int64 {
if (maxScale + runningJobCount) > maxReplicaCount {
return maxReplicaCount - runningJobCount
}
return maxScale
}

func min(x, y int64) int64 {
if x > y {
return y
}
return x
}
95 changes: 95 additions & 0 deletions pkg/scaling/executor/scale_jobs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,67 @@ func TestCleanUpNormalCase(t *testing.T) {
assert.True(t, ok)
}

func TestNewNewScalingStrategy(t *testing.T) {
logger := logf.Log.WithName("ScaledJobTest")
strategy := NewScalingStrategy(logger, getMockScaledJobWithStrategy("custom", "custom", int32(10), "0"))
assert.Equal(t, "executor.customScalingStrategy", fmt.Sprintf("%T", strategy))
strategy = NewScalingStrategy(logger, getMockScaledJobWithStrategy("accurate", "accurate", int32(0), "0"))
assert.Equal(t, "executor.accurateScalingStrategy", fmt.Sprintf("%T", strategy))
strategy = NewScalingStrategy(logger, getMockScaledJobWithDefaultStrategy("default"))
assert.Equal(t, "executor.defaultScalingStrategy", fmt.Sprintf("%T", strategy))
strategy = NewScalingStrategy(logger, getMockScaledJobWithStrategy("default", "default", int32(0), "0"))
assert.Equal(t, "executor.defaultScalingStrategy", fmt.Sprintf("%T", strategy))
}

func TestDefaultScalingStrategy(t *testing.T) {
logger := logf.Log.WithName("ScaledJobTest")
strategy := NewScalingStrategy(logger, getMockScaledJobWithDefaultStrategy("default"))
// maxScale doesn't exceed MaxReplicaCount. You can ignore on this sceanrio
assert.Equal(t, int64(1), strategy.GetEffectiveMaxScale(3, 2, 5))
assert.Equal(t, int64(2), strategy.GetEffectiveMaxScale(2, 0, 5))
}

func TestCustomScalingStrategy(t *testing.T) {
logger := logf.Log.WithName("ScaledJobTest")
customScalingQueueLengthDeduction := int32(1)
customScalingRunningJobPercentage := "0.5"
strategy := NewScalingStrategy(logger, getMockScaledJobWithStrategy("custom", "custom", customScalingQueueLengthDeduction, customScalingRunningJobPercentage))
// maxScale doesn't exceed MaxReplicaCount. You can ignore on this sceanrio
assert.Equal(t, int64(1), strategy.GetEffectiveMaxScale(3, 2, 5))
assert.Equal(t, int64(9), strategy.GetEffectiveMaxScale(10, 0, 10))
strategy = NewScalingStrategy(logger, getMockScaledJobWithCustomStrategyWithNilParameter("custom", "custom"))

// If you don't set the two parameters is the same behavior as DefaultStrategy
assert.Equal(t, int64(1), strategy.GetEffectiveMaxScale(3, 2, 5))
assert.Equal(t, int64(2), strategy.GetEffectiveMaxScale(2, 0, 5))

// Empty String will be DefaultStrategy
customScalingQueueLengthDeduction = int32(1)
customScalingRunningJobPercentage = ""
strategy = NewScalingStrategy(logger, getMockScaledJobWithStrategy("custom", "custom", customScalingQueueLengthDeduction, customScalingRunningJobPercentage))
assert.Equal(t, "executor.defaultScalingStrategy", fmt.Sprintf("%T", strategy))

// Set 0 as customScalingRunningJobPercentage
customScalingQueueLengthDeduction = int32(2)
customScalingRunningJobPercentage = "0"
strategy = NewScalingStrategy(logger, getMockScaledJobWithStrategy("custom", "custom", customScalingQueueLengthDeduction, customScalingRunningJobPercentage))
assert.Equal(t, int64(1), strategy.GetEffectiveMaxScale(3, 2, 5))

// Exceed the MaxReplicaCount
customScalingQueueLengthDeduction = int32(-2)
customScalingRunningJobPercentage = "0"
strategy = NewScalingStrategy(logger, getMockScaledJobWithStrategy("custom", "custom", customScalingQueueLengthDeduction, customScalingRunningJobPercentage))
assert.Equal(t, int64(4), strategy.GetEffectiveMaxScale(3, 2, 4))
}

func TestAccurateScalingStrategy(t *testing.T) {
logger := logf.Log.WithName("ScaledJobTest")
strategy := NewScalingStrategy(logger, getMockScaledJobWithStrategy("accurate", "accurate", 0, "0"))
// maxScale doesn't exceed MaxReplicaCount. You can ignore on this sceanrio
assert.Equal(t, int64(3), strategy.GetEffectiveMaxScale(3, 2, 5))
assert.Equal(t, int64(3), strategy.GetEffectiveMaxScale(5, 2, 5))
}

func TestCleanUpMixedCaseWithSortByTime(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
Expand Down Expand Up @@ -151,6 +212,40 @@ func getMockScaledJobWithDefault() *kedav1alpha1.ScaledJob {
return scaledJob
}

func getMockScaledJobWithStrategy(name, scalingStrategy string, customScalingQueueLengthDeduction int32, customScalingRunningJobPercentage string) *kedav1alpha1.ScaledJob {
scaledJob := &kedav1alpha1.ScaledJob{
Spec: kedav1alpha1.ScaledJobSpec{
ScalingStrategy: kedav1alpha1.ScalingStrategy{
Strategy: scalingStrategy,
CustomScalingQueueLengthDeduction: &customScalingQueueLengthDeduction,
CustomScalingRunningJobPercentage: customScalingRunningJobPercentage,
},
},
}
scaledJob.ObjectMeta.Name = name
return scaledJob
}

func getMockScaledJobWithCustomStrategyWithNilParameter(name, scalingStrategy string) *kedav1alpha1.ScaledJob {
scaledJob := &kedav1alpha1.ScaledJob{
Spec: kedav1alpha1.ScaledJobSpec{
ScalingStrategy: kedav1alpha1.ScalingStrategy{
Strategy: scalingStrategy,
},
},
}
scaledJob.ObjectMeta.Name = name
return scaledJob
}

func getMockScaledJobWithDefaultStrategy(name string) *kedav1alpha1.ScaledJob {
scaledJob := &kedav1alpha1.ScaledJob{
Spec: kedav1alpha1.ScaledJobSpec{},
}
scaledJob.ObjectMeta.Name = name
return scaledJob
}

func getMockClient(t *testing.T, ctrl *gomock.Controller, jobs *[]mockJobParameter, deletedJobName *map[string]string) *mock_client.MockClient {
client := mock_client.NewMockClient(ctrl)
client.EXPECT().
Expand Down

0 comments on commit 9bd6e7e

Please sign in to comment.