Skip to content

Commit

Permalink
BugFix: SubWorkflow and dynamic node handling failure (flyteorg#84)
Browse files Browse the repository at this point in the history
* Handling Finalize for subworkflows

* config change

* improved logging

* Fixing the regression

 Increment was called before calling the abort. Since the logic is
derived using the current attempt number, everything broke

* updated and fixed tests

* comment updated

* updated

* More unit tests
  • Loading branch information
Ketan Umare authored Mar 13, 2020
1 parent bc96237 commit 34c7598
Show file tree
Hide file tree
Showing 10 changed files with 108 additions and 26 deletions.
1 change: 1 addition & 0 deletions pkg/controller/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ var (
configSection = config.MustRegisterSection(configSectionKey, defaultConfig)

defaultConfig = &Config{
MaxWorkflowRetries: 5,
MaxDatasetSizeBytes: 10 * 1024 * 1024,
Queue: CompositeQueueConfig{
Type: CompositeQueueSimple,
Expand Down
8 changes: 4 additions & 4 deletions pkg/controller/nodes/dynamic/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ func (d dynamicNodeTaskNodeHandler) Abort(ctx context.Context, nCtx handler.Node
case v1alpha1.DynamicNodePhaseFailing:
fallthrough
case v1alpha1.DynamicNodePhaseExecuting:
logger.Infof(ctx, "Aborting dynamic workflow.")
logger.Infof(ctx, "Aborting dynamic workflow at RetryAttempt [%d]", nCtx.CurrentAttempt())
dynamicWF, isDynamic, err := d.buildContextualDynamicWorkflow(ctx, nCtx)
if err != nil {
return err
Expand All @@ -183,7 +183,7 @@ func (d dynamicNodeTaskNodeHandler) Abort(ctx context.Context, nCtx handler.Node

return d.nodeExecutor.AbortHandler(ctx, dynamicWF, dynamicWF.StartNode(), reason)
default:
logger.Infof(ctx, "Aborting regular node.")
logger.Infof(ctx, "Aborting regular node RetryAttempt [%d]", nCtx.CurrentAttempt())
// The parent node has not yet completed, so we will abort the parent node
return d.TaskNodeHandler.Abort(ctx, nCtx, reason)
}
Expand All @@ -195,7 +195,7 @@ func (d dynamicNodeTaskNodeHandler) Finalize(ctx context.Context, nCtx handler.N

ds := nCtx.NodeStateReader().GetDynamicNodeState()
if ds.Phase == v1alpha1.DynamicNodePhaseFailing || ds.Phase == v1alpha1.DynamicNodePhaseExecuting {
logger.Infof(ctx, "Finalizing dynamic workflow")
logger.Infof(ctx, "Finalizing dynamic workflow RetryAttempt [%d]", nCtx.CurrentAttempt())
dynamicWF, isDynamic, err := d.buildContextualDynamicWorkflow(ctx, nCtx)
if err != nil {
errs = append(errs, err)
Expand All @@ -212,7 +212,7 @@ func (d dynamicNodeTaskNodeHandler) Finalize(ctx context.Context, nCtx handler.N
// We should always finalize the parent node success or failure.
// If we use the phase to decide when to finalize in the case where Dynamic node is in phase Executiing
// (i.e. child nodes are now being executed) and Finalize is invoked, we will never invoke the finalizer for the parent.
logger.Infof(ctx, "Finalizing Parent node")
logger.Infof(ctx, "Finalizing Parent node RetryAttempt [%d]", nCtx.CurrentAttempt())
if err := d.TaskNodeHandler.Finalize(ctx, nCtx); err != nil {
logger.Errorf(ctx, "Failed to finalize Dynamic Nodes Parent.")
errs = append(errs, err)
Expand Down
1 change: 1 addition & 0 deletions pkg/controller/nodes/dynamic/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -455,6 +455,7 @@ func TestDynamicNodeTaskNodeHandler_Finalize(t *testing.T) {
sr := &nodeMocks.NodeStateReader{}
sr.OnGetDynamicNodeState().Return(s)
nCtx.OnNodeStateReader().Return(sr)
nCtx.OnCurrentAttempt().Return(0)

h := &mocks.TaskNodeHandler{}
h.OnFinalize(ctx, nCtx).Return(nil)
Expand Down
4 changes: 3 additions & 1 deletion pkg/controller/nodes/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -374,6 +374,9 @@ func (c *nodeExecutor) handleNode(ctx context.Context, w v1alpha1.ExecutableWork
return executors.NodeStatusUndefined, err
}

// NOTE: It is important to increment attempts only after abort has been called. Increment attempt mutates the state
// Attempt is used throughout the system to determine the idempotent resource version.
nodeStatus.IncrementAttempts()
nodeStatus.UpdatePhase(v1alpha1.NodePhaseRunning, v1.Now(), "retrying")
// We are going to retry in the next round, so we should clear all current state
nodeStatus.ClearSubNodeStatus()
Expand All @@ -399,7 +402,6 @@ func (c *nodeExecutor) handleNode(ctx context.Context, w v1alpha1.ExecutableWork
}

if p.GetPhase() == handler.EPhaseRetryableFailure {
nodeStatus.IncrementAttempts()
if p.GetErr() != nil && p.GetErr().GetKind() == core.ExecutionError_SYSTEM {
nodeStatus.IncrementSystemFailures()
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/controller/nodes/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -739,9 +739,9 @@ func TestNodeExecutor_RecursiveNodeHandler_Recurse(t *testing.T) {
},
false, true, core.NodeExecution_FAILED, 0},

{"(retryablefailure->running", v1alpha1.NodePhaseRetryableFailure, v1alpha1.NodePhaseRunning, executors.NodePhasePending, func() (handler.Transition, error) {
{"retryablefailure->running", v1alpha1.NodePhaseRetryableFailure, v1alpha1.NodePhaseRunning, executors.NodePhasePending, func() (handler.Transition, error) {
return handler.UnknownTransition, fmt.Errorf("should not be invoked")
}, false, false, core.NodeExecution_RUNNING, 0},
}, false, false, core.NodeExecution_RUNNING, 1},

{"running->failing", v1alpha1.NodePhaseRunning, v1alpha1.NodePhaseFailing, executors.NodePhasePending, func() (handler.Transition, error) {
return handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoFailure("code", "reason", nil)), nil
Expand Down
10 changes: 5 additions & 5 deletions pkg/controller/nodes/subworkflow/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func (w *workflowNodeHandler) FinalizeRequired() bool {
return false
}

func (w *workflowNodeHandler) Setup(ctx context.Context, setupContext handler.SetupContext) error {
func (w *workflowNodeHandler) Setup(_ context.Context, _ handler.SetupContext) error {
return nil
}

Expand Down Expand Up @@ -94,17 +94,17 @@ func (w *workflowNodeHandler) Abort(ctx context.Context, nCtx handler.NodeExecut
wf := nCtx.Workflow()
wfNode := nCtx.Node().GetWorkflowNode()
if wfNode.GetSubWorkflowRef() != nil {
return w.subWfHandler.HandleAbort(ctx, nCtx, wf, *wfNode.GetSubWorkflowRef())
return w.subWfHandler.HandleAbort(ctx, nCtx, wf, *wfNode.GetSubWorkflowRef(), reason)
}

if wfNode.GetLaunchPlanRefID() != nil {
return w.lpHandler.HandleAbort(ctx, wf, nCtx.Node())
return w.lpHandler.HandleAbort(ctx, wf, nCtx.Node(), reason)
}
return nil
}

func (w *workflowNodeHandler) Finalize(ctx context.Context, executionContext handler.NodeExecutionContext) error {
logger.Debugf(ctx, "WorkflowNode::Finalizer: nothing to do")
func (w *workflowNodeHandler) Finalize(ctx context.Context, _ handler.NodeExecutionContext) error {
logger.Warnf(ctx, "Subworkflow finalize invoked. Nothing to be done")
return nil
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/controller/nodes/subworkflow/launchplan.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ func (l *launchPlanHandler) CheckLaunchPlanStatus(ctx context.Context, nCtx hand
return handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoRunning(nil)), nil
}

func (l *launchPlanHandler) HandleAbort(ctx context.Context, w v1alpha1.ExecutableWorkflow, node v1alpha1.ExecutableNode) error {
func (l *launchPlanHandler) HandleAbort(ctx context.Context, w v1alpha1.ExecutableWorkflow, node v1alpha1.ExecutableNode, reason string) error {
nodeStatus := w.GetNodeExecutionStatus(ctx, node.GetID())
childID, err := GetChildWorkflowExecutionID(
w.GetExecutionID().WorkflowExecutionIdentifier,
Expand All @@ -160,5 +160,5 @@ func (l *launchPlanHandler) HandleAbort(ctx context.Context, w v1alpha1.Executab
// THIS SHOULD NEVER HAPPEN
return err
}
return l.launchPlan.Kill(ctx, childID, fmt.Sprintf("parent execution id [%s] aborted", w.GetName()))
return l.launchPlan.Kill(ctx, childID, fmt.Sprintf("parent execution id [%s] aborted, reason [%s]", w.GetName(), reason))
}
15 changes: 7 additions & 8 deletions pkg/controller/nodes/subworkflow/launchplan_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,17 @@ import (

"github.com/lyft/flyteidl/gen/pb-go/flyteidl/admin"
"github.com/lyft/flyteidl/gen/pb-go/flyteidl/core"
"github.com/lyft/flytestdlib/promutils"
"github.com/lyft/flytestdlib/storage"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"

"github.com/lyft/flytepropeller/pkg/apis/flyteworkflow/v1alpha1"
mocks2 "github.com/lyft/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/mocks"
"github.com/lyft/flytepropeller/pkg/controller/nodes/handler"
"github.com/lyft/flytepropeller/pkg/controller/nodes/subworkflow/launchplan"
"github.com/lyft/flytepropeller/pkg/controller/nodes/subworkflow/launchplan/mocks"
"github.com/lyft/flytepropeller/pkg/utils"
"github.com/lyft/flytestdlib/promutils"
"github.com/lyft/flytestdlib/storage"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
)

func createInmemoryStore(t testing.TB) *storage.DataStore {
Expand Down Expand Up @@ -614,7 +615,6 @@ func TestLaunchPlanHandler_HandleAbort(t *testing.T) {

t.Run("abort-success", func(t *testing.T) {
mockLPExec := &mocks.Executor{}
//mockStore := storage.NewCompositeDataStore(storage.URLPathConstructor{}, storage.NewDefaultProtobufStore(utils.FailingRawStore{}, promutils.NewTestScope()))
mockLPExec.On("Kill",
ctx,
mock.MatchedBy(func(o *core.WorkflowExecutionIdentifier) bool {
Expand All @@ -626,14 +626,13 @@ func TestLaunchPlanHandler_HandleAbort(t *testing.T) {
h := launchPlanHandler{
launchPlan: mockLPExec,
}
err := h.HandleAbort(ctx, mockWf, mockNode)
err := h.HandleAbort(ctx, mockWf, mockNode, "some reason")
assert.NoError(t, err)
})

t.Run("abort-fail", func(t *testing.T) {
expectedErr := fmt.Errorf("fail")
mockLPExec := &mocks.Executor{}
// mockStore := storage.NewCompositeDataStore(storage.URLPathConstructor{}, storage.NewDefaultProtobufStore(utils.FailingRawStore{}, promutils.NewTestScope()))
mockLPExec.On("Kill",
ctx,
mock.MatchedBy(func(o *core.WorkflowExecutionIdentifier) bool {
Expand All @@ -645,7 +644,7 @@ func TestLaunchPlanHandler_HandleAbort(t *testing.T) {
h := launchPlanHandler{
launchPlan: mockLPExec,
}
err := h.HandleAbort(ctx, mockWf, mockNode)
err := h.HandleAbort(ctx, mockWf, mockNode, "reason")
assert.Error(t, err)
assert.Equal(t, err, expectedErr)
})
Expand Down
9 changes: 5 additions & 4 deletions pkg/controller/nodes/subworkflow/subworkflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,12 @@ import (
"context"
"fmt"

"github.com/lyft/flytestdlib/storage"

"github.com/lyft/flytepropeller/pkg/apis/flyteworkflow/v1alpha1"
"github.com/lyft/flytepropeller/pkg/controller/executors"
"github.com/lyft/flytepropeller/pkg/controller/nodes/errors"
"github.com/lyft/flytepropeller/pkg/controller/nodes/handler"
"github.com/lyft/flytestdlib/storage"
)

// TODO Add unit tests for subworkflow handler
Expand Down Expand Up @@ -174,7 +175,7 @@ func (s *subworkflowHandler) HandleSubWorkflowFailingNode(ctx context.Context, n
return s.DoInFailureHandling(ctx, nCtx, contextualSubWorkflow)
}

func (s *subworkflowHandler) HandleAbort(ctx context.Context, nCtx handler.NodeExecutionContext, w v1alpha1.ExecutableWorkflow, workflowID v1alpha1.WorkflowID) error {
func (s *subworkflowHandler) HandleAbort(ctx context.Context, nCtx handler.NodeExecutionContext, w v1alpha1.ExecutableWorkflow, workflowID v1alpha1.WorkflowID, reason string) error {
subWorkflow := w.FindSubWorkflow(workflowID)
if subWorkflow == nil {
return fmt.Errorf("no sub workflow [%s] found in node [%s]", workflowID, nCtx.NodeID())
Expand All @@ -183,12 +184,12 @@ func (s *subworkflowHandler) HandleAbort(ctx context.Context, nCtx handler.NodeE
nodeStatus := w.GetNodeExecutionStatus(ctx, nCtx.NodeID())
contextualSubWorkflow := executors.NewSubContextualWorkflow(w, subWorkflow, nodeStatus)

startNode := w.StartNode()
startNode := contextualSubWorkflow.StartNode()
if startNode == nil {
return fmt.Errorf("no sub workflow [%s] found in node [%s]", workflowID, nCtx.NodeID())
}

return s.nodeExecutor.AbortHandler(ctx, contextualSubWorkflow, startNode, "")
return s.nodeExecutor.AbortHandler(ctx, contextualSubWorkflow, startNode, reason)
}

func newSubworkflowHandler(nodeExecutor executors.Node) subworkflowHandler {
Expand Down
78 changes: 78 additions & 0 deletions pkg/controller/nodes/subworkflow/subworkflow_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
package subworkflow

import (
"context"
"fmt"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"

"github.com/lyft/flytepropeller/pkg/apis/flyteworkflow/v1alpha1"
coreMocks "github.com/lyft/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/mocks"
execMocks "github.com/lyft/flytepropeller/pkg/controller/executors/mocks"
"github.com/lyft/flytepropeller/pkg/controller/nodes/handler/mocks"
)

func Test_subworkflowHandler_HandleAbort(t *testing.T) {
ctx := context.TODO()

t.Run("missing-subworkflow", func(t *testing.T) {
nCtx := &mocks.NodeExecutionContext{}
nodeExec := &execMocks.Node{}
s := newSubworkflowHandler(nodeExec)
wf := &coreMocks.ExecutableWorkflow{}
wf.OnFindSubWorkflow("x").Return(nil)
nCtx.OnNodeID().Return("n1")
assert.Error(t, s.HandleAbort(ctx, nCtx, wf, "x", "reason"))
})

t.Run("missing-startNode", func(t *testing.T) {
nCtx := &mocks.NodeExecutionContext{}
nodeExec := &execMocks.Node{}
s := newSubworkflowHandler(nodeExec)
wf := &coreMocks.ExecutableWorkflow{}
st := &coreMocks.ExecutableNodeStatus{}
swf := &coreMocks.ExecutableSubWorkflow{}
wf.OnFindSubWorkflow("x").Return(swf)
wf.OnGetNodeExecutionStatus(ctx, "n1").Return(st)
nCtx.OnNodeID().Return("n1")
swf.OnStartNode().Return(nil)
assert.Error(t, s.HandleAbort(ctx, nCtx, wf, "x", "reason"))
})

t.Run("abort-error", func(t *testing.T) {
nCtx := &mocks.NodeExecutionContext{}
nodeExec := &execMocks.Node{}
s := newSubworkflowHandler(nodeExec)
wf := &coreMocks.ExecutableWorkflow{}
st := &coreMocks.ExecutableNodeStatus{}
swf := &coreMocks.ExecutableSubWorkflow{}
wf.OnFindSubWorkflow("x").Return(swf)
wf.OnGetNodeExecutionStatus(ctx, "n1").Return(st)
nCtx.OnNodeID().Return("n1")
n := &coreMocks.ExecutableNode{}
swf.OnStartNode().Return(n)
nodeExec.OnAbortHandler(ctx, wf, n, "reason").Return(fmt.Errorf("err"))
assert.Error(t, s.HandleAbort(ctx, nCtx, wf, "x", "reason"))
})

t.Run("abort-success", func(t *testing.T) {
nCtx := &mocks.NodeExecutionContext{}
nodeExec := &execMocks.Node{}
s := newSubworkflowHandler(nodeExec)
wf := &coreMocks.ExecutableWorkflow{}
st := &coreMocks.ExecutableNodeStatus{}
swf := &coreMocks.ExecutableSubWorkflow{}
wf.OnFindSubWorkflow("x").Return(swf)
wf.OnGetNodeExecutionStatus(ctx, "n1").Return(st)
nCtx.OnNodeID().Return("n1")
n := &coreMocks.ExecutableNode{}
swf.OnStartNode().Return(n)
swf.OnGetID().Return("swf")
nodeExec.OnAbortHandlerMatch(mock.Anything, mock.MatchedBy(func(wf v1alpha1.ExecutableWorkflow) bool {
return wf.GetID() == swf.GetID()
}), n, mock.Anything).Return(nil)
assert.NoError(t, s.HandleAbort(ctx, nCtx, wf, "x", "reason"))
})
}

0 comments on commit 34c7598

Please sign in to comment.