Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Krm/tink server #573

Merged
merged 1 commit into from
Apr 26, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions .github/mergify.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ queue_rules:
- check-success~=docker-images.*tink-cli
- check-success~=docker-images.*tink-server
- check-success~=docker-images.*tink-worker
- check-success=validation
micahhausler marked this conversation as resolved.
Show resolved Hide resolved

pull_request_rules:
- name: Automatic merge on approval
Expand All @@ -19,7 +18,7 @@ pull_request_rules:
- check-success~=docker-images.*tink-cli
- check-success~=docker-images.*tink-server
- check-success~=docker-images.*tink-worker
- check-success=validation
- check-success=crosscompile
- label!=do-not-merge
- label=ready-to-merge
actions:
Expand Down
14 changes: 4 additions & 10 deletions .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ jobs:
- name: Generate
run: nix-shell --run 'make generate'

- name: e2etest
run: make e2etest-setup

- name: go test
run: make test

Expand All @@ -63,21 +66,12 @@ jobs:
- run: make bin/gofumpt

- run: PATH=$PWD/bin/:$PATH ./ci-checks.sh
validation:
crosscompile:
runs-on: ubuntu-latest
needs:
- ci-checks

- test

- verify
steps:
- name: fake
run: echo ":+1:"
crosscompile:
runs-on: ubuntu-latest
needs:
- validation
steps:
- name: Checkout code
uses: actions/checkout@v2
Expand Down
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -23,3 +23,6 @@ out/

.*.swp
hack/tools

# test worker files
tests/worker
4 changes: 2 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ crosscompile: $(crossbinaries) ## Build all binaries for Linux and all supported
images: tink-cli-image tink-server-image tink-worker-image virtual-worker-image ## Build all docker images
run: crosscompile run-stack ## Builds and runs the Tink stack (tink, db, cli) via docker-compose

test: ## Run tests
go test -coverprofile=coverage.txt ./...
test: e2etest-setup ## Run tests
source <(setup-envtest use -p env) && go test -coverprofile=coverage.txt ./...

verify: lint check-generated # Verify code style, is lint free, freshness ...
gofumpt -s -d .
Expand Down
15 changes: 13 additions & 2 deletions cmd/tink-controller/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/tinkerbell/tink/pkg/controllers"
wfctrl "github.com/tinkerbell/tink/pkg/controllers/workflow"
"k8s.io/client-go/tools/clientcmd"
clientcmdapi "k8s.io/client-go/tools/clientcmd/api"
)

