diff --git a/cmd/tink-worker/worker.go b/cmd/tink-worker/worker.go index d249ca32e..3dc76d3e0 100644 --- a/cmd/tink-worker/worker.go +++ b/cmd/tink-worker/worker.go @@ -176,6 +176,7 @@ func processWorkflowActions(client pb.WorkflowSvcClient) error { if rerr != nil { exitWithGrpcError(rerr) } + delete(workflowcontexts, wfID) return err } @@ -193,6 +194,7 @@ func processWorkflowActions(client pb.WorkflowSvcClient) error { if len(actions.GetActionList()) == actionIndex+1 { log.Infoln("Reached to end of workflow") + delete(workflowcontexts, wfID) turn = false break } @@ -209,7 +211,7 @@ func processWorkflowActions(client pb.WorkflowSvcClient) error { } func fetchLatestContext(ctx context.Context, client pb.WorkflowSvcClient, workerID string) error { - log.Infof("Fetching latest context for worker %s\n", workerID) + log.Debugf("Fetching latest context for worker %s\n", workerID) res, err := client.GetWorkflowContexts(ctx, &pb.WorkflowContextRequest{WorkerId: workerID}) if err != nil { return err @@ -257,8 +259,8 @@ func reportActionStatus(ctx context.Context, client pb.WorkflowSvcClient, action for r := 1; r <= retries; r++ { _, err = client.ReportActionStatus(ctx, actionStatus) if err != nil { - log.Println("Report action status to server failed as : ", err) - log.Printf("Retrying after %v seconds", retryInterval) + log.Errorln("Report action status to server failed as : ", err) + log.Errorf("Retrying after %v seconds", retryInterval) <-time.After(retryInterval * time.Second) continue } diff --git a/executor/executor.go b/executor/executor.go index 40a269259..3f5010569 100644 --- a/executor/executor.go +++ b/executor/executor.go @@ -4,6 +4,7 @@ import ( "context" "database/sql" "fmt" + "log" "time" "github.com/tinkerbell/tink/db" @@ -33,7 +34,9 @@ func GetWorkflowContexts(context context.Context, req *pb.WorkflowContextRequest if err != nil { return nil, status.Errorf(codes.Aborted, "invalid workflow %s found for worker %s", wf, req.WorkerId) } - wfContexts = append(wfContexts, wfContext) + if isApplicableToSend(context, wfContext, req.WorkerId, sdb) { + wfContexts = append(wfContexts, wfContext) + } } return &pb.WorkflowContextList{ @@ -160,3 +163,33 @@ func GetWorkflowDataVersion(context context.Context, workflowID string, sdb *sql } return &pb.GetWorkflowDataResponse{Version: version}, nil } + +// The below function check whether a particular workflow context is applicable or needed to +// be send to a worker based on the state of the current action and the targeted workerID. +func isApplicableToSend(context context.Context, wfContext *pb.WorkflowContext, workerID string, sdb *sql.DB) bool { + if wfContext.GetCurrentActionState() != pb.ActionState_ACTION_FAILED || + wfContext.GetCurrentActionState() != pb.ActionState_ACTION_TIMEOUT { + actions, err := GetWorkflowActions(context, &pb.WorkflowActionsRequest{WorkflowId: wfContext.GetWorkflowId()}, sdb) + if err != nil { + return false + } + if wfContext.GetCurrentActionState() == pb.ActionState_ACTION_SUCCESS && isLastAction(wfContext, actions) { + log.Println("This workflow is completed ", wfContext.GetWorkflowId()) + } else if int(wfContext.GetCurrentActionIndex()) == 0 { + if actions.ActionList[wfContext.GetCurrentActionIndex()].GetWorkerId() == workerID { + log.Println("Send the workflow context ", wfContext.GetWorkflowId()) + return true + } + } else { + if actions.ActionList[wfContext.GetCurrentActionIndex()+1].GetWorkerId() == workerID { + log.Println("Send the workflow context ", wfContext.GetWorkflowId()) + return true + } + } + } + return false +} + +func isLastAction(wfContext *pb.WorkflowContext, actions *pb.WorkflowActionList) bool { + return int(wfContext.GetCurrentActionIndex()) == len(actions.GetActionList())-1 +}