Skip to content

Commit

Permalink
Merge pull request #655 from jacobweinstock/server-refactor
Browse files Browse the repository at this point in the history
Clean up tink worker; remove os.Exit; ensure valid container names:
  • Loading branch information
mergify[bot] authored Dec 23, 2022
2 parents 5b62d12 + f820d1d commit fdfecb9
Show file tree
Hide file tree
Showing 5 changed files with 54 additions and 87 deletions.
1 change: 0 additions & 1 deletion cmd/tink-worker/cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,6 @@ func initViper(logger log.Logger, cmd *cobra.Command) error {
logger.With("configFile", viper.ConfigFileUsed()).Error(err, "could not load config file")
return err
}
logger.Info("no config file found")
} else {
logger.With("configFile", viper.ConfigFileUsed()).Info("loaded config file")
}
Expand Down
12 changes: 11 additions & 1 deletion cmd/tink-worker/worker/container_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"path"
"path/filepath"
"regexp"

"github.com/docker/docker/api/types"
"github.com/docker/docker/api/types/container"
Expand Down Expand Up @@ -76,13 +77,22 @@ func (m *containerManager) CreateContainer(ctx context.Context, cmd []string, wf

hostConfig.Binds = append(hostConfig.Binds, action.GetVolumes()...)
l.With("command", cmd).Info("creating container")
resp, err := m.cli.ContainerCreate(ctx, config, hostConfig, nil, nil, action.GetName())
name := makeValidContainerName(action.GetName())
resp, err := m.cli.ContainerCreate(ctx, config, hostConfig, nil, nil, name)
if err != nil {
return "", errors.Wrap(err, "DOCKER CREATE")
}
return resp.ID, nil
}

// invalidContainerNameChars matches every character that may not appear in a
// Docker container name. It is compiled once at package initialisation rather
// than on every call to makeValidContainerName.
var invalidContainerNameChars = regexp.MustCompile(`[^a-zA-Z0-9_.-]`)

// makeValidContainerName converts an arbitrary action name into a name Docker
// will accept for a container. Docker requires names to match
// [a-zA-Z0-9][a-zA-Z0-9_.-]+.
//
// Every character outside [a-zA-Z0-9_.-] is replaced with "_", and the fixed
// prefix "action_" is prepended so the first character is always alphanumeric
// without needing a separate rule for it. An empty name yields "action_".
func makeValidContainerName(name string) string {
	const prefix = "action_"
	return prefix + invalidContainerNameChars.ReplaceAllString(name, "_")
}

