Skip to content

Commit

Permalink
Added virtual worker
Browse files Browse the repository at this point in the history
Signed-off-by: Micah Hausler <[email protected]>
  • Loading branch information
micahhausler committed Apr 5, 2022
1 parent d3512cb commit 5417ada
Show file tree
Hide file tree
Showing 9 changed files with 328 additions and 4 deletions.
6 changes: 6 additions & 0 deletions .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,12 @@ jobs:
name: tink-worker
path: cmd/tink-worker/tink-worker-*

- name: Upload virtual-worker binaries
uses: actions/upload-artifact@v2
with:
name: virtual-worker
path: cmd/virtual-worker/virtual-worker-*

- name: Upload tink-controller binaries
uses: actions/upload-artifact@v2
with:
Expand Down
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ all: cli server worker ## Build all binaries for host OS and CPU
-include kube.mk

crosscompile: $(crossbinaries) ## Build all binaries for Linux and all supported CPU arches
images: tink-cli-image tink-server-image tink-worker-image ## Build all docker images
images: tink-cli-image tink-server-image tink-worker-image virtual-worker-image ## Build all docker images
run: crosscompile run-stack ## Builds and runs the Tink stack (tink, db, cli) via docker-compose

test: ## Run tests
Expand Down
2 changes: 2 additions & 0 deletions cmd/virtual-worker/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
virtual-worker
virtual-worker-*
8 changes: 8 additions & 0 deletions cmd/virtual-worker/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
FROM alpine:3.11
ENTRYPOINT [ "/usr/bin/virtual-worker" ]

ARG TARGETARCH
ARG TARGETVARIANT

RUN apk add --no-cache --update --upgrade ca-certificates
COPY virtual-worker-linux-${TARGETARCH:-amd64}${TARGETVARIANT} /usr/bin/virtual-worker
183 changes: 183 additions & 0 deletions cmd/virtual-worker/cmd/root.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,183 @@
package cmd

import (
"context"
"fmt"
"strings"
"time"

"github.com/packethost/pkg/log"
"github.com/pkg/errors"
"github.com/spf13/cobra"
"github.com/spf13/pflag"
"github.com/spf13/viper"
"github.com/tinkerbell/tink/client"
tinkWorker "github.com/tinkerbell/tink/cmd/tink-worker/worker"
"github.com/tinkerbell/tink/cmd/virtual-worker/worker"
pb "github.com/tinkerbell/tink/protos/workflow"
"google.golang.org/grpc"
)

const (
defaultRetryIntervalSeconds = 3
defaultRetryCount = 3
defaultMaxFileSize int64 = 10 * 1024 * 1024 // 10MB
defaultTimeoutMinutes = 60
)

// NewRootCommand creates a new Virtual Worker Cobra root command.
func NewRootCommand(version string, logger log.Logger) *cobra.Command {
rootCmd := &cobra.Command{
Use: "virtual-worker",
Short: "Virtual Tink Worker",
PreRunE: func(cmd *cobra.Command, args []string) error {
viper, err := createViper(logger)
if err != nil {
return err
}
return applyViper(viper, cmd)
},
RunE: func(cmd *cobra.Command, args []string) error {
retryInterval, _ := cmd.Flags().GetDuration("retry-interval")
retries, _ := cmd.Flags().GetInt("max-retry")
// TODO(displague) is log-level no longer useful?
// logLevel, _ := cmd.Flags().GetString("log-level")
workerID, _ := cmd.Flags().GetString("id")
maxFileSize, _ := cmd.Flags().GetInt64("max-file-size")
timeOut, _ := cmd.Flags().GetDuration("timeout")
captureActionLogs, _ := cmd.Flags().GetBool("capture-action-logs")

logger.With("version", version).Info("starting")
if setupErr := client.Setup(); setupErr != nil {
return setupErr
}

ctx := context.Background()
if timeOut > 0 {
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, timeOut)
defer cancel()
}

conn, err := tryClientConnection(logger, retryInterval, retries)
if err != nil {
return err
}
rClient := pb.NewWorkflowServiceClient(conn)

containerManager := worker.NewFakeContainerManager(logger, 1000, 2000)
logCapturer := worker.NewEmptyLogCapturer()

w := tinkWorker.NewWorker(
workerID,
rClient,
containerManager,
logCapturer,
logger,
tinkWorker.WithMaxFileSize(maxFileSize),
tinkWorker.WithRetries(retryInterval, retries),
tinkWorker.WithLogCapture(captureActionLogs))

err = w.ProcessWorkflowActions(ctx)
if err != nil {
return errors.Wrap(err, "worker Finished with error")
}
return nil
},
}

rootCmd.Flags().Duration("retry-interval", defaultRetryIntervalSeconds*time.Second, "Retry interval in seconds (RETRY_INTERVAL)")

rootCmd.Flags().Duration("timeout", defaultTimeoutMinutes*time.Minute, "Max duration to wait for worker to complete (TIMEOUT)")

