Skip to content

Commit

Permalink
infinite backoff for create job
Browse files Browse the repository at this point in the history
  • Loading branch information
RidRisR committed Jan 7, 2025
1 parent 6e12192 commit b19b01b
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 37 deletions.
9 changes: 1 addition & 8 deletions pkg/controller/compact_status_updater.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,15 +146,8 @@ func (r *CompactStatusUpdater) OnCreateJob(ctx context.Context, compact *v1alpha
if err != nil {
newStatus.State = string(v1alpha1.BackupRetryTheFailed)
newStatus.Message = err.Error()
newStatus.RetryStatus = []v1alpha1.CompactRetryRecord{
{
RetryNum: len(compact.Status.RetryStatus),
DetectFailedAt: metav1.NewTime(time.Now()),
RetryReason: err.Error(),
},
}
} else {
newStatus.State = string(v1alpha1.BackupRunning)
newStatus.State = string(v1alpha1.BackupPrepare)
}
return r.UpdateStatus(compact, newStatus)
}
Expand Down
63 changes: 34 additions & 29 deletions pkg/controller/compactbackup/compact_backup_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@ import (
"k8s.io/utils/ptr"
)

// maxInterval is the upper bound applied to the exponential retry backoff:
// any computed interval longer than this is clamped to maxInterval.
const maxInterval = 6 * time.Minute

// Controller controls backup.
type Controller struct {
deps *controller.Dependencies
Expand Down Expand Up @@ -281,21 +285,21 @@ func (c *Controller) sync(key string) (err error) {
return nil
}

ok, err := c.precheckCompact(compact)
ok, err := c.checkJobStatus(compact)
if err != nil {
return err
}
if !ok {
klog.Infof("Compact %s/%s is not allowed, skip", ns, name)
klog.Infof("Compact %s/%s is not allowed to create job, skip", ns, name)
return nil
}

err = c.doCompact(compact.DeepCopy())
err = c.createJob(compact.DeepCopy())
c.statusUpdater.OnCreateJob(context.TODO(), compact, err)
return err
}

func (c *Controller) doCompact(compact *v1alpha1.CompactBackup) error {
func (c *Controller) createJob(compact *v1alpha1.CompactBackup) error {
ns := compact.GetNamespace()
name := compact.GetName()
compactJobName := compact.GetName()
Expand Down Expand Up @@ -474,17 +478,34 @@ func (c *Controller) makeCompactJob(compact *v1alpha1.CompactBackup) (*batchv1.J
return job, "", nil
}

// precheckCompact checks if doCompact is allowed to run
// validate checks the spec of the given CompactBackup and returns an error
// describing the first invalid field it finds, or nil when every check passes.
//
// The checks, in order:
//   - StartTs must not be empty
//   - EndTs must not be empty
//   - Concurrency must be strictly positive
//   - MaxRetryTimes must not be negative
func (c *Controller) validate(compact *v1alpha1.CompactBackup) error {
	s := compact.Spec
	switch {
	case s.StartTs == "":
		return errors.NewNoStackError("start-ts must be set")
	case s.EndTs == "":
		return errors.NewNoStackError("end-ts must be set")
	case s.Concurrency <= 0:
		return errors.NewNoStackError("concurrency must be greater than 0")
	case s.MaxRetryTimes < 0:
		return errors.NewNoStackError("maxRetryTimes must be greater than or equal to 0")
	}
	return nil
}

// checkJobStatus checks whether createJob is allowed to run.
// createJob is allowed only if no other compact job exists.
// If the existing job failed, the compact status is updated.
func (c *Controller) precheckCompact(compact *v1alpha1.CompactBackup) (bool, error) {
func (c *Controller) checkJobStatus(compact *v1alpha1.CompactBackup) (bool, error) {
ns := compact.GetNamespace()
name := compact.GetName()

job, err := c.deps.KubeClientset.BatchV1().Jobs(ns).Get(context.TODO(), name, metav1.GetOptions{})
if err != nil {
if errors.IsNotFound(err) {
return c.allowCompact(compact), nil
return c.allowCreateJob(compact), nil
}
klog.Errorf("Failed to get job %s for compact %s/%s, error %v", name, ns, name, err)
return false, err
Expand All @@ -509,24 +530,7 @@ func (c *Controller) precheckCompact(compact *v1alpha1.CompactBackup) (bool, err
return false, nil
}

// validate checks the spec of the given CompactBackup and returns an error
// describing the first invalid field it finds, or nil when every check passes.
//
// The checks, in order:
//   - StartTs must not be empty
//   - EndTs must not be empty
//   - Concurrency must be strictly positive
//   - MaxRetryTimes must not be negative
func (c *Controller) validate(compact *v1alpha1.CompactBackup) error {
	s := compact.Spec
	switch {
	case s.StartTs == "":
		return errors.NewNoStackError("start-ts must be set")
	case s.EndTs == "":
		return errors.NewNoStackError("end-ts must be set")
	case s.Concurrency <= 0:
		return errors.NewNoStackError("concurrency must be greater than 0")
	case s.MaxRetryTimes < 0:
		return errors.NewNoStackError("maxRetryTimes must be greater than or equal to 0")
	}
	return nil
}

func (c *Controller) allowCompact(compact *v1alpha1.CompactBackup) bool {
func (c *Controller) allowCreateJob(compact *v1alpha1.CompactBackup) bool {
ns := compact.GetNamespace()
name := compact.GetName()

Expand All @@ -535,15 +539,16 @@ func (c *Controller) allowCompact(compact *v1alpha1.CompactBackup) bool {
if attempts <= 1 {
return 0
}
return 10 * time.Duration(math.Pow(10, float64(attempts-1))) * time.Second
interval := 10 * time.Duration(math.Pow(10, float64(attempts-1))) * time.Second
if interval > maxInterval {
return maxInterval
}
return interval
}

attempts := len(compact.Status.RetryStatus)
if attempts > 0 {
lastRetry := compact.Status.RetryStatus[attempts-1]
if lastRetry.RetryNum >= int(compact.Spec.MaxRetryTimes) {
return false
}
backoff := expBackoff(attempts)
if time.Since(lastRetry.DetectFailedAt.Time) < backoff {
klog.Infof("Compact: [%s/%s] backoff in effect, skipping retry.", ns, name)
Expand Down

0 comments on commit b19b01b

Please sign in to comment.