Skip to content

Commit

Permalink
Fix Issue #196: Filtered the workflow context sent to worker by server
Browse files Browse the repository at this point in the history
There is a bit of change in worker as well:
1. Changed Info logs to Debug during sleep
2. Changed a bit of logic of starting the workflow action to execute
  • Loading branch information
parauliya committed Jul 17, 2020
1 parent 0100e53 commit 207bfbf
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 8 deletions.
15 changes: 8 additions & 7 deletions cmd/tink-worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ func processWorkflowActions(client pb.WorkflowSvcClient) error {

for wfID, wfContext := range workflowcontexts {
actions, ok := workflowactions[wfID]
delete(workflowcontexts, wfID)
if !ok {
return fmt.Errorf("can't find actions for workflow %s", wfID)
}
Expand All @@ -80,16 +81,16 @@ func processWorkflowActions(client pb.WorkflowSvcClient) error {
switch wfContext.GetCurrentActionState() {
case pb.ActionState_ACTION_SUCCESS:
if isLastAction(wfContext, actions) {
log.Infof("Workflow %s completed successfully\n", wfID)
log.Debugf("Workflow %s completed successfully\n", wfID)
continue
}
nextAction = actions.GetActionList()[wfContext.GetCurrentActionIndex()+1]
actionIndex = int(wfContext.GetCurrentActionIndex()) + 1
case pb.ActionState_ACTION_FAILED:
log.Infof("Workflow %s Failed\n", wfID)
log.Debugf("Workflow %s Failed\n", wfID)
continue
case pb.ActionState_ACTION_TIMEOUT:
log.Infof("Workflow %s Timeout\n", wfID)
log.Debugf("Workflow %s Timeout\n", wfID)
continue
default:
log.Infof("Current context %s\n", wfContext)
Expand Down Expand Up @@ -192,7 +193,7 @@ func processWorkflowActions(client pb.WorkflowSvcClient) error {
updateWorkflowData(ctx, client, actionStatus)

if len(actions.GetActionList()) == actionIndex+1 {
log.Infoln("Reached to end of workflow")
log.Debugln("Reached to end of workflow")
turn = false
break
}
Expand All @@ -209,7 +210,7 @@ func processWorkflowActions(client pb.WorkflowSvcClient) error {
}

func fetchLatestContext(ctx context.Context, client pb.WorkflowSvcClient, workerID string) error {
log.Infof("Fetching latest context for worker %s\n", workerID)
log.Debugf("Fetching latest context for worker %s\n", workerID)
res, err := client.GetWorkflowContexts(ctx, &pb.WorkflowContextRequest{WorkerId: workerID})
if err != nil {
return err
Expand Down Expand Up @@ -257,8 +258,8 @@ func reportActionStatus(ctx context.Context, client pb.WorkflowSvcClient, action
for r := 1; r <= retries; r++ {
_, err = client.ReportActionStatus(ctx, actionStatus)
if err != nil {
log.Println("Report action status to server failed as : ", err)
log.Printf("Retrying after %v seconds", retryInterval)
log.Errorln("Report action status to server failed as : ", err)
log.Errorf("Retrying after %v seconds", retryInterval)
<-time.After(retryInterval * time.Second)
continue
}
Expand Down
33 changes: 32 additions & 1 deletion executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"database/sql"
"fmt"
"log"
"time"

"github.com/tinkerbell/tink/db"
Expand Down Expand Up @@ -33,7 +34,9 @@ func GetWorkflowContexts(context context.Context, req *pb.WorkflowContextRequest
if err != nil {
return nil, status.Errorf(codes.Aborted, "invalid workflow %s found for worker %s", wf, req.WorkerId)
}
wfContexts = append(wfContexts, wfContext)
if isApplicableToSend(context, wfContext, req.WorkerId, sdb) {
wfContexts = append(wfContexts, wfContext)
}
}

return &pb.WorkflowContextList{
Expand Down Expand Up @@ -160,3 +163,31 @@ func GetWorkflowDataVersion(context context.Context, workflowID string, sdb *sql
}
return &pb.GetWorkflowDataResponse{Version: version}, nil
}

func isApplicableToSend(context context.Context, wfContext *pb.WorkflowContext, workerID string, sdb *sql.DB) bool {
if wfContext.GetCurrentActionState() != pb.ActionState_ACTION_FAILED ||
wfContext.GetCurrentActionState() != pb.ActionState_ACTION_TIMEOUT {
actions, err := GetWorkflowActions(context, &pb.WorkflowActionsRequest{WorkflowId: wfContext.GetWorkflowId()}, sdb)
if err != nil {
return false
}
if wfContext.GetCurrentActionState() == pb.ActionState_ACTION_SUCCESS && isLastAction(wfContext, actions) {
log.Println("This workflow is completed ", wfContext.GetWorkflowId())
} else if int(wfContext.GetCurrentActionIndex()) == 0 {
if actions.ActionList[wfContext.GetCurrentActionIndex()].GetWorkerId() == workerID {
log.Println("Send the workflow context ", wfContext.GetWorkflowId())
return true
}
} else {
if actions.ActionList[wfContext.GetCurrentActionIndex()+1].GetWorkerId() == workerID {
log.Println("Send the workflow context ", wfContext.GetWorkflowId())
return true
}
}
}
return false
}

func isLastAction(wfContext *pb.WorkflowContext, actions *pb.WorkflowActionList) bool {
return int(wfContext.GetCurrentActionIndex()) == len(actions.GetActionList())-1
}

0 comments on commit 207bfbf

Please sign in to comment.