rootCmd.Flags().Int("max-retry", defaultRetryCount, "Maximum number of retries to attempt (MAX_RETRY)")

rootCmd.Flags().Int64("max-file-size", defaultMaxFileSize, "Maximum file size in bytes (MAX_FILE_SIZE)")

rootCmd.Flags().Bool("capture-action-logs", true, "Capture action container output as part of worker logs")

// rootCmd.Flags().String("log-level", "info", "Sets the worker log level (panic, fatal, error, warn, info, debug, trace)")

must := func(err error) {
if err != nil {
logger.Fatal(err)
}
}

rootCmd.Flags().StringP("id", "i", "", "Sets the worker id (ID)")
must(rootCmd.MarkFlagRequired("id"))

rootCmd.Flags().StringP("docker-registry", "r", "", "Sets the Docker registry (DOCKER_REGISTRY)")
must(rootCmd.MarkFlagRequired("docker-registry"))

rootCmd.Flags().StringP("registry-username", "u", "", "Sets the registry username (REGISTRY_USERNAME)")
must(rootCmd.MarkFlagRequired("registry-username"))

rootCmd.Flags().StringP("registry-password", "p", "", "Sets the registry-password (REGISTRY_PASSWORD)")
must(rootCmd.MarkFlagRequired("registry-password"))

return rootCmd
}

// createViper creates a Viper object configured to read in configuration files
// (from various paths with content type specific filename extensions) and loads
// environment variables.
func createViper(logger log.Logger) (*viper.Viper, error) {
v := viper.New()
v.AutomaticEnv()
v.SetConfigName("virtual-worker")
v.AddConfigPath("/etc/tinkerbell")
v.AddConfigPath(".")
v.SetEnvKeyReplacer(strings.NewReplacer("-", "_"))

// If a config file is found, read it in.
if err := v.ReadInConfig(); err != nil {
if _, ok := err.(viper.ConfigFileNotFoundError); !ok {
logger.With("configFile", v.ConfigFileUsed()).Error(err, "could not load config file")
return nil, err
}
logger.Info("no config file found")
} else {
logger.With("configFile", v.ConfigFileUsed()).Info("loaded config file")
}

return v, nil
}

func applyViper(v *viper.Viper, cmd *cobra.Command) error {
errs := []error{}

cmd.Flags().VisitAll(func(f *pflag.Flag) {
if !f.Changed && v.IsSet(f.Name) {
val := v.Get(f.Name)
if err := cmd.Flags().Set(f.Name, fmt.Sprintf("%v", val)); err != nil {
errs = append(errs, err)
return
}
}
})

if len(errs) > 0 {
es := []string{}
for _, err := range errs {
es = append(es, err.Error())
}
return fmt.Errorf(strings.Join(es, ", "))
}

return nil
}

func tryClientConnection(logger log.Logger, retryInterval time.Duration, retries int) (*grpc.ClientConn, error) {
for ; retries > 0; retries-- {
c, err := client.GetConnection()
if err != nil {
logger.With("error", err, "duration", retryInterval).Info("failed to connect, sleeping before retrying")
<-time.After(retryInterval)
continue
}

return c, nil
}
return nil, fmt.Errorf("retries exceeded")
}
28 changes: 28 additions & 0 deletions cmd/virtual-worker/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package main

import (
"os"

"github.com/packethost/pkg/log"
"github.com/tinkerbell/tink/cmd/virtual-worker/cmd"
)

const (
serviceKey = "github.com/tinkerbell/tink"
)

// version is set at build time.
var version = "devel"

func main() {
logger, err := log.Init(serviceKey)
if err != nil {
panic(err)
}

rootCmd := cmd.NewRootCommand(version, logger)
if err := rootCmd.Execute(); err != nil {
os.Exit(1)
}
logger.Close()
}
78 changes: 78 additions & 0 deletions cmd/virtual-worker/worker/container_manager.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
package worker

import (
"context"
"math/rand"
"time"

"github.com/packethost/pkg/log"
"github.com/tinkerbell/tink/cmd/tink-worker/worker"
pb "github.com/tinkerbell/tink/protos/workflow"
)

func getRandHexStr(length int) string {
// intentionally weak RNG. This is only for fake output
r := rand.New(rand.NewSource(time.Now().UnixNano()))
alphabet := []byte("1234567890abcdef")
resp := []byte{}
for i := 0; i < length; i++ {
resp = append(resp, alphabet[r.Intn(len(alphabet))])
}
return string(resp)
}

type fakeManager struct {
// minimum milliseconds to sleep for faked Docker API calls
sleepMinimumMs int
// additional jitter milliseconds to sleep for faked Docker API calls
sleepJitterMs int

logger log.Logger
}

// NewFakeContainerManager returns a fake worker.ContainerManager that will sleep for Docker API calls.
func NewFakeContainerManager(l log.Logger, sleepMinimum, sleepJitter int) worker.ContainerManager {
return &fakeManager{
sleepMinimumMs: sleepMinimum,
sleepJitterMs: sleepJitter,
logger: l,
}
}

