Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Clean up tink worker; remove os.Exit; ensure valid container names: #655

Merged
merged 4 commits into from
Dec 23, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion cmd/tink-worker/cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,6 @@ func initViper(logger log.Logger, cmd *cobra.Command) error {
logger.With("configFile", viper.ConfigFileUsed()).Error(err, "could not load config file")
return err
}
logger.Info("no config file found")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🙏🏻

} else {
logger.With("configFile", viper.ConfigFileUsed()).Info("loaded config file")
}
Expand Down
12 changes: 11 additions & 1 deletion cmd/tink-worker/worker/container_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"path"
"path/filepath"
"regexp"

"github.com/docker/docker/api/types"
"github.com/docker/docker/api/types/container"
Expand Down Expand Up @@ -76,13 +77,22 @@ func (m *containerManager) CreateContainer(ctx context.Context, cmd []string, wf

hostConfig.Binds = append(hostConfig.Binds, action.GetVolumes()...)
l.With("command", cmd).Info("creating container")
resp, err := m.cli.ContainerCreate(ctx, config, hostConfig, nil, nil, action.GetName())
name := makeValidContainerName(action.GetName())
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I forgot about this, well remembered!

resp, err := m.cli.ContainerCreate(ctx, config, hostConfig, nil, nil, name)
if err != nil {
return "", errors.Wrap(err, "DOCKER CREATE")
}
return resp.ID, nil
}

// makeValidContainerName returns a valid container name for docker.
// only [a-zA-Z0-9][a-zA-Z0-9_.-] are allowed.
func makeValidContainerName(name string) string {
regex := regexp.MustCompile(`[^a-zA-Z0-9_.-]`)
result := "action_" // so we don't need to regex on the first character different from the rest.
return result + regex.ReplaceAllString(name, "_")
}

