Skip to content

Commit

Permalink
Krm/tink server (#573)
Browse files Browse the repository at this point in the history
## Description

This PR adds a Kubernetes API backend to Tinkerbell as a replacement for Postgres.

This PR builds on #563 and adds new API capabilities

## Why is this needed

## How Has This Been Tested?

## How are existing users impacted? What migration steps/scripts do we need?

## Checklist:

I have:

- [ ] updated the documentation and/or roadmap (if required)
- [ ] added unit or e2e tests
- [ ] provided instructions on how to upgrade
  • Loading branch information
mmlb authored Apr 26, 2022
2 parents a80e0e1 + cc49e78 commit 59b0126
Show file tree
Hide file tree
Showing 33 changed files with 1,707 additions and 149 deletions.
3 changes: 1 addition & 2 deletions .github/mergify.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ queue_rules:
- check-success~=docker-images.*tink-cli
- check-success~=docker-images.*tink-server
- check-success~=docker-images.*tink-worker
- check-success=validation

pull_request_rules:
- name: Automatic merge on approval
Expand All @@ -19,7 +18,7 @@ pull_request_rules:
- check-success~=docker-images.*tink-cli
- check-success~=docker-images.*tink-server
- check-success~=docker-images.*tink-worker
- check-success=validation
- check-success=crosscompile
- label!=do-not-merge
- label=ready-to-merge
actions:
Expand Down
14 changes: 4 additions & 10 deletions .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ jobs:
- name: Generate
run: nix-shell --run 'make generate'

- name: e2etest
run: make e2etest-setup

- name: go test
run: make test

Expand All @@ -63,21 +66,12 @@ jobs:
- run: make bin/gofumpt

- run: PATH=$PWD/bin/:$PATH ./ci-checks.sh
validation:
crosscompile:
runs-on: ubuntu-latest
needs:
- ci-checks

- test

- verify
steps:
- name: fake
run: echo ":+1:"
crosscompile:
runs-on: ubuntu-latest
needs:
- validation
steps:
- name: Checkout code
uses: actions/checkout@v2
Expand Down
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -23,3 +23,6 @@ out/

.*.swp
hack/tools

# test worker files
tests/worker
4 changes: 2 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ crosscompile: $(crossbinaries) ## Build all binaries for Linux and all supported
images: tink-cli-image tink-server-image tink-worker-image virtual-worker-image ## Build all docker images
run: crosscompile run-stack ## Builds and runs the Tink stack (tink, db, cli) via docker-compose

test: ## Run tests
go test -coverprofile=coverage.txt ./...
test: e2etest-setup ## Run tests
source <(setup-envtest use -p env) && go test -coverprofile=coverage.txt ./...

verify: lint check-generated # Verify code style, is lint free, freshness ...
gofumpt -s -d .
Expand Down
15 changes: 13 additions & 2 deletions cmd/tink-controller/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/tinkerbell/tink/pkg/controllers"
wfctrl "github.com/tinkerbell/tink/pkg/controllers/workflow"
"k8s.io/client-go/tools/clientcmd"
clientcmdapi "k8s.io/client-go/tools/clientcmd/api"
)

