Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added virtual worker #602

Merged
merged 2 commits into from
Apr 19, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,12 @@ jobs:
name: tink-worker
path: cmd/tink-worker/tink-worker-*

- name: Upload virtual-worker binaries
uses: actions/upload-artifact@v2
with:
name: virtual-worker
path: cmd/virtual-worker/virtual-worker-*

- name: Upload tink-controller binaries
uses: actions/upload-artifact@v2
with:
Expand Down
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ all: cli server worker ## Build all binaries for host OS and CPU
-include kube.mk

crosscompile: $(crossbinaries) ## Build all binaries for Linux and all supported CPU arches
images: tink-cli-image tink-server-image tink-worker-image ## Build all docker images
images: tink-cli-image tink-server-image tink-worker-image virtual-worker-image ## Build all docker images
run: crosscompile run-stack ## Builds and runs the Tink stack (tink, db, cli) via docker-compose

test: ## Run tests
Expand Down
2 changes: 2 additions & 0 deletions cmd/virtual-worker/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
virtual-worker
virtual-worker-*
8 changes: 8 additions & 0 deletions cmd/virtual-worker/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
FROM alpine:3.15
ENTRYPOINT [ "/usr/bin/virtual-worker" ]

ARG TARGETARCH
ARG TARGETVARIANT

RUN apk add --no-cache --update --upgrade ca-certificates
COPY virtual-worker-linux-${TARGETARCH:-amd64}${TARGETVARIANT} /usr/bin/virtual-worker
122 changes: 122 additions & 0 deletions cmd/virtual-worker/cmd/root.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
package cmd

import (
"strings"
"time"

"github.com/packethost/pkg/log"
"github.com/pkg/errors"
"github.com/spf13/cobra"
"github.com/spf13/pflag"
"github.com/spf13/viper"
"github.com/tinkerbell/tink/client"
tinkWorker "github.com/tinkerbell/tink/cmd/tink-worker/worker"
"github.com/tinkerbell/tink/cmd/virtual-worker/worker"
"github.com/tinkerbell/tink/protos/workflow"
)

const (
defaultRetryIntervalSeconds = 3
defaultRetryCount = 3
defaultMaxFileSize = 10 * 1024 * 1024 // 10MB
)

// NewRootCommand creates a new Virtual Worker Cobra root command.
func NewRootCommand(version string, logger log.Logger) *cobra.Command {
rootCmd := &cobra.Command{
Use: "virtual-worker",
Short: "Virtual Tink Worker",
PreRunE: func(cmd *cobra.Command, args []string) error {
return createViper(logger, cmd)
},
RunE: func(cmd *cobra.Command, args []string) error {
retryInterval := viper.GetDuration("retry-interval")
retries := viper.GetInt("max-retry")
workerID := viper.GetString("id")
maxFileSize := viper.GetInt64("max-file-size")
captureActionLogs := viper.GetBool("capture-action-logs")
sleepMin := viper.GetDuration("sleep-min")
sleepJitter := viper.GetDuration("sleep-jitter")

logger.With("version", version).Info("starting")

conn, err := client.NewClientConn(
viper.GetString("tinkerbell-grpc-authority"),
viper.GetBool("tinkerbell-tls"),
)
if err != nil {
return err
}
workflowClient := workflow.NewWorkflowServiceClient(conn)

containerManager := worker.NewFakeContainerManager(logger, sleepMin, sleepJitter)
logCapturer := worker.NewEmptyLogCapturer()

w := tinkWorker.NewWorker(
workerID,
workflowClient,
containerManager,
logCapturer,
logger,
tinkWorker.WithMaxFileSize(maxFileSize),
tinkWorker.WithRetries(retryInterval, retries),
tinkWorker.WithLogCapture(captureActionLogs))

err = w.ProcessWorkflowActions(cmd.Context())
if err != nil {
return errors.Wrap(err, "worker Finished with error")
}
return nil
},
}

rootCmd.Flags().Duration("retry-interval", defaultRetryIntervalSeconds*time.Second, "Retry interval in seconds (RETRY_INTERVAL)")
rootCmd.Flags().Int("max-retry", defaultRetryCount, "Maximum number of retries to attempt (MAX_RETRY)")
rootCmd.Flags().Int64("max-file-size", defaultMaxFileSize, "Maximum file size in bytes (MAX_FILE_SIZE)")
rootCmd.Flags().Bool("capture-action-logs", true, "Capture action container output as part of worker logs")
rootCmd.Flags().Duration("sleep-min", time.Second*4, "The minimum amount of time to sleep during faked docker operations")
rootCmd.Flags().Duration("sleep-jitter", time.Second*2, "The amount of jitter to add during faked docker operations")

must := func(err error) {
if err != nil {
logger.Fatal(err)
}
}

rootCmd.Flags().StringP("id", "i", "", "Sets the worker id (ID)")
must(rootCmd.MarkFlagRequired("id"))

_ = viper.BindPFlags(rootCmd.Flags())

return rootCmd
}

