-
Notifications
You must be signed in to change notification settings - Fork 3.3k
/
Copy pathexec_control.go
106 lines (96 loc) · 3.82 KB
/
exec_control.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
package controller
import (
"context"
"fmt"
"sync"
"time"
apiv1 "k8s.io/api/core/v1"
wfv1 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
"github.com/argoproj/argo-workflows/v3/workflow/common"
)
// applyExecutionControl terminates a pod that must no longer run, either because
// the workflow is shutting down or because the workflow has reached its deadline.
// In both cases the pod is queued for cleanup and its node is marked as failed.
//
// Nothing is done for a nil pod, for a pod whose node is already fulfilled, or
// for a pod that has already succeeded or failed. ctx is currently unused.
// wfNodesLock guards access to woc.wf.Status.Nodes.
func (woc *wfOperationCtx) applyExecutionControl(ctx context.Context, pod *apiv1.Pod, wfNodesLock *sync.RWMutex) {
	if pod == nil {
		return
	}

	nodeID := woc.nodeID(pod)

	// handleExecutionControlError updates node state while holding the write
	// lock, so the lookup here takes the read lock to match. The lock is
	// released before any call to handleExecutionControlError, which acquires
	// it itself. A nil lock is tolerated on this read-only path so callers
	// that never reach the failure paths keep working.
	if wfNodesLock != nil {
		wfNodesLock.RLock()
	}
	node := woc.wf.Status.Nodes[nodeID]
	if wfNodesLock != nil {
		wfNodesLock.RUnlock()
	}

	// node is already completed
	if node.Fulfilled() {
		return
	}

	strategy := woc.GetShutdownStrategy()
	_, onExitPod := pod.Labels[common.LabelKeyOnExit]

	switch pod.Status.Phase {
	case apiv1.PodSucceeded, apiv1.PodFailed:
		// Skip any pod which is already completed
		return
	case apiv1.PodPending, apiv1.PodRunning:
		// Check if we are currently shutting down.
		// Only delete pods that are not part of an onExit handler if we are "Stopping" or all pods if we are "Terminating"
		if strategy.Enabled() && !strategy.ShouldExecute(onExitPod) {
			woc.log.WithField("podName", pod.Name).
				WithField("shutdownStrategy", strategy).
				Info("Terminating pod as part of workflow shutdown")
			woc.controller.queuePodForCleanup(pod.Namespace, pod.Name, terminateContainers)
			msg := fmt.Sprintf("workflow shutdown with strategy: %s", strategy)
			woc.handleExecutionControlError(nodeID, wfNodesLock, msg)
			return
		}

		// Check if we are past the workflow deadline. If we are, terminate the pod and
		// mark its node as failed. Pods that are part of an onExit handler aren't
		// subject to the deadline.
		if !onExitPod && woc.workflowDeadline != nil && time.Now().UTC().After(*woc.workflowDeadline) {
			woc.log.WithField("podName", pod.Name).
				WithField("workflowDeadline", woc.workflowDeadline).
				Info("Terminating pod which has exceeded workflow deadline")
			woc.controller.queuePodForCleanup(pod.Namespace, pod.Name, terminateContainers)
			woc.handleExecutionControlError(nodeID, wfNodesLock, "Step exceeded its deadline")
			return
		}
	}

	// Only pods in some other phase can satisfy this condition: a Pending or
	// Running pod that gets here has already failed the same check above.
	// The pod is queued for cleanup but its node is not marked as failed.
	if strategy.Enabled() && !strategy.ShouldExecute(onExitPod) {
		woc.log.WithField("podName", pod.Name).
			Info("Terminating on-exit pod")
		woc.controller.queuePodForCleanup(woc.wf.Namespace, pod.Name, terminateContainers)
	}
}
// handleExecutionControlError marks the node identified by nodeID as failed with
// errorMsg, and does the same for each of its children that is neither an exit
// node nor already fulfilled. The whole update runs while holding the write lock
// of wfNodesLock, so the caller must not already hold that lock.
//
// If nodeID is not present in the workflow's node map the call is a no-op apart
// from a warning log entry; children missing from the map are skipped.
func (woc *wfOperationCtx) handleExecutionControlError(nodeID string, wfNodesLock *sync.RWMutex, errorMsg string) {
	wfNodesLock.Lock()
	defer wfNodesLock.Unlock()

	node, ok := woc.wf.Status.Nodes[nodeID]
	if !ok {
		// A missing key would yield a zero-value node with an empty name,
		// so there is nothing meaningful to mark as failed.
		woc.log.WithField("nodeID", nodeID).
			Warn("Unable to find node to mark as failed")
		return
	}
	woc.markNodePhase(node.Name, wfv1.NodeFailed, errorMsg)

	// if node is a pod created from ContainerSet template
	// then need to fail child nodes so they will not hang in Pending after pod deletion
	for _, childID := range node.Children {
		child, ok := woc.wf.Status.Nodes[childID]
		if !ok {
			// Same reasoning as above: never mark a zero-value child.
			continue
		}
		if !child.IsExitNode() && !child.Fulfilled() {
			woc.markNodePhase(child.Name, wfv1.NodeFailed, errorMsg)
		}
	}
}
// killDaemonedChildren stops the daemoned nodes whose boundary is the given
// steps or DAG template node. Each matching node has its pod queued for
// cleanup, is recorded as succeeded with its daemoned flag cleared, and the
// workflow is flagged as updated.
func (woc *wfOperationCtx) killDaemonedChildren(nodeID string) {
	woc.log.Infof("Checking daemoned children of %s", nodeID)

	for _, n := range woc.wf.Status.Nodes {
		if n.BoundaryID == nodeID && n.IsDaemoned() {
			// The node ID is passed as the name of the pod to clean up.
			woc.controller.queuePodForCleanup(woc.wf.Namespace, n.ID, terminateContainers)

			// Store the modified node back into the workflow status.
			n.Phase = wfv1.NodeSucceeded
			n.Daemoned = nil
			woc.wf.Status.Nodes[n.ID] = n
			woc.updated = true
		}
	}
}