
Commit

fix: completed workflow tracking (#3)

Problem: The workflow cache can hold out-of-date information which will
resurrect a completed workflow. In this scenario, an on-exit handler with
`when: "{{workflow.status}} != Succeeded"` may fire erroneously.

Firing enough copies of this workflow into a real cluster fast enough will
reproduce this (tested on 3.4.8, 3.4.9, and master):
```
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: exit-
spec:
  entrypoint: entrypoint
  onExit: exit-handler
  templates:
    - name: entrypoint
      container:
        image: quay.io/pch/whalesay:latest
        command: [ cowsay ]
        args: [ "hello world" ]
        resources:
          limits:
            memory: 32Mi
            cpu: 100m
    - name: ohnoes
      container:
        image: quay.io/pch/whalesay:latest
        command: [ cowsay ]
        args: [ "oh noes" ]
        resources:
          limits:
            memory: 32Mi
            cpu: 100m
    - name: exit-handler
      steps:
        - - name: ohnoes
            when: "{{workflow.status}} != Succeeded"
            template: ohnoes
```

It does not reproduce easily on a toy cluster in k3d, I believe because the
API latencies are too small and the whole machine starts to fall apart before
the problem happens.

It happens rarely in real workloads, but if `ohnoes` is replaced by a
notification step you will receive a notification that your workflow has
failed, even though it has succeeded by the time you go and look at it.

Story: The workflow controller is busy, and the k8s API is busy. The
k8s API is providing workflow updates which run through the workflow
informer cache. Normally these cannot go 'backwards'.

"Backwards" would be where an update takes our view of the workflow back to
an earlier point in time because we have received a high-latency update from
the k8s API. We prevent many of these in the UpdateFunc handler, which stops
us reconciling a version we have already seen.
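
As a rough illustration of that guard (the names and the map here are
hypothetical; the real handler compares the old and new objects the informer
hands it):

```go
package main

import "fmt"

// lastSeen holds the resourceVersion most recently reconciled per workflow
// key. Hypothetical sketch only; Argo's UpdateFunc works on the old/new
// objects the informer passes it rather than on a separate map.
var lastSeen = map[string]string{}

// shouldReconcile returns false when an incoming update carries a
// resourceVersion we have already processed. resourceVersion is opaque, so
// only equality is checked; ordering cannot be derived from it.
func shouldReconcile(key, resourceVersion string) bool {
	if lastSeen[key] == resourceVersion {
		return false // duplicate or resync of a version already handled
	}
	lastSeen[key] = resourceVersion
	return true
}

func main() {
	fmt.Println(shouldReconcile("argo/exit-abcde", "1000")) // true: first sight of this version
	fmt.Println(shouldReconcile("argo/exit-abcde", "1000")) // false: already reconciled
}
```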

The problem is that as a workflow completes we drop it from the informer
cache (it no longer meets `reconciliationNeeded`). This allows it to come
back into the cache later, in an old form, as an apparently new resource.

If this happens we attempt to reconcile it. If the pod for the node that the
reappearing (stale) workflow believes is still running no longer exists, we
will (correctly) treat the workflow as errored.

So we have a workflow which is exiting in an error state: we test the onExit
`when` clause, decide it is true, and launch the pod that handles it
(`ohnoes` in my example). This pod runs regardless of the fact that the
workflow controller then discovers the workflow's resourceVersion is out of
date and cannot write the current state back to the cluster.

Fixes considered:
* Attempt to ensure we are up to date before actioning the onExit handler.
This seemed like a very convoluted code path for special onExit handling and
likely to introduce more bugs than it fixed. It would also either put much
more load on the API by making the controller fetch the latest version, or
require attempting the write-back just to discover we were out of date.
* Attempt to prevent old versions re-entering the cache, rather than
rejecting them at the reconcile stage. I tried this, but there is still a
hole through which they can make it.

So the fix is to separately track workflows that have been evicted from the
cache and not process them at all. This record has a lifetime of 10 minutes,
as I saw latencies of up to a few minutes during heavy-workload testing.
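
Conceptually the tracking behaves like this minimal sketch (hypothetical
names; the actual implementation is in the controller.go diff below):

```go
package main

import (
	"fmt"
	"time"
)

// recentlyCompleted remembers the keys of workflows evicted from the informer
// cache and refuses to process them again within a retention window. This is
// a sketch of the idea only, not the controller code itself.
type recentlyCompleted struct {
	retention time.Duration
	seen      map[string]time.Time
}

// record notes that the workflow identified by key has just completed.
func (r *recentlyCompleted) record(key string) {
	r.seen[key] = time.Now()
}

// check reports whether key completed within the retention window; expired
// entries are dropped, since stale copies are no longer expected by then.
func (r *recentlyCompleted) check(key string) bool {
	when, ok := r.seen[key]
	if !ok {
		return false
	}
	if time.Since(when) > r.retention {
		delete(r.seen, key)
		return false
	}
	return true
}

func main() {
	rc := &recentlyCompleted{retention: 10 * time.Minute, seen: map[string]time.Time{}}
	rc.record("argo/exit-abcde")
	fmt.Println(rc.check("argo/exit-abcde")) // true: reject this stale reappearance
	fmt.Println(rc.check("argo/exit-zzzzz")) // false: not recently completed
}
```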

Signed-off-by: Alan Clucas <[email protected]>
Alan Clucas authored and GitHub Enterprise committed Nov 16, 2023
1 parent 1a34188 commit 96bbe0b
Showing 2 changed files with 75 additions and 2 deletions.
75 changes: 74 additions & 1 deletion workflow/controller/controller.go
@@ -5,6 +5,7 @@ import (
	"encoding/json"
	"fmt"
	"strconv"
	gosync "sync"
	"syscall"
	"time"

@@ -66,6 +67,18 @@ import (
	plugin "github.com/argoproj/argo-workflows/v3/workflow/util/plugins"
)

const maxAllowedStackDepth = 100

type recentlyCompletedWorkflow struct {
	key  string
	when time.Time
}

type recentCompletions struct {
	completions []recentlyCompletedWorkflow
	mutex       gosync.Mutex
}

// WorkflowController is the controller for workflow resources
type WorkflowController struct {
	// namespace of the workflow controller
@@ -128,6 +141,8 @@ type WorkflowController struct {
	// Default is 3s and can be configured using the env var ARGO_PROGRESS_FILE_TICK_DURATION
	progressFileTickDuration time.Duration
	executorPlugins          map[string]map[string]*spec.Plugin // namespace -> name -> plugin

	recentCompletions recentCompletions
}

const (
@@ -735,6 +750,11 @@ func (wfc *WorkflowController) processNextItem(ctx context.Context) bool {
		return true
	}

	if wfc.checkRecentlyCompleted(wf.ObjectMeta.Name) {
		log.WithFields(log.Fields{"name": wf.ObjectMeta.Name}).Warn("Cache: Rejecting recently deleted")
		return true
	}

	// this will ensure we process every incomplete workflow once every 20m
	wfc.wfQueue.AddAfter(key, workflowResyncPeriod)

@@ -826,6 +846,52 @@ func getWfPriority(obj interface{}) (int32, time.Time) {
	return int32(priority), un.GetCreationTimestamp().Time
}

// 10 minutes in the past
const maxCompletedStoreTime = time.Second * -600

func (wfc *WorkflowController) cleanCompletedWorkflowsRecord() {
	cutoff := time.Now().Add(maxCompletedStoreTime)
	removeIndex := -1
	wfc.recentCompletions.mutex.Lock()
	defer wfc.recentCompletions.mutex.Unlock()

	for i, val := range wfc.recentCompletions.completions {
		if val.when.After(cutoff) {
			removeIndex = i - 1
			break
		}
	}
	if removeIndex >= 0 {
		wfc.recentCompletions.completions = wfc.recentCompletions.completions[removeIndex+1:]
	}
}

func (wfc *WorkflowController) recordCompletedWorkflow(key string) {
	if !wfc.checkRecentlyCompleted(key) {
		wfc.recentCompletions.mutex.Lock()
		wfc.recentCompletions.completions = append(wfc.recentCompletions.completions,
			recentlyCompletedWorkflow{
				key:  key,
				when: time.Now(),
			})
		wfc.recentCompletions.mutex.Unlock()
	}
}

func (wfc *WorkflowController) checkRecentlyCompleted(key string) bool {
	wfc.cleanCompletedWorkflowsRecord()
	recent := false
	wfc.recentCompletions.mutex.Lock()
	for _, val := range wfc.recentCompletions.completions {
		if val.key == key {
			recent = true
			break
		}
	}
	wfc.recentCompletions.mutex.Unlock()
	return recent
}

func (wfc *WorkflowController) addWorkflowInformerHandlers(ctx context.Context) {
	wfc.wfInformer.AddEventHandler(
		cache.FilteringResourceEventHandler{
@@ -835,7 +901,13 @@ func (wfc *WorkflowController) addWorkflowInformerHandlers(ctx context.Context)
					log.Warnf("Workflow FilterFunc: '%v' is not an unstructured", obj)
					return false
				}
				return reconciliationNeeded(un)
				needed := reconciliationNeeded(un)
				if !needed {
					key, _ := cache.MetaNamespaceKeyFunc(un)
					wfc.recordCompletedWorkflow(key)
					wfc.throttler.Remove(key)
				}
				return needed
			},
			Handler: cache.ResourceEventHandlerFuncs{
				AddFunc: func(obj interface{}) {
@@ -866,6 +938,7 @@ func (wfc *WorkflowController) addWorkflowInformerHandlers(ctx context.Context)
					key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
					if err == nil {
						wfc.releaseAllWorkflowLocks(obj)
						wfc.recordCompletedWorkflow(key)
						// no need to add to the queue - this workflow is done
						wfc.throttler.Remove(key)
					}
2 changes: 1 addition & 1 deletion workflow/controller/operator.go
@@ -200,7 +200,7 @@ func (woc *wfOperationCtx) operate(ctx context.Context) {
		}
	}()

	woc.log.Info("Processing workflow")
	woc.log.WithFields(log.Fields{"Phase": woc.wf.Status.Phase, "ResourceVersion": woc.wf.ObjectMeta.ResourceVersion}).Info("Processing workflow")

	// Set the Execute workflow spec for execution
	// ExecWF is a runtime execution spec which merged from Wf, WFT and Wfdefault