// createViper creates a Viper object configured to read in configuration files
// (from various paths with content type specific filename extensions) and loads
// environment variables.
func createViper(logger log.Logger, cmd *cobra.Command) error {
viper.AutomaticEnv()
viper.SetConfigName("virtual-worker")
viper.AddConfigPath("/etc/tinkerbell")
viper.AddConfigPath(".")
viper.SetEnvKeyReplacer(strings.NewReplacer("-", "_"))

// If a config file is found, read it in.
if err := viper.ReadInConfig(); err != nil {
if _, ok := err.(viper.ConfigFileNotFoundError); !ok {
logger.With("configFile", viper.ConfigFileUsed()).Error(err, "could not load config file")
return err
}
logger.Info("no config file found")
} else {
logger.With("configFile", viper.ConfigFileUsed()).Info("loaded config file")
}

cmd.Flags().VisitAll(func(f *pflag.Flag) {
if viper.IsSet(f.Name) {
_ = cmd.Flags().SetAnnotation(f.Name, cobra.BashCompOneRequiredFlag, []string{"false"})
}
})

return nil
}
27 changes: 27 additions & 0 deletions cmd/virtual-worker/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package main

import (
"os"

"github.com/packethost/pkg/log"
"github.com/tinkerbell/tink/cmd/virtual-worker/cmd"
)

const (
serviceKey = "github.com/tinkerbell/tink"
)

// version is set at build time.
var version = "devel"

func main() {
logger, err := log.Init(serviceKey)
if err != nil {
panic(err)
}
rootCmd := cmd.NewRootCommand(version, logger)
if err := rootCmd.Execute(); err != nil {
os.Exit(1)
micahhausler marked this conversation as resolved.
Show resolved Hide resolved
}
logger.Close()
mmlb marked this conversation as resolved.
Show resolved Hide resolved
}
81 changes: 81 additions & 0 deletions cmd/virtual-worker/worker/container_manager.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
package worker

import (
"context"
"math/rand"
"time"

"github.com/packethost/pkg/log"
"github.com/tinkerbell/tink/cmd/tink-worker/worker"
pb "github.com/tinkerbell/tink/protos/workflow"
)

func getRandHexStr(r *rand.Rand, length int) string {
alphabet := []byte("1234567890abcdef")
resp := []byte{}
for i := 0; i < length; i++ {
resp = append(resp, alphabet[r.Intn(len(alphabet))])
}
return string(resp)
}

type fakeManager struct {
// minimum milliseconds to sleep for faked Docker API calls
sleepMinimum time.Duration
// additional jitter milliseconds to sleep for faked Docker API calls
sleepJitter time.Duration

r *rand.Rand
logger log.Logger
}

func (m *fakeManager) sleep() {
jitter := time.Duration(m.r.Int31n(int32(m.sleepJitter.Milliseconds()))) * time.Millisecond
time.Sleep(jitter + m.sleepMinimum)
}

// NewFakeContainerManager returns a fake worker.ContainerManager that will sleep for Docker API calls.
func NewFakeContainerManager(l log.Logger, sleepMinimum, sleepJitter time.Duration) worker.ContainerManager {
return &fakeManager{
sleepMinimum: sleepMinimum,
sleepJitter: sleepJitter,
logger: l,
// intentionally weak RNG. This is only for fake output
r: rand.New(rand.NewSource(time.Now().UnixNano())),
micahhausler marked this conversation as resolved.
Show resolved Hide resolved
}
}

