diff --git a/CHANGELOG.md b/CHANGELOG.md index 83515cf4631..c1552b4431b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -73,6 +73,17 @@ Here is an overview of all new **experimental** features: ### Improvements - TODO ([#XXX](https://github.com/kedacore/keda/issues/XXX)) +- **General**: Added `eagerScalingStrategy` for `ScaledJob` ([#5114](https://github.com/kedacore/keda/issues/5114)) +- **General**: Do not delete running Jobs on KEDA restart ([#5656](https://github.com/kedacore/keda/issues/5656)) +- **Azure queue scaler**: Added new configuration option 'queueLengthStrategy' ([#4478](https://github.com/kedacore/keda/issues/4478)) +- **Cassandra Scaler**: Add TLS support for cassandra scaler ([#5802](https://github.com/kedacore/keda/issues/5802)) +- **GCP Pub/Sub**: Add optional valueIfNull to allow a default scaling value and prevent errors when GCP metric returns no value. ([#5896](https://github.com/kedacore/keda/issues/5896)) +- **GCP Scalers**: Added custom time horizon in GCP scalers ([#5778](https://github.com/kedacore/keda/issues/5778)) +- **GitHub Scaler**: Fixed pagination, fetching repository list ([#5738](https://github.com/kedacore/keda/issues/5738)) +- **IBM MQ Scaler**: Add TLS support for IBM MQ scaler ([#5974](https://github.com/kedacore/keda/issues/5974)) +- **Kafka**: Fix logic to scale to zero on invalid offset even with earliest offsetResetPolicy ([#5689](https://github.com/kedacore/keda/issues/5689)) +- **MYSQL Scaler**: Add support to fetch username from env ([#5883](https://github.com/kedacore/keda/issues/5883)) +- **Postgres Scaler**: Add support for access token authentication to an Azure Postgres Flexible Server ([#5823](https://github.com/kedacore/keda/issues/5823)) ### Fixes diff --git a/controllers/keda/scaledjob_controller.go b/controllers/keda/scaledjob_controller.go index 98c1ce87cc8..845ba5aca90 100755 --- a/controllers/keda/scaledjob_controller.go +++ b/controllers/keda/scaledjob_controller.go @@ -279,22 +279,36 @@ func (r *ScaledJobReconciler) deletePreviousVersionScaleJobs(ctx context.Context return "Cannot get list of Jobs owned by this scaledJob", err } - if len(jobs.Items) > 0 { - logger.Info("RolloutStrategy: immediate, Deleting jobs owned by the previous version of the scaledJob", "numJobsToDelete", len(jobs.Items)) + jobIndexes := make([]int, 0, len(jobs.Items)) + scaledJobGeneration := strconv.FormatInt(scaledJob.Generation, 10) + for i, job := range jobs.Items { + if jobGen, ok := job.Annotations["scaledjob.keda.sh/generation"]; !ok { + // delete Jobs that don't have the generation annotation + jobIndexes = append(jobIndexes, i) + } else if jobGen != scaledJobGeneration { + // delete Jobs that have a different generation annotation + jobIndexes = append(jobIndexes, i) + } } - for _, job := range jobs.Items { - job := job - propagationPolicy := metav1.DeletePropagationBackground - if scaledJob.Spec.Rollout.PropagationPolicy == "foreground" { - propagationPolicy = metav1.DeletePropagationForeground - } - err = r.Client.Delete(ctx, &job, client.PropagationPolicy(propagationPolicy)) - if err != nil { - return "Not able to delete job: " + job.Name, err + if len(jobIndexes) == 0 { + logger.Info("RolloutStrategy: immediate, No jobs owned by the previous version of the scaledJob") + } else { + logger.Info("RolloutStrategy: immediate, Deleting jobs owned by the previous version of the scaledJob", "numJobsToDelete", len(jobIndexes)) + for _, index := range jobIndexes { + job := jobs.Items[index] + + propagationPolicy := metav1.DeletePropagationBackground + if scaledJob.Spec.Rollout.PropagationPolicy == "foreground" { + propagationPolicy = metav1.DeletePropagationForeground + } + err = r.Client.Delete(ctx, &job, client.PropagationPolicy(propagationPolicy)) + if err != nil { + return "Not able to delete job: " + job.Name, err + } } + return fmt.Sprintf("RolloutStrategy: immediate, deleted jobs owned by the previous version of the scaleJob: %d jobs deleted", len(jobIndexes)), nil } - return fmt.Sprintf("RolloutStrategy: immediate, deleted jobs owned by the previous version of the scaleJob: %d jobs deleted", len(jobs.Items)), nil } return fmt.Sprintf("RolloutStrategy: %s", scaledJob.Spec.RolloutStrategy), nil } diff --git a/pkg/scaling/executor/scale_jobs.go b/pkg/scaling/executor/scale_jobs.go index 1e9407284ae..958b952e05b 100644 --- a/pkg/scaling/executor/scale_jobs.go +++ b/pkg/scaling/executor/scale_jobs.go @@ -114,6 +114,10 @@ func (e *scaleExecutor) getScalingDecision(scaledJob *kedav1alpha1.ScaledJob, ru } func (e *scaleExecutor) createJobs(ctx context.Context, logger logr.Logger, scaledJob *kedav1alpha1.ScaledJob, scaleTo int64, maxScale int64) { + if maxScale <= 0 { + logger.Info("No need to create jobs - all requested jobs already exist", "jobs", maxScale) + return + } logger.Info("Creating jobs", "Effective number of max jobs", maxScale) if scaleTo > maxScale { scaleTo = maxScale @@ -150,6 +154,13 @@ func (e *scaleExecutor) generateJobs(logger logr.Logger, scaledJob *kedav1alpha1 labels[key] = value } + annotations := map[string]string{ + "scaledjob.keda.sh/generation": strconv.FormatInt(scaledJob.Generation, 10), + } + for key, value := range scaledJob.ObjectMeta.Annotations { + annotations[key] = value + } + jobs := make([]*batchv1.Job, int(scaleTo)) for i := 0; i < int(scaleTo); i++ { job := &batchv1.Job{ @@ -157,7 +168,7 @@ func (e *scaleExecutor) generateJobs(logger logr.Logger, scaledJob *kedav1alpha1 GenerateName: scaledJob.GetName() + "-", Namespace: scaledJob.GetNamespace(), Labels: labels, - Annotations: scaledJob.ObjectMeta.Annotations, + Annotations: annotations, }, Spec: *scaledJob.Spec.JobTargetRef.DeepCopy(), } diff --git a/pkg/scaling/executor/scale_jobs_test.go b/pkg/scaling/executor/scale_jobs_test.go index 7542c1acb58..545a26583c4 100644 --- a/pkg/scaling/executor/scale_jobs_test.go +++ b/pkg/scaling/executor/scale_jobs_test.go @@ -316,8 +316,11 @@ func TestCreateJobs(t *testing.T) { func TestGenerateJobs(t *testing.T) { var ( - expectedAnnotations = map[string]string{"test": "test"} - expectedLabels = map[string]string{ + expectedAnnotations = map[string]string{ + "test": "test", + "scaledjob.keda.sh/generation": "0", + } + expectedLabels = map[string]string{ "app.kubernetes.io/managed-by": "keda-operator", "app.kubernetes.io/name": "test", "app.kubernetes.io/part-of": "test",