diff --git a/workflow/controller/controller.go b/workflow/controller/controller.go index 274e5659c6d2f..9dad795b85414 100644 --- a/workflow/controller/controller.go +++ b/workflow/controller/controller.go @@ -5,6 +5,7 @@ import ( "encoding/json" "fmt" "strconv" + gosync "sync" "syscall" "time" @@ -66,6 +67,18 @@ import ( plugin "github.com/argoproj/argo-workflows/v3/workflow/util/plugins" ) +const maxAllowedStackDepth = 100 + +type recentlyCompletedWorkflow struct { + key string + when time.Time +} + +type recentCompletions struct { + completions []recentlyCompletedWorkflow + mutex gosync.Mutex +} + // WorkflowController is the controller for workflow resources type WorkflowController struct { // namespace of the workflow controller @@ -128,6 +141,8 @@ type WorkflowController struct { // Default is 3s and can be configured using the env var ARGO_PROGRESS_FILE_TICK_DURATION progressFileTickDuration time.Duration executorPlugins map[string]map[string]*spec.Plugin // namespace -> name -> plugin + + recentCompletions recentCompletions } const ( @@ -735,6 +750,11 @@ func (wfc *WorkflowController) processNextItem(ctx context.Context) bool { return true } + if wfc.checkRecentlyCompleted(wf.ObjectMeta.Name) { + log.WithFields(log.Fields{"name": wf.ObjectMeta.Name}).Warn("Cache: Rejecting recently deleted") + return true + } + // this will ensure we process every incomplete workflow once every 20m wfc.wfQueue.AddAfter(key, workflowResyncPeriod) @@ -826,6 +846,52 @@ func getWfPriority(obj interface{}) (int32, time.Time) { return int32(priority), un.GetCreationTimestamp().Time } +// 10 minutes in the past +const maxCompletedStoreTime = time.Second * -600 + +func (wfc *WorkflowController) cleanCompletedWorkflowsRecord() { + cutoff := time.Now().Add(maxCompletedStoreTime) + removeIndex := -1 + wfc.recentCompletions.mutex.Lock() + defer wfc.recentCompletions.mutex.Unlock() + + for i, val := range wfc.recentCompletions.completions { + if val.when.After(cutoff) { + removeIndex = i - 1 + break + } + } + if removeIndex >= 0 { + wfc.recentCompletions.completions = wfc.recentCompletions.completions[removeIndex+1:] + } +} + +func (wfc *WorkflowController) recordCompletedWorkflow(key string) { + if !wfc.checkRecentlyCompleted(key) { + wfc.recentCompletions.mutex.Lock() + wfc.recentCompletions.completions = append(wfc.recentCompletions.completions, + recentlyCompletedWorkflow{ + key: key, + when: time.Now(), + }) + wfc.recentCompletions.mutex.Unlock() + } +} + +func (wfc *WorkflowController) checkRecentlyCompleted(key string) bool { + wfc.cleanCompletedWorkflowsRecord() + recent := false + wfc.recentCompletions.mutex.Lock() + for _, val := range wfc.recentCompletions.completions { + if val.key == key { + recent = true + break + } + } + wfc.recentCompletions.mutex.Unlock() + return recent +} + func (wfc *WorkflowController) addWorkflowInformerHandlers(ctx context.Context) { wfc.wfInformer.AddEventHandler( cache.FilteringResourceEventHandler{ @@ -835,7 +901,13 @@ func (wfc *WorkflowController) addWorkflowInformerHandlers(ctx context.Context) log.Warnf("Workflow FilterFunc: '%v' is not an unstructured", obj) return false } - return reconciliationNeeded(un) + needed := reconciliationNeeded(un) + if !needed { + key, _ := cache.MetaNamespaceKeyFunc(un) + wfc.recordCompletedWorkflow(key) + wfc.throttler.Remove(key) + } + return needed }, Handler: cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { @@ -866,6 +938,7 @@ func (wfc *WorkflowController) addWorkflowInformerHandlers(ctx context.Context) key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj) if err == nil { wfc.releaseAllWorkflowLocks(obj) + wfc.recordCompletedWorkflow(key) // no need to add to the queue - this workflow is done wfc.throttler.Remove(key) } diff --git a/workflow/controller/operator.go b/workflow/controller/operator.go index 74c3ee7adba0a..8bbfe6dc04a5e 100644 --- a/workflow/controller/operator.go +++ b/workflow/controller/operator.go @@ -200,7 +200,7 @@ func (woc *wfOperationCtx) operate(ctx context.Context) { } }() - woc.log.Info("Processing workflow") + woc.log.WithFields(log.Fields{"Phase": woc.wf.Status.Phase, "ResourceVersion": woc.wf.ObjectMeta.ResourceVersion}).Info("Processing workflow") // Set the Execute workflow spec for execution // ExecWF is a runtime execution spec which merged from Wf, WFT and Wfdefault