Skip to content

Commit

Permalink
Fix Issue tinkerbell#196: Filtered the workflow context sent to worke…
Browse files Browse the repository at this point in the history
…r by server

There is a bit of change in worker as well:
1. Changed Info logs to Debug during sleep
2. Changed a bit of logic of starting the workflow action to execute
  • Loading branch information
parauliya committed Jul 20, 2020
1 parent 7bcc7d6 commit 74e13ea
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 4 deletions.
8 changes: 5 additions & 3 deletions cmd/tink-worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,7 @@ func processWorkflowActions(client pb.WorkflowSvcClient) error {
if rerr != nil {
exitWithGrpcError(rerr)
}
delete(workflowcontexts, wfID)
return err
}

Expand All @@ -193,6 +194,7 @@ func processWorkflowActions(client pb.WorkflowSvcClient) error {

if len(actions.GetActionList()) == actionIndex+1 {
log.Infoln("Reached to end of workflow")
delete(workflowcontexts, wfID)
turn = false
break
}
Expand All @@ -209,7 +211,7 @@ func processWorkflowActions(client pb.WorkflowSvcClient) error {
}

func fetchLatestContext(ctx context.Context, client pb.WorkflowSvcClient, workerID string) error {
log.Infof("Fetching latest context for worker %s\n", workerID)
log.Debugf("Fetching latest context for worker %s\n", workerID)
res, err := client.GetWorkflowContexts(ctx, &pb.WorkflowContextRequest{WorkerId: workerID})
if err != nil {
return err
Expand Down Expand Up @@ -257,8 +259,8 @@ func reportActionStatus(ctx context.Context, client pb.WorkflowSvcClient, action
for r := 1; r <= retries; r++ {
_, err = client.ReportActionStatus(ctx, actionStatus)
if err != nil {
log.Println("Report action status to server failed as : ", err)
log.Printf("Retrying after %v seconds", retryInterval)
log.Errorln("Report action status to server failed as : ", err)
log.Errorf("Retrying after %v seconds", retryInterval)
<-time.After(retryInterval * time.Second)
continue
}
Expand Down
35 changes: 34 additions & 1 deletion executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"database/sql"
"fmt"
"log"
"time"

"github.com/tinkerbell/tink/db"
Expand Down Expand Up @@ -33,7 +34,9 @@ func GetWorkflowContexts(context context.Context, req *pb.WorkflowContextRequest
if err != nil {
return nil, status.Errorf(codes.Aborted, "invalid workflow %s found for worker %s", wf, req.WorkerId)
}
wfContexts = append(wfContexts, wfContext)
if isApplicableToSend(context, wfContext, req.WorkerId, sdb) {
wfContexts = append(wfContexts, wfContext)
}
}

return &pb.WorkflowContextList{
Expand Down Expand Up @@ -160,3 +163,33 @@ func GetWorkflowDataVersion(context context.Context, workflowID string, sdb *sql
}
return &pb.GetWorkflowDataResponse{Version: version}, nil
}

// The below function check whether a particular workflow context is applicable or needed to
// be send to a worker based on the state of the current action and the targeted workerID.
func isApplicableToSend(context context.Context, wfContext *pb.WorkflowContext, workerID string, sdb *sql.DB) bool {
if wfContext.GetCurrentActionState() != pb.ActionState_ACTION_FAILED ||
wfContext.GetCurrentActionState() != pb.ActionState_ACTION_TIMEOUT {
actions, err := GetWorkflowActions(context, &pb.WorkflowActionsRequest{WorkflowId: wfContext.GetWorkflowId()}, sdb)
if err != nil {
return false
}
if wfContext.GetCurrentActionState() == pb.ActionState_ACTION_SUCCESS && isLastAction(wfContext, actions) {
log.Println("This workflow is completed ", wfContext.GetWorkflowId())
} else if int(wfContext.GetCurrentActionIndex()) == 0 {
if actions.ActionList[wfContext.GetCurrentActionIndex()].GetWorkerId() == workerID {
log.Println("Send the workflow context ", wfContext.GetWorkflowId())
return true
}
} else {
if actions.ActionList[wfContext.GetCurrentActionIndex()+1].GetWorkerId() == workerID {
log.Println("Send the workflow context ", wfContext.GetWorkflowId())
return true
}
}
}
return false
}

func isLastAction(wfContext *pb.WorkflowContext, actions *pb.WorkflowActionList) bool {
return int(wfContext.GetCurrentActionIndex()) == len(actions.GetActionList())-1
}

0 comments on commit 74e13ea

Please sign in to comment.