From 15f4e6a606c6b403df32a74e24831ed90ddceaa7 Mon Sep 17 00:00:00 2001 From: Jacob Weinstock Date: Thu, 22 Dec 2022 17:51:49 -0700 Subject: [PATCH 1/4] Clean up tink worker; remove os.Exit; ensure valid container names: Removing numerous `os.Exit` calls means that tink worker won't require a restart for action execution failures. This commit also allows action names to contain spaces without the action failing to run. Signed-off-by: Jacob Weinstock --- cmd/tink-worker/cmd/root.go | 1 - cmd/tink-worker/worker/container_manager.go | 12 ++- cmd/tink-worker/worker/worker.go | 82 ++++++++------------- 3 files changed, 41 insertions(+), 54 deletions(-) diff --git a/cmd/tink-worker/cmd/root.go b/cmd/tink-worker/cmd/root.go index eb055b0e3..3c8abdbc1 100644 --- a/cmd/tink-worker/cmd/root.go +++ b/cmd/tink-worker/cmd/root.go @@ -130,7 +130,6 @@ func initViper(logger log.Logger, cmd *cobra.Command) error { logger.With("configFile", viper.ConfigFileUsed()).Error(err, "could not load config file") return err } - logger.Info("no config file found") } else { logger.With("configFile", viper.ConfigFileUsed()).Info("loaded config file") } diff --git a/cmd/tink-worker/worker/container_manager.go b/cmd/tink-worker/worker/container_manager.go index 6dd70e8e6..8f25ab102 100644 --- a/cmd/tink-worker/worker/container_manager.go +++ b/cmd/tink-worker/worker/container_manager.go @@ -4,6 +4,7 @@ import ( "context" "path" "path/filepath" + "regexp" "github.com/docker/docker/api/types" "github.com/docker/docker/api/types/container" @@ -76,13 +77,22 @@ func (m *containerManager) CreateContainer(ctx context.Context, cmd []string, wf hostConfig.Binds = append(hostConfig.Binds, action.GetVolumes()...) 
l.With("command", cmd).Info("creating container") - resp, err := m.cli.ContainerCreate(ctx, config, hostConfig, nil, nil, action.GetName()) + name := makeValidContainerName(action.GetName()) + resp, err := m.cli.ContainerCreate(ctx, config, hostConfig, nil, nil, name) if err != nil { return "", errors.Wrap(err, "DOCKER CREATE") } return resp.ID, nil } +// makeValidContainerName returns a valid container name for docker. +// only [a-zA-Z0-9][a-zA-Z0-9_.-] are allowed. +func makeValidContainerName(name string) string { + regex := regexp.MustCompile(`[^a-zA-Z0-9_.-]`) + result := "action_" // so we don't need to regex on the first character different from the rest. + return result + regex.ReplaceAllString(name, "_") +} + func (m *containerManager) StartContainer(ctx context.Context, id string) error { m.getLogger(ctx).With("containerID", id).Debug("starting container") return errors.Wrap(m.cli.ContainerStart(ctx, id, types.ContainerStartOptions{}), "DOCKER START") diff --git a/cmd/tink-worker/worker/worker.go b/cmd/tink-worker/worker/worker.go index d829867a7..cbe0828e2 100644 --- a/cmd/tink-worker/worker/worker.go +++ b/cmd/tink-worker/worker/worker.go @@ -3,8 +3,7 @@ package worker import ( "context" "fmt" - "os" - "path/filepath" + "io" "strconv" "time" @@ -15,7 +14,6 @@ import ( ) const ( - dataFile = "data" defaultDataDir = "/worker" errGetWfContext = "failed to get workflow context" @@ -29,8 +27,6 @@ type loggingContext string var loggingContextKey loggingContext = "logger" -var workflowcontexts = map[string]*pb.WorkflowContext{} - // WorkflowMetadata is the metadata related to workflow data. type WorkflowMetadata struct { WorkerID string `json:"workerID"` @@ -252,20 +248,31 @@ func (w *Worker) executeReaction(ctx context.Context, reaction string, cmd []str // ProcessWorkflowActions gets all Workflow contexts and processes their actions. 
func (w *Worker) ProcessWorkflowActions(ctx context.Context) error { l := w.logger.With("workerID", w.workerID) + l.Info("starting to process workflow actions") for { res, err := w.tinkClient.GetWorkflowContexts(ctx, &pb.WorkflowContextRequest{WorkerId: w.workerID}) if err != nil { - return errors.Wrap(err, errGetWfContext) + l.Error(errors.Wrap(err, errGetWfContext)) + <-time.After(5 * time.Second) } - for wfContext, err := res.Recv(); err == nil && wfContext != nil; wfContext, err = res.Recv() { + for { + wfContext, err := res.Recv() + if err != nil || wfContext == nil { + if !errors.Is(err, io.EOF) { + l.Info(err) + } + <-time.After(5 * time.Second) + break + } wfID := wfContext.GetWorkflowId() l = l.With("workflowID", wfID) ctx := context.WithValue(ctx, loggingContextKey, &l) actions, err := w.tinkClient.GetWorkflowActions(ctx, &pb.WorkflowActionsRequest{WorkflowId: wfID}) if err != nil { - return errors.Wrap(err, errGetWfActions) + l.Error(errors.Wrap(err, errGetWfActions)) + continue } turn := false @@ -298,30 +305,7 @@ func (w *Worker) ProcessWorkflowActions(ctx context.Context) error { } if turn { - wfDir := filepath.Join(w.dataDir, wfID) - l := l.With("actionName", actions.GetActionList()[actionIndex].GetName(), - "taskName", actions.GetActionList()[actionIndex].GetTaskName(), - ) - if _, err := os.Stat(wfDir); os.IsNotExist(err) { - err := os.MkdirAll(wfDir, os.FileMode(0o755)) - if err != nil { - l.Error(err) - os.Exit(1) - } - - f := openDataFile(wfDir, l) - _, err = f.Write([]byte("{}")) - if err != nil { - l.Error(err) - os.Exit(1) - } - - err = f.Close() - if err != nil { - l.Error(err) - os.Exit(1) - } - } + l := l.With("actionName", actions.GetActionList()[actionIndex].GetName(), "taskName", actions.GetActionList()[actionIndex].GetTaskName()) l.Info("starting action") } @@ -342,9 +326,11 @@ func (w *Worker) ProcessWorkflowActions(ctx context.Context) error { Message: "Started execution", WorkerId: action.GetWorkerId(), } - err := 
w.reportActionStatus(ctx, actionStatus) - if err != nil { + RETRY1: + if err := w.reportActionStatus(ctx, actionStatus); err != nil { exitWithGrpcError(err, l) + <-time.After(5 * time.Second) + goto RETRY1 } l.With("status", actionStatus.ActionStatus, "duration", strconv.FormatInt(actionStatus.Seconds, 10)).Info("sent action status") } @@ -370,25 +356,27 @@ func (w *Worker) ProcessWorkflowActions(ctx context.Context) error { } l = l.With("actionStatus", actionStatus.ActionStatus.String()) l.Error(err) - if reportErr := w.reportActionStatus(ctx, actionStatus); reportErr != nil { - exitWithGrpcError(reportErr, l) + RETRY2: + if err := w.reportActionStatus(ctx, actionStatus); err != nil { + exitWithGrpcError(err, l) + <-time.After(5 * time.Second) + goto RETRY2 } - delete(workflowcontexts, wfID) - return err + break } actionStatus.ActionStatus = pb.State_STATE_SUCCESS actionStatus.Message = "finished execution successfully" - - err = w.reportActionStatus(ctx, actionStatus) - if err != nil { + RETRY3: + if err := w.reportActionStatus(ctx, actionStatus); err != nil { exitWithGrpcError(err, l) + <-time.After(5 * time.Second) + goto RETRY3 } l.Info("sent action status") if len(actions.GetActionList()) == actionIndex+1 { l.Info("reached to end of workflow") - delete(workflowcontexts, wfID) turn = false break } @@ -411,7 +399,6 @@ func exitWithGrpcError(err error, l log.Logger) { if err != nil { errStatus, _ := status.FromError(err) l.With("errorCode", errStatus.Code()).Error(err) - os.Exit(1) } } @@ -440,12 +427,3 @@ func (w *Worker) reportActionStatus(ctx context.Context, actionStatus *pb.Workfl } return err } - -func openDataFile(wfDir string, l log.Logger) *os.File { - f, err := os.OpenFile(filepath.Clean(wfDir+string(os.PathSeparator)+dataFile), os.O_RDWR|os.O_CREATE, 0o600) - if err != nil { - l.Error(err) - os.Exit(1) - } - return f -} From 5fb3a11f8789ef67cf8892913b21dc569f780bb5 Mon Sep 17 00:00:00 2001 From: Jacob Weinstock Date: Thu, 22 Dec 2022 18:32:54 -0700 
Subject: [PATCH 2/4] Fix e2e test with worker processing: With the worker now not erroring out, this e2e test needed to be updated. Signed-off-by: Jacob Weinstock --- cmd/tink-worker/worker/worker.go | 10 ++++++++++ tests/e2e_test.go | 2 +- 2 files changed, 11 insertions(+), 1 deletion(-) diff --git a/cmd/tink-worker/worker/worker.go b/cmd/tink-worker/worker/worker.go index cbe0828e2..2822852a9 100644 --- a/cmd/tink-worker/worker/worker.go +++ b/cmd/tink-worker/worker/worker.go @@ -251,12 +251,22 @@ func (w *Worker) ProcessWorkflowActions(ctx context.Context) error { l.Info("starting to process workflow actions") for { + select { + case <-ctx.Done(): + return nil + default: + } res, err := w.tinkClient.GetWorkflowContexts(ctx, &pb.WorkflowContextRequest{WorkerId: w.workerID}) if err != nil { l.Error(errors.Wrap(err, errGetWfContext)) <-time.After(5 * time.Second) } for { + select { + case <-ctx.Done(): + return nil + default: + } wfContext, err := res.Recv() if err != nil || wfContext == nil { if !errors.Is(err, io.EOF) { diff --git a/tests/e2e_test.go b/tests/e2e_test.go index cbc6c0fd3..e31a9fdb8 100644 --- a/tests/e2e_test.go +++ b/tests/e2e_test.go @@ -118,7 +118,7 @@ var _ = Describe("Tink API", func() { }, 8*time.Second, 1*time.Second).Should(Equal("STATE_SUCCESS")) workerErr := <-errChan - Expect(workerErr.Error()).To(Equal("failed to get workflow context: rpc error: code = DeadlineExceeded desc = context deadline exceeded")) + Expect(workerErr).To(BeNil()) }) It("02 - should return the correct workflow contexts", func() { From 10da7ae544ed510eaf4e586e64ff228ac49d224b Mon Sep 17 00:00:00 2001 From: Jacob Weinstock Date: Thu, 22 Dec 2022 18:51:15 -0700 Subject: [PATCH 3/4] Fix testing with Go > 1.18: It appears the template library reports a different error in Go > 1.18 with respect to a missing `}` in a template (`{{.device_1}`, for example). 
Signed-off-by: Jacob Weinstock --- workflow/template_validator_test.go | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/workflow/template_validator_test.go b/workflow/template_validator_test.go index ead342a79..b03e03f43 100644 --- a/workflow/template_validator_test.go +++ b/workflow/template_validator_test.go @@ -321,8 +321,11 @@ tasks: if err == nil { t.Error("expected error, got nil") } - if !strings.Contains(err.Error(), `template: workflow-template:7: unexpected "}" in operand`) { - t.Errorf("\nexpected err: '%s'\ngot: '%s'", `failed to parse template with ID 98788301-d0d9-4ee9-84df-b64e6e1ef1cc: template: workflow-template:7: unexpected "}" in operand`, err) + e1 := `template: workflow-template:7: unexpected "}" in operand` + e2 := `template: workflow-template:7: bad character U+007D '}'` + if !strings.Contains(err.Error(), e1) && !strings.Contains(err.Error(), e2) { + base := "failed to parse template with ID 98788301-d0d9-4ee9-84df-b64e6e1ef1cc: " + t.Errorf("\nexpected err: '%s'\nor expected err: '%s'\ngot: '%s'", base+e1, base+e2, err) } }, }, From f820d1d4e770371875d58b8ab6d41ccda23a4b78 Mon Sep 17 00:00:00 2001 From: Jacob Weinstock Date: Fri, 23 Dec 2022 12:25:59 -0700 Subject: [PATCH 4/4] improve handling when reporting action status: This removes more unneeded code and drops the use of `goto` statements. 
Signed-off-by: Jacob Weinstock --- cmd/tink-worker/worker/worker.go | 59 +++++++------------------------- 1 file changed, 13 insertions(+), 46 deletions(-) diff --git a/cmd/tink-worker/worker/worker.go b/cmd/tink-worker/worker/worker.go index 2822852a9..0eed69816 100644 --- a/cmd/tink-worker/worker/worker.go +++ b/cmd/tink-worker/worker/worker.go @@ -10,7 +10,6 @@ import ( "github.com/packethost/pkg/log" "github.com/pkg/errors" pb "github.com/tinkerbell/tink/protos/workflow" - "google.golang.org/grpc/status" ) const ( @@ -161,7 +160,7 @@ func (w *Worker) execute(ctx context.Context, wfID string, action *pb.WorkflowAc return pb.State_STATE_RUNNING, errors.Wrap(err, "create container") } - l.With("containerID", id, "command", action.GetOnTimeout()).Info("container created") + l.With("containerID", id, "command", action.Command).Info("container created") var timeCtx context.Context var cancel context.CancelFunc @@ -259,7 +258,8 @@ func (w *Worker) ProcessWorkflowActions(ctx context.Context) error { res, err := w.tinkClient.GetWorkflowContexts(ctx, &pb.WorkflowContextRequest{WorkerId: w.workerID}) if err != nil { l.Error(errors.Wrap(err, errGetWfContext)) - <-time.After(5 * time.Second) + <-time.After(w.retryInterval) + continue } for { select { @@ -272,7 +272,7 @@ func (w *Worker) ProcessWorkflowActions(ctx context.Context) error { if !errors.Is(err, io.EOF) { l.Info(err) } - <-time.After(5 * time.Second) + <-time.After(w.retryInterval) break } wfID := wfContext.GetWorkflowId() @@ -314,12 +314,8 @@ func (w *Worker) ProcessWorkflowActions(ctx context.Context) error { } } - if turn { - l := l.With("actionName", actions.GetActionList()[actionIndex].GetName(), "taskName", actions.GetActionList()[actionIndex].GetTaskName()) - l.Info("starting action") - } - for turn { + l.Info("starting action") action := actions.GetActionList()[actionIndex] l := l.With( "actionName", action.GetName(), @@ -336,12 +332,7 @@ func (w *Worker) ProcessWorkflowActions(ctx context.Context) error 
{ Message: "Started execution", WorkerId: action.GetWorkerId(), } - RETRY1: - if err := w.reportActionStatus(ctx, actionStatus); err != nil { - exitWithGrpcError(err, l) - <-time.After(5 * time.Second) - goto RETRY1 - } + w.reportActionStatus(ctx, l, actionStatus) l.With("status", actionStatus.ActionStatus, "duration", strconv.FormatInt(actionStatus.Seconds, 10)).Info("sent action status") } @@ -366,23 +357,13 @@ func (w *Worker) ProcessWorkflowActions(ctx context.Context) error { } l = l.With("actionStatus", actionStatus.ActionStatus.String()) l.Error(err) - RETRY2: - if err := w.reportActionStatus(ctx, actionStatus); err != nil { - exitWithGrpcError(err, l) - <-time.After(5 * time.Second) - goto RETRY2 - } + w.reportActionStatus(ctx, l, actionStatus) break } actionStatus.ActionStatus = pb.State_STATE_SUCCESS actionStatus.Message = "finished execution successfully" - RETRY3: - if err := w.reportActionStatus(ctx, actionStatus); err != nil { - exitWithGrpcError(err, l) - <-time.After(5 * time.Second) - goto RETRY3 - } + w.reportActionStatus(ctx, l, actionStatus) l.Info("sent action status") if len(actions.GetActionList()) == actionIndex+1 { @@ -405,35 +386,21 @@ func (w *Worker) ProcessWorkflowActions(ctx context.Context) error { } } -func exitWithGrpcError(err error, l log.Logger) { - if err != nil { - errStatus, _ := status.FromError(err) - l.With("errorCode", errStatus.Code()).Error(err) - } -} - func isLastAction(wfContext *pb.WorkflowContext, actions *pb.WorkflowActionList) bool { return int(wfContext.GetCurrentActionIndex()) == len(actions.GetActionList())-1 } -func (w *Worker) reportActionStatus(ctx context.Context, actionStatus *pb.WorkflowActionStatus) error { - l := w.getLogger(ctx).With("workflowID", actionStatus.GetWorkflowId(), - "workerID", actionStatus.GetWorkerId(), - "actionName", actionStatus.GetActionName(), - "taskName", actionStatus.GetTaskName(), - "status", actionStatus.ActionStatus, - ) - var err error - for r := 1; r <= w.retries; r++ { +// 
reportActionStatus reports the status of an action to the Tinkerbell server and retries forever on error. +func (w *Worker) reportActionStatus(ctx context.Context, l log.Logger, actionStatus *pb.WorkflowActionStatus) { + for { l.Info("reporting Action Status") - _, err = w.tinkClient.ReportActionStatus(ctx, actionStatus) + _, err := w.tinkClient.ReportActionStatus(ctx, actionStatus) if err != nil { l.Error(errors.Wrap(err, errReportActionStatus)) <-time.After(w.retryInterval) continue } - return nil + return } - return err }