From c6177062276cc39c3b21644ab1d6989cbcaf075c Mon Sep 17 00:00:00 2001
From: Alex Collins
Date: Tue, 26 Apr 2022 13:04:10 -0700
Subject: [PATCH] fix: Pod `OOMKilled` should fail workflow. Fixes #8456 (#8478)

Signed-off-by: Alex Collins
---
 workflow/controller/controller.go    |  2 +-
 workflow/controller/operator.go      | 21 ++++++++++++++-------
 workflow/controller/operator_test.go |  2 +-
 workflow/executor/executor.go        | 20 +++++++++++---------
 4 files changed, 27 insertions(+), 18 deletions(-)

diff --git a/workflow/controller/controller.go b/workflow/controller/controller.go
index ca31cb7caffe..756d5b692a13 100644
--- a/workflow/controller/controller.go
+++ b/workflow/controller/controller.go
@@ -549,7 +549,7 @@ func (wfc *WorkflowController) signalContainers(namespace string, podName string
 	}
 
 	for _, c := range pod.Status.ContainerStatuses {
-		if c.Name == common.WaitContainerName || c.State.Terminated != nil {
+		if c.State.Terminated != nil {
 			continue
 		}
 		if err := signal.SignalContainer(wfc.restConfig, pod, c.Name, sig); err != nil {
diff --git a/workflow/controller/operator.go b/workflow/controller/operator.go
index 094a83793bf8..d814be7074fa 100644
--- a/workflow/controller/operator.go
+++ b/workflow/controller/operator.go
@@ -1261,6 +1261,15 @@ func (woc *wfOperationCtx) assessNodeStatus(pod *apiv1.Pod, old *wfv1.NodeStatus
 		new.Outputs.ExitCode = pointer.StringPtr(fmt.Sprint(*exitCode))
 	}
 
+	// We cannot fail the node until the wait container is finished because it may be busy saving outputs, and these
+	// would not get captured successfully.
+	for _, c := range pod.Status.ContainerStatuses {
+		if c.Name == common.WaitContainerName && c.State.Terminated == nil && new.Phase.Completed() {
+			woc.log.WithField("new.phase", new.Phase).Info("leaving phase unchanged: wait container is not yet terminated")
+			new.Phase = old.Phase
+		}
+	}
+
 	// if we are transitioning from Pending to a different state, clear out unchanged message
 	if old.Phase == wfv1.NodePending && new.Phase != wfv1.NodePending && old.Message == new.Message {
 		new.Message = ""
@@ -1272,7 +1281,7 @@
 	}
 
 	if !reflect.DeepEqual(old, new) {
-		log.WithField("nodeID", old.ID).
+		woc.log.WithField("nodeID", old.ID).
 			WithField("old.phase", old.Phase).
 			WithField("new.phase", new.Phase).
 			WithField("old.message", old.Message).
@@ -1282,7 +1291,7 @@ func (woc *wfOperationCtx) assessNodeStatus(pod *apiv1.Pod, old *wfv1.NodeStatus
 			Info("node changed")
 		return new
 	}
-	log.WithField("nodeID", old.ID).
+	woc.log.WithField("nodeID", old.ID).
 		Info("node unchanged")
 	return nil
 }
@@ -1298,11 +1307,9 @@ func getExitCode(pod *apiv1.Pod) *int32 {
 
 func podHasContainerNeedingTermination(pod *apiv1.Pod, tmpl wfv1.Template) bool {
 	for _, c := range pod.Status.ContainerStatuses {
-		// Only clean up pod when both the wait and the main containers are terminated
-		if c.Name == common.WaitContainerName || tmpl.IsMainContainerName(c.Name) {
-			if c.State.Terminated == nil {
-				return false
-			}
+		// Only clean up pod when all main containers are terminated
+		if tmpl.IsMainContainerName(c.Name) && c.State.Terminated == nil {
+			return false
 		}
 	}
 	return true
diff --git a/workflow/controller/operator_test.go b/workflow/controller/operator_test.go
index 591cc031dbef..38a670174ec8 100644
--- a/workflow/controller/operator_test.go
+++ b/workflow/controller/operator_test.go
@@ -6320,7 +6320,7 @@ func TestPodHasContainerNeedingTermination(t *testing.T) {
 				State: apiv1.ContainerState{Terminated: &apiv1.ContainerStateTerminated{ExitCode: 1}},
 			},
 		}}}
-	assert.False(t, podHasContainerNeedingTermination(&pod, tmpl))
+	assert.True(t, podHasContainerNeedingTermination(&pod, tmpl))
 
 	pod = apiv1.Pod{
 		Status: apiv1.PodStatus{
diff --git a/workflow/executor/executor.go b/workflow/executor/executor.go
index 04823f201c4f..b99da4394622 100644
--- a/workflow/executor/executor.go
+++ b/workflow/executor/executor.go
@@ -998,15 +998,21 @@ func (we *WorkflowExecutor) Wait(ctx context.Context) error {
 		log.WithField("annotationPatchTickDuration", we.annotationPatchTickDuration).WithField("readProgressFileTickDuration", we.readProgressFileTickDuration).Info("monitoring progress disabled")
 	}
 
+	// this allows us to gracefully shut down, capturing artifacts
+	ctx, cancel := signal.NotifyContext(ctx, syscall.SIGTERM)
+	defer cancel()
+
 	go we.monitorDeadline(ctx, containerNames)
-	err := waitutil.Backoff(executorretry.ExecutorRetry, func() (bool, error) {
-		err := we.RuntimeExecutor.Wait(ctx, containerNames)
-		return err == nil, err
+
+	err := retryutil.OnError(executorretry.ExecutorRetry, errorsutil.IsTransientErr, func() error {
+		return we.RuntimeExecutor.Wait(ctx, containerNames)
 	})
-	if err != nil {
+
+	log.WithError(err).Info("Main container completed")
+
+	if err != nil && err != context.Canceled {
 		return fmt.Errorf("failed to wait for main container to complete: %w", err)
 	}
-	log.Infof("Main container completed")
 	return nil
 }
 
@@ -1065,8 +1071,6 @@ func (we *WorkflowExecutor) monitorProgress(ctx context.Context, progressFile st
 // monitorDeadline checks to see if we exceeded the deadline for the step and
 // terminates the main container if we did
 func (we *WorkflowExecutor) monitorDeadline(ctx context.Context, containerNames []string) {
-	terminate := make(chan os.Signal, 1)
-	signal.Notify(terminate, syscall.SIGTERM)
 	deadlineExceeded := make(chan bool, 1)
 
 	if !we.Deadline.IsZero() {
@@ -1084,8 +1088,6 @@ func (we *WorkflowExecutor) monitorDeadline(ctx context.Context, containerNames
 			return
 		case <-deadlineExceeded:
 			message = "Step exceeded its deadline"
-		case <-terminate:
-			message = "Step terminated"
 		}
 		log.Info(message)
 		util.WriteTerminateMessage(message)