func (m *fakeManager) CreateContainer(_ context.Context, cmd []string, _ string, _ *pb.WorkflowAction, _, _ bool) (string, error) {
m.logger.With("command", cmd).Info("creating container")
return getRandHexStr(m.r, 64), nil
}

func (m *fakeManager) StartContainer(_ context.Context, id string) error {
m.logger.With("containerID", id).Debug("starting container")
return nil
}

func (m *fakeManager) WaitForContainer(_ context.Context, id string) (pb.State, error) {
m.logger.With("containerID", id).Info("waiting for container")
m.sleep()

return pb.State_STATE_SUCCESS, nil
}

func (m *fakeManager) WaitForFailedContainer(_ context.Context, id string, failedActionStatus chan pb.State) {
m.logger.With("containerID", id).Info("waiting for container")
m.sleep()
failedActionStatus <- pb.State_STATE_SUCCESS
}

func (m *fakeManager) RemoveContainer(_ context.Context, id string) error {
m.logger.With("containerID", id).Info("removing container")
return nil
}

func (m *fakeManager) PullImage(_ context.Context, image string) error {
m.logger.With("image", image).Info("pulling image")
m.sleep()

return nil
}
16 changes: 16 additions & 0 deletions cmd/virtual-worker/worker/log_capturer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package worker

import (
"context"

"github.com/tinkerbell/tink/cmd/tink-worker/worker"
)

type emptyLogger struct{}

func (l *emptyLogger) CaptureLogs(context.Context, string) {}

// NewEmptyLogCapturer returns an no-op log capturer.
func NewEmptyLogCapturer() worker.LogCapturer {
return &emptyLogger{}
}
9 changes: 6 additions & 3 deletions rules.mk
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ MAKEFLAGS += --no-builtin-rules
SHELL := bash
.SHELLFLAGS := -o pipefail -euc

binaries := cmd/tink-cli/tink-cli cmd/tink-controller/tink-controller cmd/tink-server/tink-server cmd/tink-worker/tink-worker
binaries := cmd/tink-cli/tink-cli cmd/tink-controller/tink-controller cmd/tink-server/tink-server cmd/tink-worker/tink-worker cmd/virtual-worker/virtual-worker
version := $(shell git rev-parse --short HEAD)
tag := $(shell git tag --points-at HEAD)
ifneq (,$(tag))
Expand All @@ -20,11 +20,12 @@ endif
LDFLAGS := -ldflags "-X main.version=$(version)"
export CGO_ENABLED := 0

.PHONY: server cli worker test $(binaries)
.PHONY: server cli worker virtual-worker test $(binaries)
cli: cmd/tink-cli/tink-cli
controller: cmd/tink-controller/tink-controller
server: cmd/tink-server/tink-server
worker : cmd/tink-worker/tink-worker
virtual-worker : cmd/virtual-worker/virtual-worker

crossbinaries := $(addsuffix -linux-,$(binaries))
crossbinaries := $(crossbinaries:=386) $(crossbinaries:=amd64) $(crossbinaries:=arm64) $(crossbinaries:=armv6) $(crossbinaries:=armv7)
Expand All @@ -38,7 +39,7 @@ crossbinaries := $(crossbinaries:=386) $(crossbinaries:=amd64) $(crossbinaries:=
$(binaries) $(crossbinaries):
$(FLAGS) go build $(LDFLAGS) -o $@ ./$(@D)

.PHONY: tink-cli-image tink-controller-image tink-server-image tink-worker-image
.PHONY: tink-cli-image tink-controller-image tink-server-image tink-worker-image virtual-worker-image
tink-cli-image: cmd/tink-cli/tink-cli-linux-amd64
docker build -t tink-cli cmd/tink-cli/
tink-controller-image: cmd/tink-controller/tink-controller-linux-amd64
Expand All @@ -47,6 +48,8 @@ tink-server-image: cmd/tink-server/tink-server-linux-amd64
docker build -t tink-server cmd/tink-server/
tink-worker-image: cmd/tink-worker/tink-worker-linux-amd64
docker build -t tink-worker cmd/tink-worker/
virtual-worker-image: cmd/virtual-worker/virtual-worker-linux-amd64
docker build -t virtual-worker cmd/virtual-worker/

.PHONY: run-stack
run-stack:
Expand Down