// version is set at build time.
Expand Down Expand Up @@ -58,12 +59,22 @@ func NewRootCommand(config *DaemonConfig, logger log.Logger) *cobra.Command {
RunE: func(cmd *cobra.Command, args []string) error {
logger.Info("starting controller version " + version)

config, err := clientcmd.BuildConfigFromFlags(config.K8sAPI, config.Kubeconfig)
ccfg := clientcmd.NewNonInteractiveDeferredLoadingClientConfig(
&clientcmd.ClientConfigLoadingRules{ExplicitPath: config.Kubeconfig},
&clientcmd.ConfigOverrides{ClusterInfo: clientcmdapi.Cluster{Server: config.K8sAPI}})

cfg, err := ccfg.ClientConfig()
if err != nil {
return err
}

manager, err := controllers.NewManager(config, controllers.GetControllerOptions())
namespace, _, err := ccfg.Namespace()
if err != nil {
return err
}
options := controllers.GetControllerOptions()
options.LeaderElectionNamespace = namespace
manager, err := controllers.NewManager(cfg, options)
if err != nil {
return err
}
Expand Down
89 changes: 60 additions & 29 deletions cmd/tink-server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,19 +40,35 @@ type DaemonConfig struct {
CertDir string
HTTPAuthority string
TLS bool
Backend string

KubeconfigPath string
KubeAPI string
}

// Identifiers accepted by the --backend flag to select the datastore.
const (
	// backendPostgres selects the Postgres datastore.
	backendPostgres = "postgres"
	// backendKubernetes selects the Kubernetes API datastore.
	backendKubernetes = "kubernetes"
)

// backends lists every supported backend identifier, Postgres first and
// Kubernetes second. A new slice is built on each call, so callers may
// modify the result freely.
func backends() []string {
	supported := make([]string, 0, 2)
	supported = append(supported, backendPostgres, backendKubernetes)
	return supported
}

func (c *DaemonConfig) AddFlags(fs *pflag.FlagSet) {
fs.StringVar(&c.Facility, "facility", "deprecated", "This is temporary. It will be removed")
fs.StringVar(&c.PGDatabase, "postgres-database", "tinkerbell", "The Postgres database name")
fs.StringVar(&c.PGUSer, "postgres-user", "tinkerbell", "The Postgres database username")
fs.StringVar(&c.PGPassword, "postgres-password", "tinkerbell", "The Postgres database password")
fs.StringVar(&c.PGSSLMode, "postgres-sslmode", "disable", "Enable or disable SSL mode in postgres")
fs.BoolVar(&c.OnlyMigration, "only-migration", false, "When enabled the server applies the migration to postgres database and it exits")
fs.StringVar(&c.PGDatabase, "postgres-database", "tinkerbell", "The Postgres database name. Only takes effect if `--backend=postgres`")
fs.StringVar(&c.PGUSer, "postgres-user", "tinkerbell", "The Postgres database username. Only takes effect if `--backend=postgres`")
fs.StringVar(&c.PGPassword, "postgres-password", "tinkerbell", "The Postgres database password. Only takes effect if `--backend=postgres`")
fs.StringVar(&c.PGSSLMode, "postgres-sslmode", "disable", "Enable or disable SSL mode in postgres. Only takes effect if `--backend=postgres`")
fs.BoolVar(&c.OnlyMigration, "only-migration", false, "When enabled the server applies the migration to postgres database and it exits. Only takes effect if `--backend=postgres`")
fs.StringVar(&c.GRPCAuthority, "grpc-authority", ":42113", "The address used to expose the gRPC server")
fs.StringVar(&c.CertDir, "cert-dir", "", "")
fs.StringVar(&c.HTTPAuthority, "http-authority", ":42114", "The address used to expose the HTTP server")
fs.BoolVar(&c.TLS, "tls", true, "Run in tls protected mode (disabling should only be done for development or if behind TLS terminating proxy)")
fs.StringVar(&c.Backend, "backend", backendPostgres, fmt.Sprintf("The backend datastore to use. Must be one of %s", strings.Join(backends(), ", ")))
fs.StringVar(&c.KubeconfigPath, "kubeconfig", "", "The path to the Kubeconfig. Only takes effect if `--backend=kubernetes`")
fs.StringVar(&c.KubeAPI, "kube-api", "", "The Kubernetes API endpoint. Only takes effect if `--backend=kubernetes`")
}

func (c *DaemonConfig) PopulateFromLegacyEnvVar() {
Expand Down Expand Up @@ -119,26 +135,11 @@ func NewRootCommand(config *DaemonConfig, logger log.Logger) *cobra.Command {
// graceful shutdown and error management but I want to
// figure this out in another PR
errCh := make(chan error, 2)

// TODO(gianarb): I moved this up because we need to be sure that both
// connection, the one used for the resources and the one used for
// listening to events and notification are coming in the same way.
// BUT we should be using the right flags
connInfo := fmt.Sprintf("dbname=%s user=%s password=%s sslmode=%s",
config.PGDatabase,
config.PGUSer,
config.PGPassword,
config.PGSSLMode,
var (
registrar grpcserver.Registrar
grpcOpts []grpc.ServerOption
err error
)
database, err := internal.SetupPostgres(connInfo, config.OnlyMigration, logger)
if err != nil {
return err
}
if config.OnlyMigration {
return nil
}

var grpcOpts []grpc.ServerOption
if config.TLS {
certDir := config.CertDir
if certDir == "" {
Expand All @@ -150,16 +151,46 @@ func NewRootCommand(config *DaemonConfig, logger log.Logger) *cobra.Command {
}
grpcOpts = append(grpcOpts, grpc.Creds(credentials.NewServerTLSFromCert(cert)))
}
switch config.Backend {
case backendKubernetes:
var err error
registrar, err = server.NewKubeBackedServer(logger, config.KubeconfigPath, config.KubeAPI)
if err != nil {
return err
}
case backendPostgres:
// TODO(gianarb): I moved this up because we need to be sure that both
// connection, the one used for the resources and the one used for
// listening to events and notification are coming in the same way.
// BUT we should be using the right flags
connInfo := fmt.Sprintf("dbname=%s user=%s password=%s sslmode=%s",
config.PGDatabase,
config.PGUSer,
config.PGPassword,
config.PGSSLMode,
)
database, err := internal.SetupPostgres(connInfo, config.OnlyMigration, logger)
if err != nil {
return err
}
if config.OnlyMigration {
return nil
}

tinkAPI, err := server.NewDBServer(logger, database)
if err != nil {
return err
registrar, err = server.NewDBServer(
logger,
database,
)
if err != nil {
return err
}
default:
return fmt.Errorf("invalid backend: %s", config.Backend)
}

// Start the gRPC server in the background
addr, err := grpcserver.SetupGRPC(
ctx,
tinkAPI,
registrar,
config.GRPCAuthority,
grpcOpts,
errCh)
Expand Down
21 changes: 6 additions & 15 deletions cmd/tink-worker/worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,10 +208,8 @@ func (w *Worker) execute(ctx context.Context, wfID string, action *pb.WorkflowAc
return st, errors.Wrap(err, "wait container")
}

l.With("status", st.String()).Info("container removed")

if st == pb.State_STATE_SUCCESS {
l.With("status", st).Info("action container exited with success", st)
l.With("status", st).Info("action container exited with success")
return st, nil
}

Expand Down Expand Up @@ -299,15 +297,6 @@ func (w *Worker) ProcessWorkflowActions(ctx context.Context) error {
nextAction = actions.GetActionList()[wfContext.GetCurrentActionIndex()]
actionIndex = int(wfContext.GetCurrentActionIndex())
}
l := l.With(
"currentWorker", wfContext.GetCurrentWorker(),
"currentTask", wfContext.GetCurrentTask(),
"currentAction", wfContext.GetCurrentAction(),
"currentActionIndex", strconv.FormatInt(wfContext.GetCurrentActionIndex(), 10),
"currentActionState", wfContext.GetCurrentActionState(),
"totalNumberOfActions", wfContext.GetTotalNumberOfActions(),
)
l.Info("current context")
if nextAction.GetWorkerId() == w.workerID {
turn = true
}
Expand Down Expand Up @@ -338,12 +327,13 @@ func (w *Worker) ProcessWorkflowActions(ctx context.Context) error {
os.Exit(1)
}
}
l.Info("starting with action")
l.Info("starting action")
}

for turn {
action := actions.GetActionList()[actionIndex]
l := l.With("actionName", action.GetName(),
l := l.With(
"actionName", action.GetName(),
"taskName", action.GetTaskName(),
)
ctx := context.WithValue(ctx, loggingContextKey, &l)
Expand All @@ -361,7 +351,7 @@ func (w *Worker) ProcessWorkflowActions(ctx context.Context) error {
if err != nil {
exitWithGrpcError(err, l)
}
l.With("duration", strconv.FormatInt(actionStatus.Seconds, 10)).Info("sent action status")
l.With("status", actionStatus.ActionStatus, "duration", strconv.FormatInt(actionStatus.Seconds, 10)).Info("sent action status")
}

// get workflow data
Expand Down Expand Up @@ -445,6 +435,7 @@ func (w *Worker) reportActionStatus(ctx context.Context, actionStatus *pb.Workfl
"workerID", actionStatus.GetWorkerId(),
"actionName", actionStatus.GetActionName(),
"taskName", actionStatus.GetTaskName(),
"status", actionStatus.ActionStatus,
)
var err error
for r := 1; r <= w.retries; r++ {
Expand Down
1 change: 1 addition & 0 deletions cmd/virtual-worker/cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ func NewRootCommand(version string, logger log.Logger) *cobra.Command {
logger,
tinkWorker.WithMaxFileSize(maxFileSize),
tinkWorker.WithRetries(retryInterval, retries),
tinkWorker.WithDataDir("./worker"),
tinkWorker.WithLogCapture(captureActionLogs))

err = w.ProcessWorkflowActions(cmd.Context())
Expand Down
6 changes: 6 additions & 0 deletions cmd/virtual-worker/worker/container_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,12 @@ func (m *fakeManager) sleep() {

// NewFakeContainerManager returns a fake worker.ContainerManager that will sleep for Docker API calls.
func NewFakeContainerManager(l log.Logger, sleepMinimum, sleepJitter time.Duration) worker.ContainerManager {
if sleepMinimum <= 0 {
sleepMinimum = 1
}
if sleepJitter <= 0 {
sleepJitter = 1
}
return &fakeManager{
sleepMinimum: sleepMinimum,
sleepJitter: sleepJitter,
Expand Down
23 changes: 15 additions & 8 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,7 @@ require (
github.com/docker/distribution v2.8.1+incompatible
github.com/docker/docker v20.10.7+incompatible
github.com/equinix-labs/otel-init-go v0.0.1
github.com/go-logr/zapr v0.4.0
github.com/go-openapi/strfmt v0.19.3 // indirect
github.com/go-logr/zapr v1.2.0
github.com/golang/protobuf v1.5.2
github.com/google/go-cmp v0.5.6
github.com/google/uuid v1.3.0
Expand All @@ -18,7 +17,8 @@ require (
github.com/ktr0731/evans v0.10.0
github.com/lib/pq v1.10.1
github.com/matryer/moq v0.2.3
github.com/moby/term v0.0.0-20210619224110-3f7ff695adc6 // indirect
github.com/onsi/ginkgo/v2 v2.1.3
github.com/onsi/gomega v1.18.1
github.com/opencontainers/image-spec v1.0.2
github.com/packethost/pkg v0.0.0-20200903155310-0433e0605550
github.com/pkg/errors v0.9.1
Expand All @@ -29,20 +29,27 @@ require (
github.com/spf13/viper v1.8.1
github.com/stretchr/testify v1.7.0
github.com/testcontainers/testcontainers-go v0.11.1
go.mongodb.org/mongo-driver v1.1.2 // indirect
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.22.0
go.uber.org/multierr v1.7.0
google.golang.org/genproto v0.0.0-20211021150943-2b146023228c
google.golang.org/grpc v1.42.0
google.golang.org/grpc/cmd/protoc-gen-go-grpc v1.1.0
google.golang.org/protobuf v1.27.1
gopkg.in/yaml.v2 v2.4.0
k8s.io/apimachinery v0.22.2
k8s.io/client-go v0.22.2
k8s.io/apimachinery v0.23.0
k8s.io/client-go v0.23.0
knative.dev/pkg v0.0.0-20211119170723-a99300deff34
mvdan.cc/gofumpt v0.1.1
sigs.k8s.io/controller-runtime v0.10.1
sigs.k8s.io/controller-tools v0.7.0
sigs.k8s.io/controller-runtime v0.11.1
sigs.k8s.io/controller-runtime/tools/setup-envtest v0.0.0-20220304125252-9ee63fc65a97
sigs.k8s.io/controller-tools v0.8.0
sigs.k8s.io/yaml v1.3.0
)

require (
github.com/go-openapi/strfmt v0.19.3 // indirect
github.com/moby/term v0.0.0-20210619224110-3f7ff695adc6 // indirect
go.mongodb.org/mongo-driver v1.1.2 // indirect
)

replace github.com/stormcat24/protodep => github.com/ackintosh/protodep v0.0.0-20200728152107-abf8eb579d6c
Expand Down
Loading

0 comments on commit 59b0126

Please sign in to comment.