// version is set at build time.
Expand Down Expand Up @@ -58,12 +59,22 @@ func NewRootCommand(config *DaemonConfig, logger log.Logger) *cobra.Command {
RunE: func(cmd *cobra.Command, args []string) error {
logger.Info("starting controller version " + version)

config, err := clientcmd.BuildConfigFromFlags(config.K8sAPI, config.Kubeconfig)
ccfg := clientcmd.NewNonInteractiveDeferredLoadingClientConfig(
&clientcmd.ClientConfigLoadingRules{ExplicitPath: config.Kubeconfig},
&clientcmd.ConfigOverrides{ClusterInfo: clientcmdapi.Cluster{Server: config.K8sAPI}})

cfg, err := ccfg.ClientConfig()
if err != nil {
return err
}

manager, err := controllers.NewManager(config, controllers.GetControllerOptions())
namespace, _, err := ccfg.Namespace()
if err != nil {
return err
}
options := controllers.GetControllerOptions()
options.LeaderElectionNamespace = namespace
manager, err := controllers.NewManager(cfg, options)
if err != nil {
return err
}
Expand Down
89 changes: 60 additions & 29 deletions cmd/tink-server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,19 +40,35 @@ type DaemonConfig struct {
CertDir string
HTTPAuthority string
TLS bool
Backend string

KubeconfigPath string
KubeAPI string
}

const (
backendPostgres = "postgres"
backendKubernetes = "kubernetes"
)

func backends() []string {
return []string{backendPostgres, backendKubernetes}
}

func (c *DaemonConfig) AddFlags(fs *pflag.FlagSet) {
fs.StringVar(&c.Facility, "facility", "deprecated", "This is temporary. It will be removed")
fs.StringVar(&c.PGDatabase, "postgres-database", "tinkerbell", "The Postgres database name")
fs.StringVar(&c.PGUSer, "postgres-user", "tinkerbell", "The Postgres database username")
fs.StringVar(&c.PGPassword, "postgres-password", "tinkerbell", "The Postgres database password")
fs.StringVar(&c.PGSSLMode, "postgres-sslmode", "disable", "Enable or disable SSL mode in postgres")
fs.BoolVar(&c.OnlyMigration, "only-migration", false, "When enabled the server applies the migration to postgres database and it exits")
fs.StringVar(&c.PGDatabase, "postgres-database", "tinkerbell", "The Postgres database name. Only takes effect if `--backend=postgres`")
fs.StringVar(&c.PGUSer, "postgres-user", "tinkerbell", "The Postgres database username. Only takes effect if `--backend=postgres`")
fs.StringVar(&c.PGPassword, "postgres-password", "tinkerbell", "The Postgres database password. Only takes effect if `--backend=postgres`")
fs.StringVar(&c.PGSSLMode, "postgres-sslmode", "disable", "Enable or disable SSL mode in postgres. Only takes effect if `--backend=postgres`")
fs.BoolVar(&c.OnlyMigration, "only-migration", false, "When enabled the server applies the migration to postgres database and it exits. Only takes effect if `--backend=postgres`")
fs.StringVar(&c.GRPCAuthority, "grpc-authority", ":42113", "The address used to expose the gRPC server")
fs.StringVar(&c.CertDir, "cert-dir", "", "")
fs.StringVar(&c.HTTPAuthority, "http-authority", ":42114", "The address used to expose the HTTP server")
fs.BoolVar(&c.TLS, "tls", true, "Run in tls protected mode (disabling should only be done for development or if behind TLS terminating proxy)")
fs.StringVar(&c.Backend, "backend", backendPostgres, fmt.Sprintf("The backend datastore to use. Must be one of %s", strings.Join(backends(), ", ")))
fs.StringVar(&c.KubeconfigPath, "kubeconfig", "", "The path to the Kubeconfig. Only takes effect if `--backend=kubernetes`")
fs.StringVar(&c.KubeAPI, "kube-api", "", "The Kubernetes API endpoint. Only takes effect if `--backend=kubernetes`")
}

func (c *DaemonConfig) PopulateFromLegacyEnvVar() {
Expand Down Expand Up @@ -119,26 +135,11 @@ func NewRootCommand(config *DaemonConfig, logger log.Logger) *cobra.Command {
// graceful shutdown and error management but I want to
// figure this out in another PR
errCh := make(chan error, 2)

// TODO(gianarb): I moved this up because we need to be sure that both
// connection, the one used for the resources and the one used for
// listening to events and notification are coming in the same way.
// BUT we should be using the right flags
connInfo := fmt.Sprintf("dbname=%s user=%s password=%s sslmode=%s",
config.PGDatabase,
config.PGUSer,
config.PGPassword,
config.PGSSLMode,
var (
registrar grpcserver.Registrar
grpcOpts []grpc.ServerOption
err error
)
database, err := internal.SetupPostgres(connInfo, config.OnlyMigration, logger)
if err != nil {
return err
}
if config.OnlyMigration {
return nil
}

var grpcOpts []grpc.ServerOption
if config.TLS {
certDir := config.CertDir
if certDir == "" {
Expand All @@ -150,16 +151,46 @@ func NewRootCommand(config *DaemonConfig, logger log.Logger) *cobra.Command {
}
grpcOpts = append(grpcOpts, grpc.Creds(credentials.NewServerTLSFromCert(cert)))
}
switch config.Backend {
case backendKubernetes:
var err error
registrar, err = server.NewKubeBackedServer(logger, config.KubeconfigPath, config.KubeAPI)
if err != nil {
return err
}
case backendPostgres:
// TODO(gianarb): I moved this up because we need to be sure that both
// connection, the one used for the resources and the one used for
// listening to events and notification are coming in the same way.
// BUT we should be using the right flags
connInfo := fmt.Sprintf("dbname=%s user=%s password=%s sslmode=%s",
config.PGDatabase,
config.PGUSer,
config.PGPassword,
config.PGSSLMode,
)
database, err := internal.SetupPostgres(connInfo, config.OnlyMigration, logger)
if err != nil {
return err
}
if config.OnlyMigration {
return nil
}

tinkAPI, err := server.NewDBServer(logger, database)
if err != nil {
return err
registrar, err = server.NewDBServer(
logger,
database,
)
if err != nil {
return err
}
default:
return fmt.Errorf("invalid backend: %s", config.Backend)
}
micahhausler marked this conversation as resolved.
Show resolved Hide resolved

// Start the gRPC server in the background
addr, err := grpcserver.SetupGRPC(
micahhausler marked this conversation as resolved.
Show resolved Hide resolved
ctx,
tinkAPI,
registrar,
config.GRPCAuthority,
grpcOpts,
errCh)
micahhausler marked this conversation as resolved.
Show resolved Hide resolved
Expand Down
21 changes: 6 additions & 15 deletions cmd/tink-worker/worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,10 +208,8 @@ func (w *Worker) execute(ctx context.Context, wfID string, action *pb.WorkflowAc
return st, errors.Wrap(err, "wait container")
}

l.With("status", st.String()).Info("container removed")

if st == pb.State_STATE_SUCCESS {
l.With("status", st).Info("action container exited with success", st)
l.With("status", st).Info("action container exited with success")
return st, nil
}

Expand Down Expand Up @@ -299,15 +297,6 @@ func (w *Worker) ProcessWorkflowActions(ctx context.Context) error {
nextAction = actions.GetActionList()[wfContext.GetCurrentActionIndex()]
actionIndex = int(wfContext.GetCurrentActionIndex())
}
l := l.With(
"currentWorker", wfContext.GetCurrentWorker(),
"currentTask", wfContext.GetCurrentTask(),
"currentAction", wfContext.GetCurrentAction(),
"currentActionIndex", strconv.FormatInt(wfContext.GetCurrentActionIndex(), 10),
"currentActionState", wfContext.GetCurrentActionState(),
"totalNumberOfActions", wfContext.GetTotalNumberOfActions(),
)
l.Info("current context")
if nextAction.GetWorkerId() == w.workerID {
turn = true
}
Expand Down Expand Up @@ -338,12 +327,13 @@ func (w *Worker) ProcessWorkflowActions(ctx context.Context) error {
os.Exit(1)
}
}
l.Info("starting with action")
l.Info("starting action")
}

for turn {
action := actions.GetActionList()[actionIndex]
l := l.With("actionName", action.GetName(),
l := l.With(
"actionName", action.GetName(),
"taskName", action.GetTaskName(),
)
ctx := context.WithValue(ctx, loggingContextKey, &l)
Expand All @@ -361,7 +351,7 @@ func (w *Worker) ProcessWorkflowActions(ctx context.Context) error {
if err != nil {
exitWithGrpcError(err, l)
}
l.With("duration", strconv.FormatInt(actionStatus.Seconds, 10)).Info("sent action status")
l.With("status", actionStatus.ActionStatus, "duration", strconv.FormatInt(actionStatus.Seconds, 10)).Info("sent action status")
}

// get workflow data
Expand Down Expand Up @@ -445,6 +435,7 @@ func (w *Worker) reportActionStatus(ctx context.Context, actionStatus *pb.Workfl
"workerID", actionStatus.GetWorkerId(),
"actionName", actionStatus.GetActionName(),
"taskName", actionStatus.GetTaskName(),
"status", actionStatus.ActionStatus,
)
var err error
for r := 1; r <= w.retries; r++ {
Expand Down
1 change: 1 addition & 0 deletions cmd/virtual-worker/cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ func NewRootCommand(version string, logger log.Logger) *cobra.Command {
logger,
tinkWorker.WithMaxFileSize(maxFileSize),
tinkWorker.WithRetries(retryInterval, retries),
tinkWorker.WithDataDir("./worker"),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I realize the virtual-worker has limited use-cases, but perhaps --data-dir could be added with ./worker as the default.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good idea!

tinkWorker.WithLogCapture(captureActionLogs))

err = w.ProcessWorkflowActions(cmd.Context())
Expand Down
6 changes: 6 additions & 0 deletions cmd/virtual-worker/worker/container_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,12 @@ func (m *fakeManager) sleep() {

// NewFakeContainerManager returns a fake worker.ContainerManager that will sleep for Docker API calls.
func NewFakeContainerManager(l log.Logger, sleepMinimum, sleepJitter time.Duration) worker.ContainerManager {
if sleepMinimum <= 0 {
sleepMinimum = 1
}
if sleepJitter <= 0 {
sleepJitter = 1
}
return &fakeManager{
sleepMinimum: sleepMinimum,
sleepJitter: sleepJitter,
Expand Down
23 changes: 15 additions & 8 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,7 @@ require (
github.com/docker/distribution v2.8.1+incompatible
github.com/docker/docker v20.10.7+incompatible
github.com/equinix-labs/otel-init-go v0.0.1
github.com/go-logr/zapr v0.4.0
github.com/go-openapi/strfmt v0.19.3 // indirect
github.com/go-logr/zapr v1.2.0
github.com/golang/protobuf v1.5.2
github.com/google/go-cmp v0.5.6
github.com/google/uuid v1.3.0
Expand All @@ -18,7 +17,8 @@ require (
github.com/ktr0731/evans v0.10.0
github.com/lib/pq v1.10.1
github.com/matryer/moq v0.2.3
github.com/moby/term v0.0.0-20210619224110-3f7ff695adc6 // indirect
github.com/onsi/ginkgo/v2 v2.1.3
github.com/onsi/gomega v1.18.1
github.com/opencontainers/image-spec v1.0.2
github.com/packethost/pkg v0.0.0-20200903155310-0433e0605550
github.com/pkg/errors v0.9.1
Expand All @@ -29,20 +29,27 @@ require (
github.com/spf13/viper v1.8.1
github.com/stretchr/testify v1.7.0
github.com/testcontainers/testcontainers-go v0.11.1
go.mongodb.org/mongo-driver v1.1.2 // indirect
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.22.0
go.uber.org/multierr v1.7.0
google.golang.org/genproto v0.0.0-20211021150943-2b146023228c
google.golang.org/grpc v1.42.0
google.golang.org/grpc/cmd/protoc-gen-go-grpc v1.1.0
google.golang.org/protobuf v1.27.1
gopkg.in/yaml.v2 v2.4.0
k8s.io/apimachinery v0.22.2
k8s.io/client-go v0.22.2
k8s.io/apimachinery v0.23.0
k8s.io/client-go v0.23.0
knative.dev/pkg v0.0.0-20211119170723-a99300deff34
mvdan.cc/gofumpt v0.1.1
sigs.k8s.io/controller-runtime v0.10.1
sigs.k8s.io/controller-tools v0.7.0
sigs.k8s.io/controller-runtime v0.11.1
sigs.k8s.io/controller-runtime/tools/setup-envtest v0.0.0-20220304125252-9ee63fc65a97
sigs.k8s.io/controller-tools v0.8.0
sigs.k8s.io/yaml v1.3.0
)

require (
github.com/go-openapi/strfmt v0.19.3 // indirect
github.com/moby/term v0.0.0-20210619224110-3f7ff695adc6 // indirect
go.mongodb.org/mongo-driver v1.1.2 // indirect
)

replace github.com/stormcat24/protodep => github.com/ackintosh/protodep v0.0.0-20200728152107-abf8eb579d6c
Expand Down
Loading