Skip to content

Commit

Permalink
enhance(auth): implement registration flow (#452)
Browse files Browse the repository at this point in the history
* initial work

* more work

* backwards compatibility with comments

* adjust middleware to only use validate-token for server tokens

* more work

* rename auth token channel to RegisterToken

* rename token channel and update api comments

* updating some comments and not loggin token

* fix docker compose

* fix local replace

* token expiration func has two return vals

* docker compose no register

* name register token middleware file correctly

* update swagger for register
  • Loading branch information
ecrupper authored Apr 4, 2023
1 parent 22ae0a1 commit 6c146e5
Show file tree
Hide file tree
Showing 14 changed files with 280 additions and 39 deletions.
70 changes: 70 additions & 0 deletions api/register.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
// Copyright (c) 2023 Target Brands, Inc. All rights reserved.
//
// Use of this source code is governed by the LICENSE file in this repository.

package api

import (
"net/http"

"github.com/gin-gonic/gin"
"github.com/go-vela/worker/router/middleware/token"
)

// swagger:operation POST /register system Register
//
// Fill registration token channel in worker to continue operation
//
// ---
// produces:
// - application/json
// parameters:
// security:
// - ApiKeyAuth: []
// responses:
// '200':
// description: Successfully passed token to worker
// schema:
// type: string
// '500':
// description: Unable to pass token to worker
// schema:
// "$ref": "#/definitions/Error"

// Register will pass the token given in the request header to the register token
// channel of the worker. This will unblock operation if the worker has not been
// registered and the provided registration token is valid.
func Register(c *gin.Context) {
// extract the register token channel that was packed into gin context
v, ok := c.Get("register-token")
if !ok {
c.JSON(http.StatusInternalServerError, "no register token channel in the context")
return
}

// make sure we configured the channel properly
rChan, ok := v.(chan string)
if !ok {
c.JSON(http.StatusInternalServerError, "register token channel in the context is the wrong type")
return
}

// if token is present in the channel, deny registration
// this will likely never happen as the channel is offloaded immediately
if len(rChan) > 0 {
c.JSON(http.StatusOK, "worker already registered")
return
}

// retrieve auth token from header
token, err := token.Retrieve(c.Request)
if err != nil {
c.JSON(http.StatusInternalServerError, err)
return
}

// write registration token to auth token channel
rChan <- token

c.JSON(http.StatusOK, "successfully passed token to worker")
}
1 change: 1 addition & 0 deletions cmd/vela-worker/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ func flags() []cli.Flag {
EnvVars: []string{"WORKER_SERVER_SECRET", "VELA_SERVER_SECRET", "SERVER_SECRET"},
Name: "server.secret",
Usage: "secret used for server <-> worker communication",
Value: "",
},
&cli.StringFlag{
EnvVars: []string{"WORKER_SERVER_CERT", "VELA_SERVER_CERT", "SERVER_CERT"},
Expand Down
72 changes: 56 additions & 16 deletions cmd/vela-worker/operate.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,6 @@ import (
func (w *Worker) operate(ctx context.Context) error {
var err error

// setup the vela client with the server
w.VelaClient, err = setupClient(w.Config.Server, w.Config.Server.Secret)
if err != nil {
return err
}

// create the errgroup for managing operator subprocesses
//
// https://pkg.go.dev/golang.org/x/sync/errgroup?tab=doc#Group
Expand All @@ -40,9 +34,17 @@ func (w *Worker) operate(ctx context.Context) error {
registryWorker.SetAddress(w.Config.API.Address.String())
registryWorker.SetRoutes(w.Config.Queue.Routes)
registryWorker.SetActive(true)
registryWorker.SetLastCheckedIn(time.Now().UTC().Unix())
registryWorker.SetBuildLimit(int64(w.Config.Build.Limit))

// pull registration token from configuration if provided; wait if not
token := <-w.RegisterToken

// setup the vela client with the token
w.VelaClient, err = setupClient(w.Config.Server, token)
if err != nil {
return err
}

// spawn goroutine for phoning home
executors.Go(func() error {
for {
Expand All @@ -51,19 +53,51 @@ func (w *Worker) operate(ctx context.Context) error {
logrus.Info("Completed looping on worker registration")
return nil
default:
// set checking time to now and call the server
registryWorker.SetLastCheckedIn(time.Now().UTC().Unix())
// check in attempt loop
for {
// register or update the worker
//nolint:contextcheck // ignore passing context
w.CheckedIn, token, err = w.checkIn(registryWorker)
// check in failed
if err != nil {
// check if token is expired
expired, err := w.VelaClient.Authentication.IsTokenAuthExpired()
if err != nil {
logrus.Error("unable to check token expiration")
return err
}

// token has expired
if expired && len(w.Config.Server.Secret) == 0 {
// wait on new registration token, return to check in attempt
token = <-w.RegisterToken

// setup the vela client with the token
w.VelaClient, err = setupClient(w.Config.Server, token)
if err != nil {
return err
}

continue
}

// check in failed, token is still valid, retry
logrus.Errorf("unable to check-in worker %s on the server: %v", registryWorker.GetHostname(), err)
logrus.Info("retrying...")

time.Sleep(5 * time.Second)

continue
}

// register or update the worker
//nolint:contextcheck // ignore passing context
err = w.checkIn(registryWorker)
if err != nil {
logrus.Error(err)
// successful check in breaks the loop
break
}

// if unable to update the worker, log the error but allow the worker to continue running
// setup the vela client with the token
w.VelaClient, err = setupClient(w.Config.Server, token)
if err != nil {
logrus.Errorf("unable to update worker %s on the server: %v", registryWorker.GetHostname(), err)
return err
}

// sleep for the configured time
Expand Down Expand Up @@ -99,6 +133,12 @@ func (w *Worker) operate(ctx context.Context) error {
executors.Go(func() error {
// create an infinite loop to poll for builds
for {
// do not pull from queue unless worker is checked in with server
if !w.CheckedIn {
time.Sleep(5 * time.Second)
logrus.Info("worker not checked in, skipping queue read")
continue
}
select {
case <-gctx.Done():
logrus.WithFields(logrus.Fields{
Expand Down
20 changes: 10 additions & 10 deletions cmd/vela-worker/register.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,45 +13,45 @@ import (
)

// checkIn is a helper function to phone home to the server.
func (w *Worker) checkIn(config *library.Worker) error {
func (w *Worker) checkIn(config *library.Worker) (bool, string, error) {
// check to see if the worker already exists in the database
logrus.Infof("retrieving worker %s from the server", config.GetHostname())

_, resp, err := w.VelaClient.Worker.Get(config.GetHostname())
if err != nil {
respErr := fmt.Errorf("unable to retrieve worker %s from the server: %w", config.GetHostname(), err)
if resp == nil {
return respErr
return false, "", respErr
}
// if we receive a 404 the worker needs to be registered
if resp.StatusCode == http.StatusNotFound {
return w.register(config)
}

return respErr
return false, "", respErr
}

// if we were able to GET the worker, update it
logrus.Infof("checking worker %s into the server", config.GetHostname())

_, _, err = w.VelaClient.Worker.Update(config.GetHostname(), config)
tkn, _, err := w.VelaClient.Worker.RefreshAuth(config.GetHostname())
if err != nil {
return fmt.Errorf("unable to update worker %s on the server: %w", config.GetHostname(), err)
return false, "", fmt.Errorf("unable to refresh auth for worker %s on the server: %w", config.GetHostname(), err)
}

return nil
return true, tkn.GetToken(), nil
}

// register is a helper function to register the worker with the server.
func (w *Worker) register(config *library.Worker) error {
func (w *Worker) register(config *library.Worker) (bool, string, error) {
logrus.Infof("worker %s not found, registering it with the server", config.GetHostname())

_, _, err := w.VelaClient.Worker.Add(config)
tkn, _, err := w.VelaClient.Worker.Add(config)
if err != nil {
// log the error instead of returning so the operation doesn't block worker deployment
return fmt.Errorf("unable to register worker %s with the server: %w", config.GetHostname(), err)
return false, "", fmt.Errorf("unable to register worker %s with the server: %w", config.GetHostname(), err)
}

// successfully added the worker so return nil
return nil
return true, tkn.GetToken(), nil
}
7 changes: 7 additions & 0 deletions cmd/vela-worker/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,13 +135,20 @@ func run(c *cli.Context) error {
TLSMinVersion: c.String("server.tls-min-version"),
},
Executors: make(map[int]executor.Engine),

RegisterToken: make(chan string, 1),
}

// set the worker address if no flag was provided
if len(w.Config.API.Address.String()) == 0 {
w.Config.API.Address, _ = url.Parse(fmt.Sprintf("http://%s", hostname))
}

// if server secret is provided, use as register token on start up
if len(c.String("server.secret")) > 0 {
w.RegisterToken <- c.String("server.secret")
}

// validate the worker
err = w.Validate()
if err != nil {
Expand Down
1 change: 1 addition & 0 deletions cmd/vela-worker/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ func (w *Worker) server() (http.Handler, *tls.Config) {
middleware.ServerAddress(w.Config.Server.Address),
middleware.Executors(w.Executors),
middleware.Logger(logrus.StandardLogger(), time.RFC3339, true),
middleware.RegisterToken(w.RegisterToken),
)

// log a message indicating the start of serving traffic
Expand Down
5 changes: 0 additions & 5 deletions cmd/vela-worker/validate.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,11 +52,6 @@ func (w *Worker) Validate() error {
return fmt.Errorf("no worker server address provided")
}

// verify a server secret was provided
if len(w.Config.Server.Secret) == 0 {
return fmt.Errorf("no worker server secret provided")
}

// verify an executor driver was provided
if len(w.Config.Executor.Driver) == 0 {
return fmt.Errorf("no worker executor driver provided")
Expand Down
12 changes: 7 additions & 5 deletions cmd/vela-worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,10 +62,12 @@ type (
// Worker represents all configuration and
// system processes for the worker.
Worker struct {
Config *Config
Executors map[int]executor.Engine
Queue queue.Service
Runtime runtime.Engine
VelaClient *vela.Client
Config *Config
Executors map[int]executor.Engine
Queue queue.Service
Runtime runtime.Engine
VelaClient *vela.Client
RegisterToken chan string
CheckedIn bool
}
)
10 changes: 7 additions & 3 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,10 @@ services:
VELA_RUNTIME_PRIVILEGED_IMAGES: 'target/vela-docker'
VELA_EXECUTOR_ENFORCE_TRUSTED_REPOS: 'true'
VELA_SERVER_ADDR: 'http://server:8080'
# comment the line below to use registration flow
VELA_SERVER_SECRET: 'zB7mrKDTZqNeNTD8z47yG4DHywspAh'
WORKER_ADDR: 'http://worker:8080'
WORKER_CHECK_IN: 5m
WORKER_CHECK_IN: 2m
restart: always
ports:
- "8081:8080"
Expand Down Expand Up @@ -68,9 +69,12 @@ services:
VELA_ADDR: 'http://localhost:8080'
VELA_WEBUI_ADDR: 'http://localhost:8888'
VELA_LOG_LEVEL: trace
# comment the line below to use registration flow
VELA_SECRET: 'zB7mrKDTZqNeNTD8z47yG4DHywspAh'
VELA_REFRESH_TOKEN_DURATION: 90m
VELA_ACCESS_TOKEN_DURATION: 60m
VELA_SERVER_PRIVATE_KEY: 'F534FF2A080E45F38E05DC70752E6787'
VELA_USER_REFRESH_TOKEN_DURATION: 90m
VELA_USER_ACCESS_TOKEN_DURATION: 60m
VELA_WORKER_AUTH_TOKEN_DURATION: 3m
VELA_DISABLE_WEBHOOK_VALIDATION: 'true'
VELA_ENABLE_SECURE_COOKIE: 'false'
VELA_REPO_ALLOWLIST: '*'
Expand Down
18 changes: 18 additions & 0 deletions router/middleware/register_token.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
// Copyright (c) 2023 Target Brands, Inc. All rights reserved.
//
// Use of this source code is governed by the LICENSE file in this repository.

package middleware

import (
"github.com/gin-gonic/gin"
)

// RegisterToken is a middleware function that attaches the
// auth-token channel to the context of every http.Request.
func RegisterToken(r chan string) gin.HandlerFunc {
return func(c *gin.Context) {
c.Set("register-token", r)
c.Next()
}
}
48 changes: 48 additions & 0 deletions router/middleware/register_token_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
// Copyright (c) 2023 Target Brands, Inc. All rights reserved.
//
// Use of this source code is governed by the LICENSE file in this repository.

package middleware

import (
"net/http"
"net/http/httptest"
"reflect"
"testing"

"github.com/gin-gonic/gin"
)

func TestMiddleware_RegisterToken(t *testing.T) {
// setup types
want := make(chan string, 1)
got := make(chan string, 1)

want <- "foo"

// setup context
gin.SetMode(gin.TestMode)

resp := httptest.NewRecorder()
context, engine := gin.CreateTestContext(resp)
context.Request, _ = http.NewRequest(http.MethodGet, "/health", nil)

// setup mock server
engine.Use(RegisterToken(want))
engine.GET("/health", func(c *gin.Context) {
got = c.Value("register-token").(chan string)

c.Status(http.StatusOK)
})

// run test
engine.ServeHTTP(context.Writer, context.Request)

if resp.Code != http.StatusOK {
t.Errorf("RegisterToken returned %v, want %v", resp.Code, http.StatusOK)
}

if !reflect.DeepEqual(got, want) {
t.Errorf("RegisterToken is %v, want foo", got)
}
}
Loading

0 comments on commit 6c146e5

Please sign in to comment.