Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

enhance(auth): implement registration flow #452

Merged
merged 15 commits into from
Apr 4, 2023
Merged
70 changes: 70 additions & 0 deletions api/register.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
// Copyright (c) 2023 Target Brands, Inc. All rights reserved.
//
// Use of this source code is governed by the LICENSE file in this repository.

package api

import (
"net/http"

"github.com/gin-gonic/gin"
"github.com/go-vela/worker/router/middleware/token"
)

// swagger:operation POST /register system Register
//
// Fill registration token channel in worker to continue operation
//
// ---
// produces:
// - application/json
// parameters:
// security:
// - ApiKeyAuth: []
// responses:
// '200':
// description: Successfully passed token to worker
// schema:
// type: string
// '500':
// description: Unable to pass token to worker
// schema:
// "$ref": "#/definitions/Error"

// Register will pass the token given in the request header to the register token
// channel of the worker. This will unblock operation if the worker has not been
// registered and the provided registration token is valid.
func Register(c *gin.Context) {
// extract the register token channel that was packed into gin context
v, ok := c.Get("register-token")
if !ok {
c.JSON(http.StatusInternalServerError, "no register token channel in the context")
return
}

// make sure we configured the channel properly
rChan, ok := v.(chan string)
if !ok {
c.JSON(http.StatusInternalServerError, "register token channel in the context is the wrong type")
return
}

// if token is present in the channel, deny registration
// this will likely never happen as the channel is offloaded immediately
if len(rChan) > 0 {
c.JSON(http.StatusOK, "worker already registered")
return
}

// retrieve auth token from header
token, err := token.Retrieve(c.Request)
if err != nil {
c.JSON(http.StatusInternalServerError, err)
return
}

// write registration token to auth token channel
rChan <- token

c.JSON(http.StatusOK, "successfully passed token to worker")
}
1 change: 1 addition & 0 deletions cmd/vela-worker/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ func flags() []cli.Flag {
EnvVars: []string{"WORKER_SERVER_SECRET", "VELA_SERVER_SECRET", "SERVER_SECRET"},
Name: "server.secret",
Usage: "secret used for server <-> worker communication",
Value: "",
},
&cli.StringFlag{
EnvVars: []string{"WORKER_SERVER_CERT", "VELA_SERVER_CERT", "SERVER_CERT"},
Expand Down
72 changes: 56 additions & 16 deletions cmd/vela-worker/operate.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,6 @@ import (
func (w *Worker) operate(ctx context.Context) error {
var err error

// setup the vela client with the server
w.VelaClient, err = setupClient(w.Config.Server, w.Config.Server.Secret)
if err != nil {
return err
}

// create the errgroup for managing operator subprocesses
//
// https://pkg.go.dev/golang.org/x/sync/errgroup?tab=doc#Group
Expand All @@ -40,9 +34,17 @@ func (w *Worker) operate(ctx context.Context) error {
registryWorker.SetAddress(w.Config.API.Address.String())
registryWorker.SetRoutes(w.Config.Queue.Routes)
registryWorker.SetActive(true)
registryWorker.SetLastCheckedIn(time.Now().UTC().Unix())
registryWorker.SetBuildLimit(int64(w.Config.Build.Limit))

// pull registration token from configuration if provided; wait if not
token := <-w.RegisterToken

// setup the vela client with the token
w.VelaClient, err = setupClient(w.Config.Server, token)
if err != nil {
return err
}

// spawn goroutine for phoning home
executors.Go(func() error {
for {
Expand All @@ -51,19 +53,51 @@ func (w *Worker) operate(ctx context.Context) error {
logrus.Info("Completed looping on worker registration")
return nil
default:
// set checking time to now and call the server
registryWorker.SetLastCheckedIn(time.Now().UTC().Unix())
// check in attempt loop
for {
// register or update the worker
//nolint:contextcheck // ignore passing context
w.CheckedIn, token, err = w.checkIn(registryWorker)
// check in failed
if err != nil {
// check if token is expired
expired, err := w.VelaClient.Authentication.IsTokenAuthExpired()
if err != nil {
logrus.Error("unable to check token expiration")
return err
}

// token has expired
if expired && len(w.Config.Server.Secret) == 0 {
// wait on new registration token, return to check in attempt
token = <-w.RegisterToken

// setup the vela client with the token
w.VelaClient, err = setupClient(w.Config.Server, token)
if err != nil {
return err
}

continue
}

// check in failed, token is still valid, retry
logrus.Errorf("unable to check-in worker %s on the server: %v", registryWorker.GetHostname(), err)
logrus.Info("retrying...")

time.Sleep(5 * time.Second)

continue
}

// register or update the worker
//nolint:contextcheck // ignore passing context
err = w.checkIn(registryWorker)
if err != nil {
logrus.Error(err)
// successful check in breaks the loop
break
}

// if unable to update the worker, log the error but allow the worker to continue running
// setup the vela client with the token
w.VelaClient, err = setupClient(w.Config.Server, token)
if err != nil {
logrus.Errorf("unable to update worker %s on the server: %v", registryWorker.GetHostname(), err)
return err
}

// sleep for the configured time
Expand Down Expand Up @@ -99,6 +133,12 @@ func (w *Worker) operate(ctx context.Context) error {
executors.Go(func() error {
// create an infinite loop to poll for builds
for {
// do not pull from queue unless worker is checked in with server
if !w.CheckedIn {
time.Sleep(5 * time.Second)
logrus.Info("worker not checked in, skipping queue read")
continue
}
select {
case <-gctx.Done():
logrus.WithFields(logrus.Fields{
Expand Down
20 changes: 10 additions & 10 deletions cmd/vela-worker/register.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,45 +13,45 @@ import (
)

// checkIn is a helper function to phone home to the server.
func (w *Worker) checkIn(config *library.Worker) error {
func (w *Worker) checkIn(config *library.Worker) (bool, string, error) {
// check to see if the worker already exists in the database
logrus.Infof("retrieving worker %s from the server", config.GetHostname())

_, resp, err := w.VelaClient.Worker.Get(config.GetHostname())
if err != nil {
respErr := fmt.Errorf("unable to retrieve worker %s from the server: %w", config.GetHostname(), err)
if resp == nil {
return respErr
return false, "", respErr
}
// if we receive a 404 the worker needs to be registered
if resp.StatusCode == http.StatusNotFound {
return w.register(config)
}

return respErr
return false, "", respErr
}

// if we were able to GET the worker, update it
logrus.Infof("checking worker %s into the server", config.GetHostname())

_, _, err = w.VelaClient.Worker.Update(config.GetHostname(), config)
tkn, _, err := w.VelaClient.Worker.RefreshAuth(config.GetHostname())
if err != nil {
return fmt.Errorf("unable to update worker %s on the server: %w", config.GetHostname(), err)
return false, "", fmt.Errorf("unable to refresh auth for worker %s on the server: %w", config.GetHostname(), err)
}

return nil
return true, tkn.GetToken(), nil
}

// register is a helper function to register the worker with the server.
func (w *Worker) register(config *library.Worker) error {
func (w *Worker) register(config *library.Worker) (bool, string, error) {
logrus.Infof("worker %s not found, registering it with the server", config.GetHostname())

_, _, err := w.VelaClient.Worker.Add(config)
tkn, _, err := w.VelaClient.Worker.Add(config)
if err != nil {
// log the error instead of returning so the operation doesn't block worker deployment
return fmt.Errorf("unable to register worker %s with the server: %w", config.GetHostname(), err)
return false, "", fmt.Errorf("unable to register worker %s with the server: %w", config.GetHostname(), err)
}

// successfully added the worker so return nil
return nil
return true, tkn.GetToken(), nil
}
7 changes: 7 additions & 0 deletions cmd/vela-worker/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,13 +135,20 @@ func run(c *cli.Context) error {
TLSMinVersion: c.String("server.tls-min-version"),
},
Executors: make(map[int]executor.Engine),

RegisterToken: make(chan string, 1),
}

// set the worker address if no flag was provided
if len(w.Config.API.Address.String()) == 0 {
w.Config.API.Address, _ = url.Parse(fmt.Sprintf("http://%s", hostname))
}

// if server secret is provided, use as register token on start up
if len(c.String("server.secret")) > 0 {
w.RegisterToken <- c.String("server.secret")
}

// validate the worker
err = w.Validate()
if err != nil {
Expand Down
1 change: 1 addition & 0 deletions cmd/vela-worker/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ func (w *Worker) server() (http.Handler, *tls.Config) {
middleware.ServerAddress(w.Config.Server.Address),
middleware.Executors(w.Executors),
middleware.Logger(logrus.StandardLogger(), time.RFC3339, true),
middleware.RegisterToken(w.RegisterToken),
)

// log a message indicating the start of serving traffic
Expand Down
5 changes: 0 additions & 5 deletions cmd/vela-worker/validate.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,11 +52,6 @@ func (w *Worker) Validate() error {
return fmt.Errorf("no worker server address provided")
}

// verify a server secret was provided
if len(w.Config.Server.Secret) == 0 {
return fmt.Errorf("no worker server secret provided")
}

// verify an executor driver was provided
if len(w.Config.Executor.Driver) == 0 {
return fmt.Errorf("no worker executor driver provided")
Expand Down
12 changes: 7 additions & 5 deletions cmd/vela-worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,10 +62,12 @@ type (
// Worker represents all configuration and
// system processes for the worker.
Worker struct {
Config *Config
Executors map[int]executor.Engine
Queue queue.Service
Runtime runtime.Engine
VelaClient *vela.Client
Config *Config
Executors map[int]executor.Engine
Queue queue.Service
Runtime runtime.Engine
VelaClient *vela.Client
RegisterToken chan string
CheckedIn bool
}
)
10 changes: 7 additions & 3 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,10 @@ services:
VELA_RUNTIME_PRIVILEGED_IMAGES: 'target/vela-docker'
VELA_EXECUTOR_ENFORCE_TRUSTED_REPOS: 'true'
VELA_SERVER_ADDR: 'http://server:8080'
# comment the line below to use registration flow
VELA_SERVER_SECRET: 'zB7mrKDTZqNeNTD8z47yG4DHywspAh'
WORKER_ADDR: 'http://worker:8080'
WORKER_CHECK_IN: 5m
WORKER_CHECK_IN: 2m
restart: always
ports:
- "8081:8080"
Expand Down Expand Up @@ -68,9 +69,12 @@ services:
VELA_ADDR: 'http://localhost:8080'
VELA_WEBUI_ADDR: 'http://localhost:8888'
VELA_LOG_LEVEL: trace
# comment the line below to use registration flow
VELA_SECRET: 'zB7mrKDTZqNeNTD8z47yG4DHywspAh'
VELA_REFRESH_TOKEN_DURATION: 90m
VELA_ACCESS_TOKEN_DURATION: 60m
VELA_SERVER_PRIVATE_KEY: 'F534FF2A080E45F38E05DC70752E6787'
VELA_USER_REFRESH_TOKEN_DURATION: 90m
VELA_USER_ACCESS_TOKEN_DURATION: 60m
VELA_WORKER_AUTH_TOKEN_DURATION: 3m
VELA_DISABLE_WEBHOOK_VALIDATION: 'true'
VELA_ENABLE_SECURE_COOKIE: 'false'
VELA_REPO_ALLOWLIST: '*'
Expand Down
18 changes: 18 additions & 0 deletions router/middleware/register_token.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
// Copyright (c) 2023 Target Brands, Inc. All rights reserved.
//
// Use of this source code is governed by the LICENSE file in this repository.

package middleware

import (
"github.com/gin-gonic/gin"
)

// RegisterToken is a middleware function that attaches the
// auth-token channel to the context of every http.Request.
func RegisterToken(r chan string) gin.HandlerFunc {
return func(c *gin.Context) {
c.Set("register-token", r)
c.Next()
}
}
48 changes: 48 additions & 0 deletions router/middleware/register_token_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
// Copyright (c) 2023 Target Brands, Inc. All rights reserved.
//
// Use of this source code is governed by the LICENSE file in this repository.

package middleware

import (
"net/http"
"net/http/httptest"
"reflect"
"testing"

"github.com/gin-gonic/gin"
)

func TestMiddleware_RegisterToken(t *testing.T) {
// setup types
want := make(chan string, 1)
got := make(chan string, 1)

want <- "foo"

// setup context
gin.SetMode(gin.TestMode)

resp := httptest.NewRecorder()
context, engine := gin.CreateTestContext(resp)
context.Request, _ = http.NewRequest(http.MethodGet, "/health", nil)

// setup mock server
engine.Use(RegisterToken(want))
engine.GET("/health", func(c *gin.Context) {
got = c.Value("register-token").(chan string)

c.Status(http.StatusOK)
})

// run test
engine.ServeHTTP(context.Writer, context.Request)

if resp.Code != http.StatusOK {
t.Errorf("RegisterToken returned %v, want %v", resp.Code, http.StatusOK)
}

if !reflect.DeepEqual(got, want) {
t.Errorf("RegisterToken is %v, want foo", got)
}
}
Loading