func (m *fakeManager) CreateContainer(_ context.Context, cmd []string, _ string, _ *pb.WorkflowAction, _, _ bool) (string, error) {
m.logger.With("command", cmd).Info("creating container")
return getRandHexStr(64), nil
}

func (m *fakeManager) StartContainer(_ context.Context, id string) error {
m.logger.With("containerID", id).Debug("starting container")
return nil
}

func (m *fakeManager) WaitForContainer(_ context.Context, id string) (pb.State, error) {
m.logger.With("containerID", id).Info("waiting for container")

r := rand.New(rand.NewSource(time.Now().UnixNano()))
time.Sleep(time.Duration(r.Intn(m.sleepJitterMs)+m.sleepMinimumMs) * time.Millisecond)

return pb.State_STATE_SUCCESS, nil
}

func (m *fakeManager) WaitForFailedContainer(_ context.Context, id string, failedActionStatus chan pb.State) {
m.logger.With("containerID", id).Info("waiting for container")
r := rand.New(rand.NewSource(time.Now().UnixNano()))
time.Sleep(time.Duration(r.Intn(m.sleepJitterMs)+m.sleepMinimumMs) * time.Millisecond)
failedActionStatus <- pb.State_STATE_SUCCESS
}

func (m *fakeManager) RemoveContainer(_ context.Context, id string) error {
m.logger.With("containerID", id).Info("removing container")
return nil
}

func (m *fakeManager) PullImage(_ context.Context, image string) error {
m.logger.With("image", image).Info("pulling image")
r := rand.New(rand.NewSource(time.Now().UnixNano()))
time.Sleep(time.Duration(r.Intn(m.sleepJitterMs)+m.sleepMinimumMs) * time.Millisecond)
return nil
}
16 changes: 16 additions & 0 deletions cmd/virtual-worker/worker/log_capturer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package worker

import (
"context"

"github.com/tinkerbell/tink/cmd/tink-worker/worker"
)

type emptyLogger struct{}

func (l *emptyLogger) CaptureLogs(context.Context, string) {}

// NewEmptyLogCapturer returns an no-op log capturer.
func NewEmptyLogCapturer() worker.LogCapturer {
return &emptyLogger{}
}
9 changes: 6 additions & 3 deletions rules.mk
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ MAKEFLAGS += --no-builtin-rules
SHELL := bash
.SHELLFLAGS := -o pipefail -euc

binaries := cmd/tink-cli/tink-cli cmd/tink-controller/tink-controller cmd/tink-server/tink-server cmd/tink-worker/tink-worker
binaries := cmd/tink-cli/tink-cli cmd/tink-controller/tink-controller cmd/tink-server/tink-server cmd/tink-worker/tink-worker cmd/virtual-worker/virtual-worker
version := $(shell git rev-parse --short HEAD)
tag := $(shell git tag --points-at HEAD)
ifneq (,$(tag))
Expand All @@ -20,11 +20,12 @@ endif
LDFLAGS := -ldflags "-X main.version=$(version)"
export CGO_ENABLED := 0

.PHONY: server cli worker test $(binaries)
.PHONY: server cli worker virtual-worker test $(binaries)
cli: cmd/tink-cli/tink-cli
controller: cmd/tink-controller/tink-controller
server: cmd/tink-server/tink-server
worker : cmd/tink-worker/tink-worker
virtual-worker : cmd/virtual-worker/virtual-worker

crossbinaries := $(addsuffix -linux-,$(binaries))
crossbinaries := $(crossbinaries:=386) $(crossbinaries:=amd64) $(crossbinaries:=arm64) $(crossbinaries:=armv6) $(crossbinaries:=armv7)
Expand All @@ -38,7 +39,7 @@ crossbinaries := $(crossbinaries:=386) $(crossbinaries:=amd64) $(crossbinaries:=
$(binaries) $(crossbinaries):
$(FLAGS) go build $(LDFLAGS) -o $@ ./$(@D)

.PHONY: tink-cli-image tink-controller-image tink-server-image tink-worker-image
.PHONY: tink-cli-image tink-controller-image tink-server-image tink-worker-image virtual-worker-image
tink-cli-image: cmd/tink-cli/tink-cli-linux-amd64
docker build -t tink-cli cmd/tink-cli/
tink-controller-image: cmd/tink-controller/tink-controller-linux-amd64
Expand All @@ -47,6 +48,8 @@ tink-server-image: cmd/tink-server/tink-server-linux-amd64
docker build -t tink-server cmd/tink-server/
tink-worker-image: cmd/tink-worker/tink-worker-linux-amd64
docker build -t tink-worker cmd/tink-worker/
virtual-worker-image: cmd/virtual-worker/virtual-worker-linux-amd64
docker build -t virtual-worker cmd/virtual-worker/

.PHONY: run-stack
run-stack:
Expand Down

0 comments on commit 5417ada

Please sign in to comment.