Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ScaledJob: introduce rolloutStrategy #2164

Merged
merged 14 commits into from
Nov 1, 2021
Merged
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
### New

- ScaledJob: introduce MultipleScalersCalculation ([#2016](https://github.com/kedacore/keda/pull/2016))
- ScaledJob: introduce `RolloutStrategy` ([#2164](https://github.com/kedacore/keda/pull/2164))
- Add Graphite Scaler ([#1628](https://github.com/kedacore/keda/pull/2092))
- Add Cassandra Scaler ([#2211](https://github.com/kedacore/keda/pull/2211))

Expand Down
1 change: 0 additions & 1 deletion adapter/generated/openapi/zz_generated.openapi.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions apis/keda/v1alpha1/scaledjob_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ type ScaledJobSpec struct {
// +optional
FailedJobsHistoryLimit *int32 `json:"failedJobsHistoryLimit,omitempty"`
// +optional
RolloutStrategy string `json:"rolloutStrategy,omitempty"`
// +optional
EnvSourceContainerName string `json:"envSourceContainerName,omitempty"`
// +optional
MaxReplicaCount *int32 `json:"maxReplicaCount,omitempty"`
Expand Down
1 change: 0 additions & 1 deletion apis/keda/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion config/crd/bases/keda.sh_scaledjobs.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
annotations:
controller-gen.kubebuilder.io/version: v0.4.1
controller-gen.kubebuilder.io/version: v0.6.1
zroubalik marked this conversation as resolved.
Show resolved Hide resolved
creationTimestamp: null
name: scaledjobs.keda.sh
spec:
Expand Down Expand Up @@ -7354,6 +7354,8 @@ spec:
pollingInterval:
format: int32
type: integer
rolloutStrategy:
type: string
scalingStrategy:
description: ScalingStrategy defines the strategy of Scaling
properties:
Expand Down
4 changes: 3 additions & 1 deletion config/crd/bases/keda.sh_triggerauthentications.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
annotations:
controller-gen.kubebuilder.io/version: v0.4.1
controller-gen.kubebuilder.io/version: v0.6.1
creationTimestamp: null
name: triggerauthentications.keda.sh
spec:
Expand Down Expand Up @@ -89,6 +89,8 @@ spec:
type: object
mount:
type: string
namespace:
type: string
role:
type: string
secrets:
Expand Down
49 changes: 27 additions & 22 deletions controllers/keda/scaledjob_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ func (r *ScaledJobReconciler) Reconcile(ctx context.Context, req ctrl.Request) (

// reconcileScaledJob implements reconciler logic for K8s Jobs based ScaledJob
func (r *ScaledJobReconciler) reconcileScaledJob(ctx context.Context, logger logr.Logger, scaledJob *kedav1alpha1.ScaledJob) (string, error) {
msg, err := r.deletePreviousVersionScaleJobs(logger, scaledJob)
msg, err := r.deletePreviousVersionScaleJobs(ctx, logger, scaledJob)
if err != nil {
return msg, err
}
Expand All @@ -155,30 +155,35 @@ func (r *ScaledJobReconciler) reconcileScaledJob(ctx context.Context, logger log
return "ScaledJob is defined correctly and is ready to scaling", nil
}

// Delete Jobs owned by the previous version of the scaledJob
func (r *ScaledJobReconciler) deletePreviousVersionScaleJobs(logger logr.Logger, scaledJob *kedav1alpha1.ScaledJob) (string, error) {
opts := []client.ListOption{
client.InNamespace(scaledJob.GetNamespace()),
client.MatchingLabels(map[string]string{"scaledjob.keda.sh/name": scaledJob.GetName()}),
}
jobs := &batchv1.JobList{}
err := r.Client.List(context.TODO(), jobs, opts...)
if err != nil {
return "Cannot get list of Jobs owned by this scaledJob", err
}

if jobs.Size() > 0 {
logger.Info("Deleting jobs owned by the previous version of the scaledJob", "Number of jobs to delete", jobs.Size())
}
for _, job := range jobs.Items {
job := job
err = r.Client.Delete(context.TODO(), &job, client.PropagationPolicy(metav1.DeletePropagationBackground))
// Delete Jobs owned by the previous version of the scaledJob based on the rolloutStrategy given for this scaledJob, if any
func (r *ScaledJobReconciler) deletePreviousVersionScaleJobs(ctx context.Context, logger logr.Logger, scaledJob *kedav1alpha1.ScaledJob) (string, error) {
switch scaledJob.Spec.RolloutStrategy {
case "gradual":
logger.Info("RolloutStrategy: gradual, Not deleting jobs owned by the previous version of the scaleJob")
default:
opts := []client.ListOption{
client.InNamespace(scaledJob.GetNamespace()),
client.MatchingLabels(map[string]string{"scaledjob.keda.sh/name": scaledJob.GetName()}),
}
jobs := &batchv1.JobList{}
err := r.Client.List(ctx, jobs, opts...)
if err != nil {
return "Not able to delete job: " + job.Name, err
return "Cannot get list of Jobs owned by this scaledJob", err
}
}

return fmt.Sprintf("Deleted jobs owned by the previous version of the scaleJob: %d jobs deleted", jobs.Size()), nil
if len(jobs.Items) > 0 {
logger.Info("RolloutStrategy: immediate, Deleting jobs owned by the previous version of the scaledJob", "numJobsToDelete", len(jobs.Items))
}
for _, job := range jobs.Items {
job := job
err = r.Client.Delete(ctx, &job, client.PropagationPolicy(metav1.DeletePropagationBackground))
if err != nil {
return "Not able to delete job: " + job.Name, err
}
}
return fmt.Sprintf("RolloutStrategy: immediate, deleted jobs owned by the previous version of the scaleJob: %d jobs deleted", len(jobs.Items)), nil
}
return fmt.Sprintf("RolloutStrategy: %s", scaledJob.Spec.RolloutStrategy), nil
}

// requestScaleLoop request ScaleLoop handler for the respective ScaledJob
Expand Down