Skip to content
This repository has been archived by the owner on Sep 9, 2020. It is now read-only.

Commit

Permalink
Add Consul state backend storage for Policies and Scale State.
Browse files Browse the repository at this point in the history
  • Loading branch information
jrasell committed Dec 6, 2019
1 parent b0986e8 commit 73771c6
Show file tree
Hide file tree
Showing 12 changed files with 511 additions and 11 deletions.
3 changes: 3 additions & 0 deletions cmd/server/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ func RegisterCommand(rootCmd *cobra.Command) error {
serverCfg.RegisterTelemetryConfig(cmd)
serverCfg.RegisterProviderConfig(cmd)
serverCfg.RegisterAutoscalerConfig(cmd)
serverCfg.RegisterStorageConfig(cmd)
logCfg.RegisterConfig(cmd)
rootCmd.AddCommand(cmd)

Expand All @@ -35,6 +36,7 @@ func RegisterCommand(rootCmd *cobra.Command) error {
func runServer(_ *cobra.Command, _ []string) {
autoscaleConfig := serverCfg.GetAutoscalerConfig()
providerConfig := serverCfg.GetProviderConfig()
storageConfig := serverCfg.GetStorageConfig()
serverConfig := serverCfg.GetConfig()
tlsConfig := serverCfg.GetTLSConfig()
telemetryConfig := serverCfg.GetTelemetryConfig()
Expand All @@ -50,6 +52,7 @@ func runServer(_ *cobra.Command, _ []string) {
Autoscale: autoscaleConfig,
Provider: providerConfig,
Server: &serverConfig,
Storage: storageConfig,
TLS: &tlsConfig,
Telemetry: &telemetryConfig,
}
Expand Down
10 changes: 10 additions & 0 deletions pkg/client/consul.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package client

import consulAPI "github.com/hashicorp/consul/api"

// NewConsulClient is responsible for generating a reusable Consul client using the HashiCorp
// Consul SDK and the default config. This default config pulls Consul client configuration from
// env vars which can therefore be customized by the user.
func NewConsulClient() (*consulAPI.Client, error) {
return consulAPI.NewClient(consulAPI.DefaultConfig())
}
66 changes: 66 additions & 0 deletions pkg/config/server/storage.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package server

import (
"strings"

"github.com/spf13/cobra"
"github.com/spf13/viper"
)

const (
configKeyStorageConsulEnabled = "storage-consul-enabled"
configKeyStorageConsulPath = "storage-consul-path"
configKeyStorageConsulPathDefault = "chemtrail/"
)

// StorageConfig is the CLI configuration options for the state storage backend.
type StorageConfig struct {
ConsulEnabled bool
ConsulPath string
}

// GetStorageConfig populates a StorageConfig object with the users CLI parameters.
func GetStorageConfig() *StorageConfig {

// Check that the path has suffixed with a forward slash, otherwise put this on so we do not
// need to check in a number of other places.
path := viper.GetString(configKeyStorageConsulPath)
if suffix := strings.HasSuffix(path, "/"); !suffix {
path = path + "/"
}

return &StorageConfig{
ConsulEnabled: viper.GetBool(configKeyStorageConsulEnabled),
ConsulPath: path,
}
}

// RegisterStorageConfig register the storage CLI parameters for the state storage backend.
func RegisterStorageConfig(cmd *cobra.Command) {
flags := cmd.PersistentFlags()

{
const (
key = configKeyStorageConsulEnabled
longOpt = "storage-consul-enabled"
defaultValue = false
description = "Enable the Consul state storage backend"
)

flags.Bool(longOpt, defaultValue, description)
_ = viper.BindPFlag(key, flags.Lookup(longOpt))
viper.SetDefault(key, defaultValue)
}
{
const (
key = configKeyStorageConsulPath
longOpt = "storage-consul-path"
defaultValue = configKeyStorageConsulPathDefault
description = "The Consul KV base path that will be used to store state"
)

flags.String(longOpt, defaultValue, description)
_ = viper.BindPFlag(key, flags.Lookup(longOpt))
viper.SetDefault(key, defaultValue)
}
}
17 changes: 17 additions & 0 deletions pkg/config/server/storage_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package server

import (
"testing"

"github.com/spf13/cobra"
"github.com/stretchr/testify/assert"
)

func Test_StorageConfig(t *testing.T) {
fakeCMD := &cobra.Command{}
RegisterStorageConfig(fakeCMD)

cfg := GetStorageConfig()
assert.Equal(t, false, cfg.ConsulEnabled)
assert.Equal(t, "chemtrail/", cfg.ConsulPath)
}
5 changes: 2 additions & 3 deletions pkg/scale/drain.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,10 @@ import (
"strings"
"time"

"github.com/jrasell/chemtrail/pkg/helper"
"github.com/jrasell/chemtrail/pkg/state"

"github.com/gofrs/uuid"
"github.com/hashicorp/nomad/api"
"github.com/jrasell/chemtrail/pkg/helper"
"github.com/jrasell/chemtrail/pkg/state"
)

const (
Expand Down
1 change: 1 addition & 0 deletions pkg/server/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ type Config struct {
Autoscale *serverCfg.AutoscalerConfig
Provider *serverCfg.ProviderConfig
Server *serverCfg.Config
Storage *serverCfg.StorageConfig
TLS *serverCfg.TLSConfig
Telemetry *serverCfg.TelemetryConfig
}
Expand Down
30 changes: 30 additions & 0 deletions pkg/server/gc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package server

import "time"

// gcEvalPeriod is the time period at which the automatic scaling state garbage collector is run
// at.
var gcEvalPeriod = time.Minute * 10

// runGarbageCollectionLoop is responsible for periodically running the scaling state garbage
// collection function.
func (h *HTTPServer) runGarbageCollectionLoop() {
h.logger.Info().Msg("started scaling state garbage collector handler")

h.gcIsRunning = true

t := time.NewTicker(gcEvalPeriod)
defer t.Stop()

for {
select {
case <-h.stopChan:
h.logger.Info().Msg("shutting down state garbage collection handler")
h.gcIsRunning = false
return
case <-t.C:
h.logger.Debug().Msg("triggering internal run of state garbage collection")
h.scaleState.RunStateGarbageCollection()
}
}
}
67 changes: 61 additions & 6 deletions pkg/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,16 @@ import (
"syscall"

"github.com/armon/go-metrics"
consulAPI "github.com/hashicorp/consul/api"
"github.com/jrasell/chemtrail/pkg/client"
"github.com/jrasell/chemtrail/pkg/scale"
"github.com/jrasell/chemtrail/pkg/scale/auto"
"github.com/jrasell/chemtrail/pkg/scale/resource"
"github.com/jrasell/chemtrail/pkg/server/router"
"github.com/jrasell/chemtrail/pkg/state"
policyConsul "github.com/jrasell/chemtrail/pkg/state/policy/consul"
policyMemory "github.com/jrasell/chemtrail/pkg/state/policy/memory"
scaleConsul "github.com/jrasell/chemtrail/pkg/state/scale/consul"
scaleMemory "github.com/jrasell/chemtrail/pkg/state/scale/memory"
"github.com/jrasell/chemtrail/pkg/watcher"
"github.com/jrasell/chemtrail/pkg/watcher/allocs"
Expand All @@ -31,6 +34,9 @@ type HTTPServer struct {
cfg *Config
logger zerolog.Logger

// consul is our stored, reusable Consul client.
consul *consulAPI.Client

// nomad is our stored Nomad client wrapper which is reused in all areas which require Nomad
// API connectivity.
nomad *client.Nomad
Expand Down Expand Up @@ -59,16 +65,25 @@ type HTTPServer struct {

telemetry *metrics.InmemSink

// gcIsRunning is used to track whether this Chemtrail server is currently running the garbage
// collection loop.
gcIsRunning bool

// stopChan is used to synchronise stopping the HTTP server services and any handlers which it
// maintains operationally.
stopChan chan struct{}

http.Server
routes *routes
}

func New(l zerolog.Logger, cfg *Config) *HTTPServer {
return &HTTPServer{
addr: fmt.Sprintf("%s:%d", cfg.Server.Bind, cfg.Server.Port),
cfg: cfg,
logger: l,
routes: &routes{},
addr: fmt.Sprintf("%s:%d", cfg.Server.Bind, cfg.Server.Port),
cfg: cfg,
logger: l,
routes: &routes{},
stopChan: make(chan struct{}),
}
}

Expand All @@ -92,6 +107,9 @@ func (h *HTTPServer) Start() error {
go h.autoscaler.Run()
}

// Trigger the garbage collection periodic loop.
go h.runGarbageCollectionLoop()

h.handleSignals()
return nil
}
Expand All @@ -105,8 +123,14 @@ func (h *HTTPServer) setup() error {
Str("node-id", h.nomad.NodeID).
Msg("identified Chemtrail allocation nodeID")

h.policyState = policyMemory.NewPolicyBackend()
h.scaleState = scaleMemory.NewScaleStateBackend()
// Setup the reusable Consul client.
if err := h.setupConsulClient(); err != nil {
return err
}

// Setup the state backend.
h.setupStateBackend()

h.nodeResourceHandler = resource.NewHandler(h.logger, h.nomad)

h.scaler = scale.NewScaleBackend(&scale.BackendConfig{
Expand Down Expand Up @@ -194,6 +218,33 @@ func (h *HTTPServer) setupNomadClient() error {
return nil
}

// setupConsulClient is used to build and store our reusable Consul API client.
func (h *HTTPServer) setupConsulClient() error {
h.logger.Debug().Msg("setting up Consul client")

cc, err := client.NewConsulClient()
if err != nil {
return err
}
h.consul = cc

return nil
}

// setupStateBackend sets up the storage backend for Chemtrail state, depending on the operators
// desire.
func (h *HTTPServer) setupStateBackend() {
if h.cfg.Storage.ConsulEnabled {
h.logger.Debug().Msg("setting up Consul storage backend")
h.policyState = policyConsul.NewPolicyBackend(h.cfg.Storage.ConsulPath, h.consul)
h.scaleState = scaleConsul.NewScaleBackend(h.logger, h.cfg.Storage.ConsulPath, h.consul)
} else {
h.logger.Debug().Msg("setting up in-memory storage backend")
h.policyState = policyMemory.NewPolicyBackend()
h.scaleState = scaleMemory.NewScaleStateBackend()
}
}

func (h *HTTPServer) setupListener() net.Listener {
var (
err error
Expand Down Expand Up @@ -223,6 +274,10 @@ func (h *HTTPServer) Stop() error {
if h.autoscaler != nil && h.autoscaler.IsRunning() {
h.autoscaler.Stop()
}

// Send a signal to the HTTPServer stopChan instructing sub-process to stop.
close(h.stopChan)

return h.Shutdown(context.Background())
}

Expand Down
Loading

0 comments on commit 73771c6

Please sign in to comment.