func (m *containerManager) StartContainer(ctx context.Context, id string) error {
m.getLogger(ctx).With("containerID", id).Debug("starting container")
return errors.Wrap(m.cli.ContainerStart(ctx, id, types.ContainerStartOptions{}), "DOCKER START")
Expand Down
119 changes: 37 additions & 82 deletions cmd/tink-worker/worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,16 @@ package worker
import (
"context"
"fmt"
"os"
"path/filepath"
"io"
"strconv"
"time"

"github.com/packethost/pkg/log"
"github.com/pkg/errors"
pb "github.com/tinkerbell/tink/protos/workflow"
"google.golang.org/grpc/status"
)

const (
dataFile = "data"
defaultDataDir = "/worker"

errGetWfContext = "failed to get workflow context"
Expand All @@ -29,8 +26,6 @@ type loggingContext string

var loggingContextKey loggingContext = "logger"

var workflowcontexts = map[string]*pb.WorkflowContext{}

// WorkflowMetadata is the metadata related to workflow data.
type WorkflowMetadata struct {
WorkerID string `json:"workerID"`
Expand Down Expand Up @@ -165,7 +160,7 @@ func (w *Worker) execute(ctx context.Context, wfID string, action *pb.WorkflowAc
return pb.State_STATE_RUNNING, errors.Wrap(err, "create container")
}

l.With("containerID", id, "command", action.GetOnTimeout()).Info("container created")
l.With("containerID", id, "command", action.Command).Info("container created")

var timeCtx context.Context
var cancel context.CancelFunc
Expand Down Expand Up @@ -252,20 +247,42 @@ func (w *Worker) executeReaction(ctx context.Context, reaction string, cmd []str
// ProcessWorkflowActions gets all Workflow contexts and processes their actions.
func (w *Worker) ProcessWorkflowActions(ctx context.Context) error {
l := w.logger.With("workerID", w.workerID)
l.Info("starting to process workflow actions")

for {
select {
case <-ctx.Done():
return nil
default:
}
res, err := w.tinkClient.GetWorkflowContexts(ctx, &pb.WorkflowContextRequest{WorkerId: w.workerID})
if err != nil {
return errors.Wrap(err, errGetWfContext)
l.Error(errors.Wrap(err, errGetWfContext))
<-time.After(w.retryInterval)
continue
}
for wfContext, err := res.Recv(); err == nil && wfContext != nil; wfContext, err = res.Recv() {
for {
select {
case <-ctx.Done():
return nil
default:
}
wfContext, err := res.Recv()
if err != nil || wfContext == nil {
if !errors.Is(err, io.EOF) {
l.Info(err)
}
<-time.After(w.retryInterval)
break
}
wfID := wfContext.GetWorkflowId()
l = l.With("workflowID", wfID)
ctx := context.WithValue(ctx, loggingContextKey, &l)

actions, err := w.tinkClient.GetWorkflowActions(ctx, &pb.WorkflowActionsRequest{WorkflowId: wfID})
if err != nil {
return errors.Wrap(err, errGetWfActions)
l.Error(errors.Wrap(err, errGetWfActions))
continue
}

turn := false
Expand Down Expand Up @@ -297,35 +314,8 @@ func (w *Worker) ProcessWorkflowActions(ctx context.Context) error {
}
}

if turn {
wfDir := filepath.Join(w.dataDir, wfID)
l := l.With("actionName", actions.GetActionList()[actionIndex].GetName(),
"taskName", actions.GetActionList()[actionIndex].GetTaskName(),
)
if _, err := os.Stat(wfDir); os.IsNotExist(err) {
err := os.MkdirAll(wfDir, os.FileMode(0o755))
if err != nil {
l.Error(err)
os.Exit(1)
}

f := openDataFile(wfDir, l)
_, err = f.Write([]byte("{}"))
if err != nil {
l.Error(err)
os.Exit(1)
}

err = f.Close()
if err != nil {
l.Error(err)
os.Exit(1)
}
}
l.Info("starting action")
}

for turn {
l.Info("starting action")
action := actions.GetActionList()[actionIndex]
l := l.With(
"actionName", action.GetName(),
Expand All @@ -342,10 +332,7 @@ func (w *Worker) ProcessWorkflowActions(ctx context.Context) error {
Message: "Started execution",
WorkerId: action.GetWorkerId(),
}
err := w.reportActionStatus(ctx, actionStatus)
if err != nil {
exitWithGrpcError(err, l)
}
w.reportActionStatus(ctx, l, actionStatus)
l.With("status", actionStatus.ActionStatus, "duration", strconv.FormatInt(actionStatus.Seconds, 10)).Info("sent action status")
}

Expand All @@ -370,25 +357,17 @@ func (w *Worker) ProcessWorkflowActions(ctx context.Context) error {
}
l = l.With("actionStatus", actionStatus.ActionStatus.String())
l.Error(err)
if reportErr := w.reportActionStatus(ctx, actionStatus); reportErr != nil {
exitWithGrpcError(reportErr, l)
}
delete(workflowcontexts, wfID)
return err
w.reportActionStatus(ctx, l, actionStatus)
break
}

actionStatus.ActionStatus = pb.State_STATE_SUCCESS
actionStatus.Message = "finished execution successfully"

err = w.reportActionStatus(ctx, actionStatus)
if err != nil {
exitWithGrpcError(err, l)
}
w.reportActionStatus(ctx, l, actionStatus)
l.Info("sent action status")

if len(actions.GetActionList()) == actionIndex+1 {
l.Info("reached to end of workflow")
delete(workflowcontexts, wfID)
turn = false
break
}
Expand All @@ -407,45 +386,21 @@ func (w *Worker) ProcessWorkflowActions(ctx context.Context) error {
}
}

// exitWithGrpcError logs a non-nil err, tagged with the gRPC status code
// extracted from it under the "errorCode" key, and then terminates the
// process with exit status 1. A nil err is ignored and the function returns
// normally.
func exitWithGrpcError(err error, l log.Logger) {
	if err == nil {
		return
	}

	// The second result of status.FromError is deliberately discarded; only
	// the status code is used for logging.
	st, _ := status.FromError(err)
	code := st.Code()
	l.With("errorCode", code).Error(err)
	os.Exit(1)
}

// isLastAction reports whether the current action index recorded in wfContext
// refers to the final entry of the action list in actions.
func isLastAction(wfContext *pb.WorkflowContext, actions *pb.WorkflowActionList) bool {
	lastIndex := len(actions.GetActionList()) - 1
	current := int(wfContext.GetCurrentActionIndex())
	return current == lastIndex
}

func (w *Worker) reportActionStatus(ctx context.Context, actionStatus *pb.WorkflowActionStatus) error {
l := w.getLogger(ctx).With("workflowID", actionStatus.GetWorkflowId(),
"workerID", actionStatus.GetWorkerId(),
"actionName", actionStatus.GetActionName(),
"taskName", actionStatus.GetTaskName(),
"status", actionStatus.ActionStatus,
)
var err error
for r := 1; r <= w.retries; r++ {
// reportActionStatus reports the status of an action to the Tinkerbell server and retries forever on error.
func (w *Worker) reportActionStatus(ctx context.Context, l log.Logger, actionStatus *pb.WorkflowActionStatus) {
for {
l.Info("reporting Action Status")
_, err = w.tinkClient.ReportActionStatus(ctx, actionStatus)
_, err := w.tinkClient.ReportActionStatus(ctx, actionStatus)
if err != nil {
l.Error(errors.Wrap(err, errReportActionStatus))
<-time.After(w.retryInterval)

continue
}
return nil
}
return err
}

func openDataFile(wfDir string, l log.Logger) *os.File {
f, err := os.OpenFile(filepath.Clean(wfDir+string(os.PathSeparator)+dataFile), os.O_RDWR|os.O_CREATE, 0o600)
if err != nil {
l.Error(err)
os.Exit(1)
return
}
return f
}
2 changes: 1 addition & 1 deletion tests/e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ var _ = Describe("Tink API", func() {
}, 8*time.Second, 1*time.Second).Should(Equal("STATE_SUCCESS"))

workerErr := <-errChan
Expect(workerErr.Error()).To(Equal("failed to get workflow context: rpc error: code = DeadlineExceeded desc = context deadline exceeded"))
Expect(workerErr).To(BeNil())
})

It("02 - should return the correct workflow contexts", func() {
Expand Down
7 changes: 5 additions & 2 deletions workflow/template_validator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -321,8 +321,11 @@ tasks:
if err == nil {
t.Error("expected error, got nil")
}
if !strings.Contains(err.Error(), `template: workflow-template:7: unexpected "}" in operand`) {
t.Errorf("\nexpected err: '%s'\ngot: '%s'", `failed to parse template with ID 98788301-d0d9-4ee9-84df-b64e6e1ef1cc: template: workflow-template:7: unexpected "}" in operand`, err)
e1 := `template: workflow-template:7: unexpected "}" in operand`
e2 := `template: workflow-template:7: bad character U+007D '}'`
if !strings.Contains(err.Error(), e1) && !strings.Contains(err.Error(), e2) {
base := "failed to parse template with ID 98788301-d0d9-4ee9-84df-b64e6e1ef1cc: "
t.Errorf("\nexpected err: '%s'\nor expected err: '%s'\ngot: '%s'", base+e1, base+e2, err)
}
},
},
Expand Down

0 comments on commit fdfecb9

Please sign in to comment.