func (m *containerManager) StartContainer(ctx context.Context, id string) error {
m.getLogger(ctx).With("containerID", id).Debug("starting container")
return errors.Wrap(m.cli.ContainerStart(ctx, id, types.ContainerStartOptions{}), "DOCKER START")
Expand Down
119 changes: 37 additions & 82 deletions cmd/tink-worker/worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,16 @@ package worker
import (
"context"
"fmt"
"os"
"path/filepath"
"io"
"strconv"
"time"

"github.com/packethost/pkg/log"
"github.com/pkg/errors"
pb "github.com/tinkerbell/tink/protos/workflow"
"google.golang.org/grpc/status"
)

const (
dataFile = "data"
defaultDataDir = "/worker"

errGetWfContext = "failed to get workflow context"
Expand All @@ -29,8 +26,6 @@ type loggingContext string

var loggingContextKey loggingContext = "logger"

var workflowcontexts = map[string]*pb.WorkflowContext{}

// WorkflowMetadata is the metadata related to workflow data.
type WorkflowMetadata struct {
WorkerID string `json:"workerID"`
Expand Down Expand Up @@ -165,7 +160,7 @@ func (w *Worker) execute(ctx context.Context, wfID string, action *pb.WorkflowAc
return pb.State_STATE_RUNNING, errors.Wrap(err, "create container")
}

l.With("containerID", id, "command", action.GetOnTimeout()).Info("container created")
l.With("containerID", id, "command", action.Command).Info("container created")

var timeCtx context.Context
var cancel context.CancelFunc
Expand Down Expand Up @@ -252,20 +247,42 @@ func (w *Worker) executeReaction(ctx context.Context, reaction string, cmd []str
// ProcessWorkflowActions gets all Workflow contexts and processes their actions.
func (w *Worker) ProcessWorkflowActions(ctx context.Context) error {
l := w.logger.With("workerID", w.workerID)
l.Info("starting to process workflow actions")

for {
select {
case <-ctx.Done():
return nil
default:
}
res, err := w.tinkClient.GetWorkflowContexts(ctx, &pb.WorkflowContextRequest{WorkerId: w.workerID})
if err != nil {
return errors.Wrap(err, errGetWfContext)
l.Error(errors.Wrap(err, errGetWfContext))
<-time.After(w.retryInterval)
continue
}
for wfContext, err := res.Recv(); err == nil && wfContext != nil; wfContext, err = res.Recv() {
for {
select {
case <-ctx.Done():
return nil
default:
}
wfContext, err := res.Recv()
if err != nil || wfContext == nil {
if !errors.Is(err, io.EOF) {
l.Info(err)
}
<-time.After(w.retryInterval)
break
}
wfID := wfContext.GetWorkflowId()
l = l.With("workflowID", wfID)
ctx := context.WithValue(ctx, loggingContextKey, &l)

actions, err := w.tinkClient.GetWorkflowActions(ctx, &pb.WorkflowActionsRequest{WorkflowId: wfID})
if err != nil {
return errors.Wrap(err, errGetWfActions)
l.Error(errors.Wrap(err, errGetWfActions))
continue
}

turn := false
Expand Down Expand Up @@ -297,35 +314,8 @@ func (w *Worker) ProcessWorkflowActions(ctx context.Context) error {
}
}

if turn {
wfDir := filepath.Join(w.dataDir, wfID)
l := l.With("actionName", actions.GetActionList()[actionIndex].GetName(),
"taskName", actions.GetActionList()[actionIndex].GetTaskName(),
)
if _, err := os.Stat(wfDir); os.IsNotExist(err) {
err := os.MkdirAll(wfDir, os.FileMode(0o755))
if err != nil {
l.Error(err)
os.Exit(1)
}

f := openDataFile(wfDir, l)
_, err = f.Write([]byte("{}"))
if err != nil {
l.Error(err)
os.Exit(1)
}

err = f.Close()
if err != nil {
l.Error(err)
os.Exit(1)
}
}
l.Info("starting action")
}

for turn {
l.Info("starting action")
action := actions.GetActionList()[actionIndex]
l := l.With(
"actionName", action.GetName(),
Expand All @@ -342,10 +332,7 @@ func (w *Worker) ProcessWorkflowActions(ctx context.Context) error {
Message: "Started execution",
WorkerId: action.GetWorkerId(),
}
err := w.reportActionStatus(ctx, actionStatus)
if err != nil {
exitWithGrpcError(err, l)
}
w.reportActionStatus(ctx, l, actionStatus)
l.With("status", actionStatus.ActionStatus, "duration", strconv.FormatInt(actionStatus.Seconds, 10)).Info("sent action status")
}

Expand All @@ -370,25 +357,17 @@ func (w *Worker) ProcessWorkflowActions(ctx context.Context) error {
}
l = l.With("actionStatus", actionStatus.ActionStatus.String())
l.Error(err)
if reportErr := w.reportActionStatus(ctx, actionStatus); reportErr != nil {
exitWithGrpcError(reportErr, l)
}
delete(workflowcontexts, wfID)
return err
w.reportActionStatus(ctx, l, actionStatus)
break
}

actionStatus.ActionStatus = pb.State_STATE_SUCCESS
actionStatus.Message = "finished execution successfully"

err = w.reportActionStatus(ctx, actionStatus)
if err != nil {
exitWithGrpcError(err, l)
}
w.reportActionStatus(ctx, l, actionStatus)
l.Info("sent action status")

if len(actions.GetActionList()) == actionIndex+1 {
l.Info("reached to end of workflow")
delete(workflowcontexts, wfID)
turn = false
break
}
Expand All @@ -407,45 +386,21 @@ func (w *Worker) ProcessWorkflowActions(ctx context.Context) error {
}
}

func exitWithGrpcError(err error, l log.Logger) {
if err != nil {
errStatus, _ := status.FromError(err)
l.With("errorCode", errStatus.Code()).Error(err)
os.Exit(1)
}
}

func isLastAction(wfContext *pb.WorkflowContext, actions *pb.WorkflowActionList) bool {
return int(wfContext.GetCurrentActionIndex()) == len(actions.GetActionList())-1
}

func (w *Worker) reportActionStatus(ctx context.Context, actionStatus *pb.WorkflowActionStatus) error {
l := w.getLogger(ctx).With("workflowID", actionStatus.GetWorkflowId(),
"workerID", actionStatus.GetWorkerId(),
"actionName", actionStatus.GetActionName(),
"taskName", actionStatus.GetTaskName(),
"status", actionStatus.ActionStatus,
)
var err error
for r := 1; r <= w.retries; r++ {
// reportActionStatus reports the status of an action to the Tinkerbell server and retries forever on error.
func (w *Worker) reportActionStatus(ctx context.Context, l log.Logger, actionStatus *pb.WorkflowActionStatus) {
for {
l.Info("reporting Action Status")
_, err = w.tinkClient.ReportActionStatus(ctx, actionStatus)
_, err := w.tinkClient.ReportActionStatus(ctx, actionStatus)
if err != nil {
l.Error(errors.Wrap(err, errReportActionStatus))
<-time.After(w.retryInterval)

continue
}
return nil
}
return err
}

func openDataFile(wfDir string, l log.Logger) *os.File {
f, err := os.OpenFile(filepath.Clean(wfDir+string(os.PathSeparator)+dataFile), os.O_RDWR|os.O_CREATE, 0o600)
if err != nil {
l.Error(err)
os.Exit(1)
return
}
return f
}
2 changes: 1 addition & 1 deletion tests/e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ var _ = Describe("Tink API", func() {
}, 8*time.Second, 1*time.Second).Should(Equal("STATE_SUCCESS"))

workerErr := <-errChan
Expect(workerErr.Error()).To(Equal("failed to get workflow context: rpc error: code = DeadlineExceeded desc = context deadline exceeded"))
Expect(workerErr).To(BeNil())
})

It("02 - should return the correct workflow contexts", func() {
Expand Down
7 changes: 5 additions & 2 deletions workflow/template_validator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -321,8 +321,11 @@ tasks:
if err == nil {
t.Error("expected error, got nil")
}
if !strings.Contains(err.Error(), `template: workflow-template:7: unexpected "}" in operand`) {
t.Errorf("\nexpected err: '%s'\ngot: '%s'", `failed to parse template with ID 98788301-d0d9-4ee9-84df-b64e6e1ef1cc: template: workflow-template:7: unexpected "}" in operand`, err)
e1 := `template: workflow-template:7: unexpected "}" in operand`
e2 := `template: workflow-template:7: bad character U+007D '}'`
if !strings.Contains(err.Error(), e1) && !strings.Contains(err.Error(), e2) {
base := "failed to parse template with ID 98788301-d0d9-4ee9-84df-b64e6e1ef1cc: "
t.Errorf("\nexpected err: '%s'\nor expected err: '%s'\ngot: '%s'", base+e1, base+e2, err)
}
},
},
Expand Down