From 82d3750dbe3f52b349bf70fc0ff1de6746c68d10 Mon Sep 17 00:00:00 2001 From: "toyamagu2021@gmail.com" Date: Sun, 4 Jun 2023 21:13:06 +0900 Subject: [PATCH 1/2] fix: check hooked nodes in executeWfLifeCycleHook. Signed-off-by: toyamagu2021@gmail.com --- test/e2e/hooks_test.go | 156 +++++++++++++++++++++++++++- workflow/controller/hooks.go | 10 +- workflow/controller/hooks_test.go | 167 ++++++++++++++++++++++++++++++ 3 files changed, 330 insertions(+), 3 deletions(-) diff --git a/test/e2e/hooks_test.go b/test/e2e/hooks_test.go index 3e4480f837d2..8bd599fd3309 100644 --- a/test/e2e/hooks_test.go +++ b/test/e2e/hooks_test.go @@ -329,6 +329,160 @@ spec: }) } +func (s *HooksSuite) TestWorkflowLevelHooksWaitTrrigedHook() { + s.Given(). + Workflow(`apiVersion: argoproj.io/v1alpha1 +kind: Workflow +metadata: + generateName: lifecycle-hook- +spec: + entrypoint: main + hooks: + running: + expression: workflow.status == "Running" + template: sleep + # This hook never triggered by following test. + # To guarantee workflow does not wait forever for untriggered hooks. + failed: + expression: workflow.status == "Failed" + template: sleep + templates: + - name: main + steps: + - - name: step1 + template: exit0 + + - name: exit0 + container: + image: alpine:latest + command: ["/bin/sh", "-c"] + args: ["exit 0"] + - name: sleep + container: + image: alpine:latest + command: ["/bin/sh", "-c"] + args: ["/bin/sleep 2; exit 0"] +`).When(). + SubmitWorkflow(). + WaitForWorkflow(fixtures.ToBeSucceeded). + Then(). + ExpectWorkflow(func(t *testing.T, metadata *v1.ObjectMeta, status *v1alpha1.WorkflowStatus) { + assert.Equal(t, status.Phase, v1alpha1.WorkflowSucceeded) + assert.Equal(t, status.Progress, v1alpha1.Progress("2/2")) + }). + ExpectWorkflowNode(func(status v1alpha1.NodeStatus) bool { + return strings.Contains(status.Name, ".hooks.running") + }, func(t *testing.T, status *v1alpha1.NodeStatus, pod *apiv1.Pod) { + assert.Equal(t, v1alpha1.NodeSucceeded, status.Phase) + }) +} + +func (s *HooksSuite) TestTemplateLevelHooksWaitTrrigedHook() { + s.Given(). + Workflow(` +apiVersion: argoproj.io/v1alpha1 +kind: Workflow +metadata: + generateName: example-steps +spec: + entrypoint: main + templates: + - name: main + steps: + - - name: job + template: exit0 + hooks: + running: + expression: steps['job'].status == "Running" + template: hook + failed: + expression: steps['job'].status == "Failed" + template: hook + - name: hook + script: + image: alpine:latest + command: [/bin/sh] + source: | + sleep 2 + - name: exit0 + script: + image: alpine:latest + command: [/bin/sh] + source: | + exit 0 +`).When(). + SubmitWorkflow(). + WaitForWorkflow(fixtures.ToBeSucceeded). + Then(). + ExpectWorkflow(func(t *testing.T, metadata *v1.ObjectMeta, status *v1alpha1.WorkflowStatus) { + assert.Equal(t, status.Phase, v1alpha1.WorkflowSucceeded) + assert.Equal(t, status.Progress, v1alpha1.Progress("2/2")) + }). + ExpectWorkflowNode(func(status v1alpha1.NodeStatus) bool { + return strings.Contains(status.Name, "job.hooks.running") + }, func(t *testing.T, status *v1alpha1.NodeStatus, pod *apiv1.Pod) { + assert.Equal(t, v1alpha1.NodeSucceeded, status.Phase) + }) +} + +// Ref: https://github.com/argoproj/argo-workflows/issues/11117 +func (s *HooksSuite) TestTemplateLevelHooksWaitTrrigedHookAndRespectSynchronization() { + s.Given(). + Workflow(` +apiVersion: argoproj.io/v1alpha1 +kind: Workflow +metadata: + generateName: example-steps-simple-mutex +spec: + entrypoint: main + templates: + - name: main + steps: + - - name: job + template: exit0 + hooks: + running: + expression: steps['job'].status == "Running" + template: sleep + succeed: + expression: steps['job'].status == "Succeeded" + template: sleep + - name: sleep + synchronization: + mutex: + name: job + script: + image: alpine:latest + command: [/bin/sh] + source: | + sleep 3 + - name: exit0 + script: + image: alpine:latest + command: [/bin/sh] + source: | + sleep 1 + exit 0 +`).When(). + SubmitWorkflow(). + WaitForWorkflow(fixtures.ToBeSucceeded). + Then(). + ExpectWorkflow(func(t *testing.T, metadata *v1.ObjectMeta, status *v1alpha1.WorkflowStatus) { + assert.Equal(t, status.Phase, v1alpha1.WorkflowSucceeded) + assert.Equal(t, status.Progress, v1alpha1.Progress("3/3")) + }). + ExpectWorkflowNode(func(status v1alpha1.NodeStatus) bool { + return strings.Contains(status.Name, "job.hooks.running") + }, func(t *testing.T, status *v1alpha1.NodeStatus, pod *apiv1.Pod) { + assert.Equal(t, v1alpha1.NodeSucceeded, status.Phase) + }). + ExpectWorkflowNode(func(status v1alpha1.NodeStatus) bool { + return strings.Contains(status.Name, "job.hooks.succeed") + }, func(t *testing.T, status *v1alpha1.NodeStatus, pod *apiv1.Pod) { + assert.Equal(t, v1alpha1.NodeSucceeded, status.Phase) + }) +} + func TestHooksSuite(t *testing.T) { suite.Run(t, new(HooksSuite)) -} \ No newline at end of file +} diff --git a/workflow/controller/hooks.go b/workflow/controller/hooks.go index e8c9276e3fb3..25ad672423e2 100644 --- a/workflow/controller/hooks.go +++ b/workflow/controller/hooks.go @@ -20,6 +20,8 @@ func (woc *wfOperationCtx) executeWfLifeCycleHook(ctx context.Context, tmplCtx * continue } hookNodeName := generateLifeHookNodeName(woc.wf.ObjectMeta.Name, string(hookName)) + // To check a node was triggered. + hookedNode := woc.wf.GetNodeByName(hookNodeName) if hook.Expression == "" { return true, errors.Errorf(errors.CodeBadRequest, "Expression required for hook %s", hookNodeName) } @@ -27,7 +29,8 @@ func (woc *wfOperationCtx) executeWfLifeCycleHook(ctx context.Context, tmplCtx * if err != nil { return true, err } - if execute { + // executeTemplated should be invoked when hookedNode != nil, because we should reexecute the function to check mutex condition, etc. + if execute || hookedNode != nil { woc.log.WithField("lifeCycleHook", hookName).WithField("node", hookNodeName).Infof("Running workflow level hooks") hookNode, err := woc.executeTemplate(ctx, hookNodeName, &wfv1.WorkflowStep{Template: hook.Template, TemplateRef: hook.TemplateRef}, tmplCtx, hook.Arguments, &executeTemplateOpts{}) if err != nil { @@ -58,6 +61,8 @@ func (woc *wfOperationCtx) executeTmplLifeCycleHook(ctx context.Context, scope * continue } hookNodeName := generateLifeHookNodeName(parentNode.Name, string(hookName)) + // To check a node was triggered + hookedNode := woc.wf.GetNodeByName(hookNodeName) if hook.Expression == "" { return false, errors.Errorf(errors.CodeBadRequest, "Expression required for hook %s", hookNodeName) } @@ -65,7 +70,8 @@ func (woc *wfOperationCtx) executeTmplLifeCycleHook(ctx context.Context, scope * if err != nil { return false, err } - if execute { + // executeTemplated should be invoked when hookedNode != nil, because we should reexecute the function to check mutex condition, etc. + if execute || hookedNode != nil { outputs := parentNode.Outputs if parentNode.Type == wfv1.NodeTypeRetry { lastChildNode := getChildNodeIndex(parentNode, woc.wf.Status.Nodes, -1) diff --git a/workflow/controller/hooks_test.go b/workflow/controller/hooks_test.go index 75948d246a71..0fb015f80ff9 100644 --- a/workflow/controller/hooks_test.go +++ b/workflow/controller/hooks_test.go @@ -7,6 +7,7 @@ import ( "github.com/stretchr/testify/assert" apiv1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" wfv1 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1" "github.com/argoproj/argo-workflows/v3/workflow/common" @@ -146,6 +147,7 @@ status: assert.NotNil(t, node) node = woc.wf.Status.Nodes.FindByDisplayName("lifecycle-hook-bgsf6.hooks.running") assert.Nil(t, node) + assert.Equal(t, wfv1.WorkflowError, woc.wf.Status.Phase) } func TestExecuteTmplLifeCycleHook(t *testing.T) { @@ -1069,3 +1071,168 @@ spec: assert.Equal(t, wfv1.WorkflowFailed, woc.wf.Status.Phase) assert.Equal(t, "invalid spec: templates.main.steps[0].step-1.foo Expression required", woc.wf.Status.Message) } + +func TestWfHookWfWaitForTriggerdHook(t *testing.T) { + wf := wfv1.MustUnmarshalWorkflow(` +apiVersion: argoproj.io/v1alpha1 +kind: Workflow +metadata: + name: hook-running + namespace: argo +spec: + entrypoint: main + hooks: + running: + expression: workflow.status == "Running" + template: sleep + # This hook never triggered by following test. + # To guarantee workflow does not wait forever for untriggered hooks. + failure: + expression: workflow.status == "Failed" + template: sleep + templates: + - name: main + container: + image: alpine:latest + command: [sh, -c] + args: ["echo", "This template finish fastest"] + - name: sleep + script: + image: alpine:latest + command: [sh] + source: | + sleep 10 +`) + + // Setup + cancel, controller := newController(wf) + defer cancel() + ctx := context.Background() + woc := newWorkflowOperationCtx(wf, controller) + woc.operate(ctx) + makePodsPhase(ctx, woc, apiv1.PodRunning) + + // Check if running hook is triggered + woc = newWorkflowOperationCtx(woc.wf, controller) + woc.operate(ctx) + node := woc.wf.Status.Nodes.FindByDisplayName("hook-running.hooks.running") + assert.NotNil(t, node) + assert.Equal(t, wfv1.NodePending, node.Phase) + + // Make all pods running + makePodsPhase(ctx, woc, apiv1.PodRunning) + woc = newWorkflowOperationCtx(woc.wf, controller) + woc.operate(ctx) + node = woc.wf.Status.Nodes.FindByDisplayName("hook-running.hooks.running") + assert.Equal(t, wfv1.NodeRunning, node.Phase) + + // Make main pod completed + podcs := woc.controller.kubeclientset.CoreV1().Pods(woc.wf.GetNamespace()) + pod, _ := podcs.Get(ctx, "hook-running", metav1.GetOptions{}) + pod.Status.Phase = apiv1.PodSucceeded + updatedPod, _ := podcs.Update(ctx, pod, metav1.UpdateOptions{}) + _ = woc.controller.podInformer.GetStore().Update(updatedPod) + woc = newWorkflowOperationCtx(woc.wf, controller) + woc.operate(ctx) + assert.Equal(t, wfv1.Progress("1/2"), woc.wf.Status.Progress) + node = woc.wf.Status.Nodes.FindByDisplayName("hook-running") + assert.Equal(t, wfv1.NodeSucceeded, node.Phase) + node = woc.wf.Status.Nodes.FindByDisplayName("hook-running.hooks.running") + assert.Equal(t, wfv1.NodeRunning, node.Phase) + assert.Equal(t, wfv1.WorkflowRunning, woc.wf.Status.Phase) + + // Make all pod completed + makePodsPhase(ctx, woc, apiv1.PodSucceeded) + woc = newWorkflowOperationCtx(woc.wf, controller) + woc.operate(ctx) + assert.Equal(t, wfv1.Progress("2/2"), woc.wf.Status.Progress) + node = woc.wf.Status.Nodes.FindByDisplayName("hook-running.hooks.running") + assert.Equal(t, wfv1.NodeSucceeded, node.Phase) + node = woc.wf.Status.Nodes.FindByDisplayName("hook-running") + assert.Equal(t, wfv1.NodeSucceeded, node.Phase) + assert.Equal(t, wfv1.WorkflowSucceeded, woc.wf.Status.Phase) +} + +func TestWfTemplHookWfWaitForTriggerdHook(t *testing.T) { + wf := wfv1.MustUnmarshalWorkflow(` +apiVersion: argoproj.io/v1alpha1 +kind: Workflow +metadata: + name: hook-running +spec: + entrypoint: main + templates: + - name: main + steps: + - - name: job + template: exit0 + hooks: + running: + expression: steps['job'].status == "Running" + template: hook + failed: + expression: steps['job'].status == "Failed" + template: hook + - name: hook + script: + image: alpine:latest + command: [/bin/sh] + source: | + sleep 5 + - name: exit0 + script: + image: alpine:latest + command: [/bin/sh] + source: | + exit 0 +`) + + // Setup + cancel, controller := newController(wf) + defer cancel() + ctx := context.Background() + woc := newWorkflowOperationCtx(wf, controller) + woc.operate(ctx) + makePodsPhase(ctx, woc, apiv1.PodRunning) + + // Check if running hook is triggered + woc = newWorkflowOperationCtx(woc.wf, controller) + woc.operate(ctx) + node := woc.wf.Status.Nodes.FindByDisplayName("job.hooks.running") + assert.NotNil(t, node) + assert.Equal(t, wfv1.NodePending, node.Phase) + + // Make all pods running + makePodsPhase(ctx, woc, apiv1.PodRunning) + woc = newWorkflowOperationCtx(woc.wf, controller) + woc.operate(ctx) + node = woc.wf.Status.Nodes.FindByDisplayName("job.hooks.running") + assert.Equal(t, wfv1.NodeRunning, node.Phase) + + // Make main pod completed + podcs := woc.controller.kubeclientset.CoreV1().Pods(woc.wf.GetNamespace()) + pods, _ := podcs.List(ctx, metav1.ListOptions{}) + pod := pods.Items[0] + pod.Status.Phase = apiv1.PodSucceeded + updatedPod, _ := podcs.Update(ctx, &pod, metav1.UpdateOptions{}) + _ = woc.controller.podInformer.GetStore().Update(updatedPod) + woc = newWorkflowOperationCtx(woc.wf, controller) + woc.operate(ctx) + assert.Equal(t, wfv1.Progress("1/2"), woc.wf.Status.Progress) + node = woc.wf.Status.Nodes.FindByDisplayName("job") + assert.Equal(t, wfv1.NodeSucceeded, node.Phase) + node = woc.wf.Status.Nodes.FindByDisplayName("job.hooks.running") + assert.Equal(t, wfv1.NodeRunning, node.Phase) + assert.Equal(t, wfv1.WorkflowRunning, woc.wf.Status.Phase) + + // Make all pod completed + makePodsPhase(ctx, woc, apiv1.PodSucceeded) + woc = newWorkflowOperationCtx(woc.wf, controller) + woc.operate(ctx) + assert.Equal(t, wfv1.Progress("2/2"), woc.wf.Status.Progress) + node = woc.wf.Status.Nodes.FindByDisplayName("job.hooks.running") + assert.Equal(t, wfv1.NodeSucceeded, node.Phase) + node = woc.wf.Status.Nodes.FindByDisplayName("job") + assert.Equal(t, wfv1.NodeSucceeded, node.Phase) + assert.Equal(t, wfv1.WorkflowSucceeded, woc.wf.Status.Phase) +} From e4334582e927bf6b2206fb70758da4acf606ab92 Mon Sep 17 00:00:00 2001 From: "toyamagu2021@gmail.com" Date: Sun, 4 Jun 2023 22:23:20 +0900 Subject: [PATCH 2/2] fix: typo Signed-off-by: toyamagu2021@gmail.com --- test/e2e/hooks_test.go | 6 +++--- workflow/controller/hooks_test.go | 4 ++-- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/test/e2e/hooks_test.go b/test/e2e/hooks_test.go index 8bd599fd3309..f6501bd63eb5 100644 --- a/test/e2e/hooks_test.go +++ b/test/e2e/hooks_test.go @@ -329,7 +329,7 @@ spec: }) } -func (s *HooksSuite) TestWorkflowLevelHooksWaitTrrigedHook() { +func (s *HooksSuite) TestWorkflowLevelHooksWaitForTriggeredHook() { s.Given(). Workflow(`apiVersion: argoproj.io/v1alpha1 kind: Workflow @@ -377,7 +377,7 @@ spec: }) } -func (s *HooksSuite) TestTemplateLevelHooksWaitTrrigedHook() { +func (s *HooksSuite) TestTemplateLevelHooksWaitForTriggeredHook() { s.Given(). Workflow(` apiVersion: argoproj.io/v1alpha1 @@ -426,7 +426,7 @@ spec: } // Ref: https://github.com/argoproj/argo-workflows/issues/11117 -func (s *HooksSuite) TestTemplateLevelHooksWaitTrrigedHookAndRespectSynchronization() { +func (s *HooksSuite) TestTemplateLevelHooksWaitForTriggeredHookAndRespectSynchronization() { s.Given(). Workflow(` apiVersion: argoproj.io/v1alpha1 diff --git a/workflow/controller/hooks_test.go b/workflow/controller/hooks_test.go index 0fb015f80ff9..c67027f3a24f 100644 --- a/workflow/controller/hooks_test.go +++ b/workflow/controller/hooks_test.go @@ -1072,7 +1072,7 @@ spec: assert.Equal(t, "invalid spec: templates.main.steps[0].step-1.foo Expression required", woc.wf.Status.Message) } -func TestWfHookWfWaitForTriggerdHook(t *testing.T) { +func TestWfHookWfWaitForTriggeredHook(t *testing.T) { wf := wfv1.MustUnmarshalWorkflow(` apiVersion: argoproj.io/v1alpha1 kind: Workflow @@ -1153,7 +1153,7 @@ spec: assert.Equal(t, wfv1.WorkflowSucceeded, woc.wf.Status.Phase) } -func TestWfTemplHookWfWaitForTriggerdHook(t *testing.T) { +func TestWfTemplHookWfWaitForTriggeredHook(t *testing.T) { wf := wfv1.MustUnmarshalWorkflow(` apiVersion: argoproj.io/v1alpha1 kind: Workflow