Skip to content

Commit

Permalink
Fix Issue tinkerbell#196: Filtered the workflow context sent to worke…
Browse files Browse the repository at this point in the history
…r by server

There is a bit of change in worker as well:
1. Changed Info logs to Debug during sleep
2. Changed a bit of logic of starting the workflow action to execute
  • Loading branch information
parauliya committed Jul 13, 2020
1 parent a68bec0 commit 017c925
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 14 deletions.
20 changes: 7 additions & 13 deletions cmd/tink-worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,22 +77,15 @@ func processWorkflowActions(client pb.WorkflowSvcClient) error {
turn = true
}
} else {
switch wfContext.GetCurrentActionState() {
case pb.ActionState_ACTION_SUCCESS:
if wfContext.GetCurrentActionState() == pb.ActionState_ACTION_SUCCESS {
if isLastAction(wfContext, actions) {
log.Infof("Workflow %s completed successfully\n", wfID)
continue
}
nextAction = actions.GetActionList()[wfContext.GetCurrentActionIndex()+1]
actionIndex = int(wfContext.GetCurrentActionIndex()) + 1
case pb.ActionState_ACTION_FAILED:
log.Infof("Workflow %s Failed\n", wfID)
continue
case pb.ActionState_ACTION_TIMEOUT:
log.Infof("Workflow %s Timeout\n", wfID)
continue
default:
log.Infof("Current context %s\n", wfContext)
} else {
log.Debugf("Current context %s\n", wfContext)
nextAction = actions.GetActionList()[wfContext.GetCurrentActionIndex()]
actionIndex = int(wfContext.GetCurrentActionIndex())
}
Expand Down Expand Up @@ -209,7 +202,7 @@ func processWorkflowActions(client pb.WorkflowSvcClient) error {
}

func fetchLatestContext(ctx context.Context, client pb.WorkflowSvcClient, workerID string) error {
log.Infof("Fetching latest context for worker %s\n", workerID)
log.Debugf("Fetching latest context for worker %s\n", workerID)
res, err := client.GetWorkflowContexts(ctx, &pb.WorkflowContextRequest{WorkerId: workerID})
if err != nil {
return err
Expand Down Expand Up @@ -257,11 +250,12 @@ func reportActionStatus(ctx context.Context, client pb.WorkflowSvcClient, action
for r := 1; r <= retries; r++ {
_, err = client.ReportActionStatus(ctx, actionStatus)
if err != nil {
log.Println("Report action status to server failed as : ", err)
log.Printf("Retrying after %v seconds", retryInterval)
log.Debugf("Report action status to server failed as : ", err)
log.Debugf("Retrying after %v seconds", retryInterval)
<-time.After(retryInterval * time.Second)
continue
}
log.Info("Action status reported to provisioner ", actionStatus.GetActionName())
return nil
}
return err
Expand Down
20 changes: 19 additions & 1 deletion executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"database/sql"
"fmt"
"log"
"time"

"github.com/tinkerbell/tink/db"
Expand Down Expand Up @@ -33,7 +34,20 @@ func GetWorkflowContexts(context context.Context, req *pb.WorkflowContextRequest
if err != nil {
return nil, status.Errorf(codes.Aborted, "invalid workflow %s found for worker %s", wf, req.WorkerId)
}
wfContexts = append(wfContexts, wfContext)
if wfContext.GetCurrentActionState() != pb.ActionState_ACTION_FAILED ||
wfContext.GetCurrentActionState() != pb.ActionState_ACTION_TIMEOUT {
actions, err := GetWorkflowActions(context, &pb.WorkflowActionsRequest{WorkflowId: wfContext.GetWorkflowId()}, sdb)
if err != nil {
return &pb.WorkflowContextList{WorkflowContexts: wfContexts}, err
}
if wfContext.GetCurrentActionState() == pb.ActionState_ACTION_SUCCESS && isLastAction(wfContext, actions) {
log.Println("This workflow is completed ", wfContext.GetWorkflowId())
} else {
if actions.ActionList[wfContext.GetCurrentActionIndex()+1].GetWorkerId() == req.WorkerId {
wfContexts = append(wfContexts, wfContext)
}
}
}
}

return &pb.WorkflowContextList{
Expand Down Expand Up @@ -160,3 +174,7 @@ func GetWorkflowDataVersion(context context.Context, workflowID string, sdb *sql
}
return &pb.GetWorkflowDataResponse{Version: version}, nil
}

func isLastAction(wfContext *pb.WorkflowContext, actions *pb.WorkflowActionList) bool {
return int(wfContext.GetCurrentActionIndex()) == len(actions.GetActionList())-1
}

0 comments on commit 017c925

Please sign in to comment.