diff --git a/cmd/server/cmd.go b/cmd/server/cmd.go index f39321d..86794d4 100644 --- a/cmd/server/cmd.go +++ b/cmd/server/cmd.go @@ -26,6 +26,7 @@ func RegisterCommand(rootCmd *cobra.Command) error { serverCfg.RegisterTelemetryConfig(cmd) serverCfg.RegisterProviderConfig(cmd) serverCfg.RegisterAutoscalerConfig(cmd) + serverCfg.RegisterStorageConfig(cmd) logCfg.RegisterConfig(cmd) rootCmd.AddCommand(cmd) @@ -35,6 +36,7 @@ func RegisterCommand(rootCmd *cobra.Command) error { func runServer(_ *cobra.Command, _ []string) { autoscaleConfig := serverCfg.GetAutoscalerConfig() providerConfig := serverCfg.GetProviderConfig() + storageConfig := serverCfg.GetStorageConfig() serverConfig := serverCfg.GetConfig() tlsConfig := serverCfg.GetTLSConfig() telemetryConfig := serverCfg.GetTelemetryConfig() @@ -50,6 +52,7 @@ func runServer(_ *cobra.Command, _ []string) { Autoscale: autoscaleConfig, Provider: providerConfig, Server: &serverConfig, + Storage: storageConfig, TLS: &tlsConfig, Telemetry: &telemetryConfig, } diff --git a/pkg/client/consul.go b/pkg/client/consul.go new file mode 100644 index 0000000..08972f4 --- /dev/null +++ b/pkg/client/consul.go @@ -0,0 +1,10 @@ +package client + +import consulAPI "github.com/hashicorp/consul/api" + +// NewConsulClient is responsible for generating a reusable Consul client using the HashiCorp +// Consul SDK and the default config. This default config pulls Consul client configuration from +// env vars which can therefore be customized by the user. 
+func NewConsulClient() (*consulAPI.Client, error) { + return consulAPI.NewClient(consulAPI.DefaultConfig()) +} diff --git a/pkg/config/server/storage.go b/pkg/config/server/storage.go new file mode 100644 index 0000000..9fe2d5b --- /dev/null +++ b/pkg/config/server/storage.go @@ -0,0 +1,66 @@ +package server + +import ( + "strings" + + "github.com/spf13/cobra" + "github.com/spf13/viper" +) + +const ( + configKeyStorageConsulEnabled = "storage-consul-enabled" + configKeyStorageConsulPath = "storage-consul-path" + configKeyStorageConsulPathDefault = "chemtrail/" +) + +// StorageConfig holds the CLI configuration options for the state storage backend. +type StorageConfig struct { + ConsulEnabled bool + ConsulPath string +} + +// GetStorageConfig populates a StorageConfig object with the user's CLI parameters. +func GetStorageConfig() *StorageConfig { + + // Ensure the path is suffixed with a forward slash, appending one when it is missing, so we do not + // need to check in a number of other places. + path := viper.GetString(configKeyStorageConsulPath) + if suffix := strings.HasSuffix(path, "/"); !suffix { + path = path + "/" + } + + return &StorageConfig{ + ConsulEnabled: viper.GetBool(configKeyStorageConsulEnabled), + ConsulPath: path, + } +} + +// RegisterStorageConfig registers the storage CLI parameters for the state storage backend. 
+func RegisterStorageConfig(cmd *cobra.Command) { + flags := cmd.PersistentFlags() + + { + const ( + key = configKeyStorageConsulEnabled + longOpt = "storage-consul-enabled" + defaultValue = false + description = "Enable the Consul state storage backend" + ) + + flags.Bool(longOpt, defaultValue, description) + _ = viper.BindPFlag(key, flags.Lookup(longOpt)) + viper.SetDefault(key, defaultValue) + } + { + const ( + key = configKeyStorageConsulPath + longOpt = "storage-consul-path" + defaultValue = configKeyStorageConsulPathDefault + description = "The Consul KV base path that will be used to store state" + ) + + flags.String(longOpt, defaultValue, description) + _ = viper.BindPFlag(key, flags.Lookup(longOpt)) + viper.SetDefault(key, defaultValue) + } +} diff --git a/pkg/config/server/storage_test.go b/pkg/config/server/storage_test.go new file mode 100644 index 0000000..148813a --- /dev/null +++ b/pkg/config/server/storage_test.go @@ -0,0 +1,17 @@ +package server + +import ( + "testing" + + "github.com/spf13/cobra" + "github.com/stretchr/testify/assert" +) + +func Test_StorageConfig(t *testing.T) { + fakeCMD := &cobra.Command{} + RegisterStorageConfig(fakeCMD) + + cfg := GetStorageConfig() + assert.Equal(t, false, cfg.ConsulEnabled) + assert.Equal(t, "chemtrail/", cfg.ConsulPath) +} diff --git a/pkg/scale/drain.go b/pkg/scale/drain.go index b912dcf..fa8783e 100644 --- a/pkg/scale/drain.go +++ b/pkg/scale/drain.go @@ -5,11 +5,10 @@ import ( "strings" "time" - "github.com/jrasell/chemtrail/pkg/helper" - "github.com/jrasell/chemtrail/pkg/state" - "github.com/gofrs/uuid" "github.com/hashicorp/nomad/api" + "github.com/jrasell/chemtrail/pkg/helper" + "github.com/jrasell/chemtrail/pkg/state" ) const ( diff --git a/pkg/server/config.go b/pkg/server/config.go index 8c8b62e..d2fd9a1 100644 --- a/pkg/server/config.go +++ b/pkg/server/config.go @@ -10,6 +10,7 @@ type Config struct { Autoscale *serverCfg.AutoscalerConfig Provider *serverCfg.ProviderConfig Server 
*serverCfg.Config + Storage *serverCfg.StorageConfig TLS *serverCfg.TLSConfig Telemetry *serverCfg.TelemetryConfig } diff --git a/pkg/server/gc.go b/pkg/server/gc.go new file mode 100644 index 0000000..3a29b98 --- /dev/null +++ b/pkg/server/gc.go @@ -0,0 +1,30 @@ +package server + +import "time" + +// gcEvalPeriod is the interval between runs of the automatic scaling state garbage collector; it +// is the period of the ticker created in runGarbageCollectionLoop. +var gcEvalPeriod = time.Minute * 10 + +// runGarbageCollectionLoop runs the scaling state garbage collection function once per gcEvalPeriod +// until stopChan is closed. NOTE(review): gcIsRunning is written here without any locking; confirm readers tolerate that. +func (h *HTTPServer) runGarbageCollectionLoop() { + h.logger.Info().Msg("started scaling state garbage collector handler") + + h.gcIsRunning = true + + t := time.NewTicker(gcEvalPeriod) + defer t.Stop() + + for { + select { + case <-h.stopChan: + h.logger.Info().Msg("shutting down state garbage collection handler") + h.gcIsRunning = false + return + case <-t.C: + h.logger.Debug().Msg("triggering internal run of state garbage collection") + h.scaleState.RunStateGarbageCollection() + } + } +} diff --git a/pkg/server/server.go b/pkg/server/server.go index 4310ea1..2330137 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -11,13 +11,16 @@ import ( "syscall" "github.com/armon/go-metrics" + consulAPI "github.com/hashicorp/consul/api" "github.com/jrasell/chemtrail/pkg/client" "github.com/jrasell/chemtrail/pkg/scale" "github.com/jrasell/chemtrail/pkg/scale/auto" "github.com/jrasell/chemtrail/pkg/scale/resource" "github.com/jrasell/chemtrail/pkg/server/router" "github.com/jrasell/chemtrail/pkg/state" + policyConsul "github.com/jrasell/chemtrail/pkg/state/policy/consul" policyMemory "github.com/jrasell/chemtrail/pkg/state/policy/memory" + scaleConsul "github.com/jrasell/chemtrail/pkg/state/scale/consul" scaleMemory "github.com/jrasell/chemtrail/pkg/state/scale/memory" "github.com/jrasell/chemtrail/pkg/watcher" "github.com/jrasell/chemtrail/pkg/watcher/allocs" @@ -31,6 +34,9 @@ type HTTPServer struct { cfg 
*Config logger zerolog.Logger + // consul is our stored, reusable Consul client. + consul *consulAPI.Client + // nomad is our stored Nomad client wrapper which is reused in all areas which require Nomad // API connectivity. nomad *client.Nomad @@ -59,16 +65,25 @@ type HTTPServer struct { telemetry *metrics.InmemSink + // gcIsRunning is used to track whether this Chemtrail server is currently running the garbage + // collection loop. + gcIsRunning bool + + // stopChan is used to synchronise stopping the HTTP server services and any handlers which it + // maintains operationally. + stopChan chan struct{} + http.Server routes *routes } func New(l zerolog.Logger, cfg *Config) *HTTPServer { return &HTTPServer{ - addr: fmt.Sprintf("%s:%d", cfg.Server.Bind, cfg.Server.Port), - cfg: cfg, - logger: l, - routes: &routes{}, + addr: fmt.Sprintf("%s:%d", cfg.Server.Bind, cfg.Server.Port), + cfg: cfg, + logger: l, + routes: &routes{}, + stopChan: make(chan struct{}), } } @@ -92,6 +107,9 @@ func (h *HTTPServer) Start() error { go h.autoscaler.Run() } + // Trigger the garbage collection periodic loop. + go h.runGarbageCollectionLoop() + h.handleSignals() return nil } @@ -105,8 +123,14 @@ func (h *HTTPServer) setup() error { Str("node-id", h.nomad.NodeID). Msg("identified Chemtrail allocation nodeID") - h.policyState = policyMemory.NewPolicyBackend() - h.scaleState = scaleMemory.NewScaleStateBackend() + // Setup the reusable Consul client. + if err := h.setupConsulClient(); err != nil { + return err + } + + // Setup the state backend. + h.setupStateBackend() + h.nodeResourceHandler = resource.NewHandler(h.logger, h.nomad) h.scaler = scale.NewScaleBackend(&scale.BackendConfig{ @@ -194,6 +218,33 @@ func (h *HTTPServer) setupNomadClient() error { return nil } +// setupConsulClient is used to build and store our reusable Consul API client. 
+func (h *HTTPServer) setupConsulClient() error { + h.logger.Debug().Msg("setting up Consul client") + + cc, err := client.NewConsulClient() + if err != nil { + return err + } + h.consul = cc + + return nil +} + +// setupStateBackend sets up the policy and scale state backends, using the Consul implementations +// when Consul storage is enabled in the operator's configuration and the in-memory ones otherwise. +func (h *HTTPServer) setupStateBackend() { + if h.cfg.Storage.ConsulEnabled { + h.logger.Debug().Msg("setting up Consul storage backend") + h.policyState = policyConsul.NewPolicyBackend(h.cfg.Storage.ConsulPath, h.consul) + h.scaleState = scaleConsul.NewScaleBackend(h.logger, h.cfg.Storage.ConsulPath, h.consul) + } else { + h.logger.Debug().Msg("setting up in-memory storage backend") + h.policyState = policyMemory.NewPolicyBackend() + h.scaleState = scaleMemory.NewScaleStateBackend() + } +} + func (h *HTTPServer) setupListener() net.Listener { var ( err error @@ -223,6 +274,10 @@ func (h *HTTPServer) Stop() error { if h.autoscaler != nil && h.autoscaler.IsRunning() { h.autoscaler.Stop() } + + // Close the HTTPServer stopChan to signal background handlers that they should stop. + close(h.stopChan) + return h.Shutdown(context.Background()) } diff --git a/pkg/state/policy/consul/consul.go b/pkg/state/policy/consul/consul.go new file mode 100644 index 0000000..50a6249 --- /dev/null +++ b/pkg/state/policy/consul/consul.go @@ -0,0 +1,98 @@ +package consul + +import ( + "encoding/json" + "strings" + + "github.com/hashicorp/consul/api" + "github.com/jrasell/chemtrail/pkg/state" + "github.com/pkg/errors" +) + +// baseKVPath is appended to the operator-supplied Consul path to form the location where Chemtrail +// policies are stored. +const baseKVPath = "policies/" + +// PolicyBackend is the Consul implementation of the state.PolicyBackend interface. +type PolicyBackend struct { + path string + kv *api.KV +} + +// NewPolicyBackend returns the Consul implementation of the state.PolicyBackend interface. 
+func NewPolicyBackend(path string, client *api.Client) state.PolicyBackend { + return &PolicyBackend{ + path: path + baseKVPath, + kv: client.KV(), + } +} + +// GetPolicies satisfies the GetPolicies function on the state.PolicyBackend interface. +func (p PolicyBackend) GetPolicies() (map[string]*state.ClientScalingPolicy, error) { + kv, _, err := p.kv.List(p.path, nil) + if err != nil { + return nil, err + } + + out := make(map[string]*state.ClientScalingPolicy) + + // If there are no KV entries, return the empty map. + if kv == nil { + return out, nil + } + + for i := range kv { + p := &state.ClientScalingPolicy{} + + if err := json.Unmarshal(kv[i].Value, p); err != nil { + return nil, errors.Wrap(err, "failed to unmarshal Consul KV value") + } + keySplit := strings.Split(kv[i].Key, "/") + + out[keySplit[len(keySplit)-1]] = p + } + + return out, nil +} + +// GetPolicy satisfies the GetPolicy function on the state.PolicyBackend interface. +func (p PolicyBackend) GetPolicy(class string) (*state.ClientScalingPolicy, error) { + kv, _, err := p.kv.Get(p.path+class, nil) + if err != nil { + return nil, err + } + + if kv == nil { + return nil, nil + } + + out := &state.ClientScalingPolicy{} + + if err := json.Unmarshal(kv.Value, out); err != nil { + return nil, errors.Wrap(err, "failed to unmarshal Consul KV value") + } + + return out, nil +} + +// PutPolicy satisfies the PutPolicy function on the state.PolicyBackend interface. +func (p PolicyBackend) PutPolicy(policy *state.ClientScalingPolicy) error { + marshal, err := json.Marshal(policy) + if err != nil { + return err + } + + pair := &api.KVPair{ + Key: p.path + policy.Class, + Value: marshal, + } + + _, err = p.kv.Put(pair, nil) + return err +} + +// DeletePolicy satisfies the DeletePolicy function on the state.PolicyBackend interface. 
+func (p PolicyBackend) DeletePolicy(class string) error { + _, err := p.kv.Delete(p.path+class, nil) + return err +} diff --git a/pkg/state/scale.go b/pkg/state/scale.go index bf3b762..8eab6ce 100644 --- a/pkg/state/scale.go +++ b/pkg/state/scale.go @@ -157,3 +157,12 @@ func (em EventMessage) MarshalZerologObject(e *zerolog.Event) { e.Str("message", em.Message) } } + +const ( + // ScaleChemtrailSource is the source used when building the start event for a scaling + // activity. + ScaleChemtrailSource = "chemtrail" + + // ScaleStartMessage is the message used as the first event in a scaling activity. + ScaleStartMessage = "scaling activity has started" +) diff --git a/pkg/state/scale/consul/consul.go b/pkg/state/scale/consul/consul.go new file mode 100644 index 0000000..35223ad --- /dev/null +++ b/pkg/state/scale/consul/consul.go @@ -0,0 +1,212 @@ +package consul + +import ( + "encoding/json" + "strings" + + "github.com/gofrs/uuid" + "github.com/hashicorp/consul/api" + "github.com/jrasell/chemtrail/pkg/helper" + "github.com/jrasell/chemtrail/pkg/state" + "github.com/pkg/errors" + "github.com/rs/zerolog" +) + +// baseEventKVPath is the Consul path suffix appended to the CLI-supplied path to identify where scaling +// state is stored. +const baseEventKVPath = "state/events/" + +// ScaleBackend is the Consul implementation of the state.ScaleBackend interface. +type ScaleBackend struct { + eventPath string + gcThreshold int64 + kv *api.KV + logger zerolog.Logger +} + +// NewScaleBackend returns the Consul implementation of the state.ScaleBackend interface. +func NewScaleBackend(log zerolog.Logger, path string, client *api.Client) state.ScaleBackend { + return &ScaleBackend{ + eventPath: path + baseEventKVPath, + gcThreshold: state.GarbageCollectionThreshold, + kv: client.KV(), + logger: log, + } +} + +// GetScalingActivities satisfies the GetScalingActivities function on the state.ScaleBackend +// interface. 
+func (s *ScaleBackend) GetScalingActivities() (map[uuid.UUID]*state.ScalingActivity, error) { + kv, _, err := s.kv.List(s.eventPath, nil) + if err != nil { + return nil, err + } + + out := make(map[uuid.UUID]*state.ScalingActivity) + + if kv == nil { + return out, nil + } + + for i := range kv { + activity := &state.ScalingActivity{} + + if err := json.Unmarshal(kv[i].Value, activity); err != nil { + return nil, errors.Wrap(err, "failed to unmarshal Consul KV value") + } + + keySplit := strings.Split(kv[i].Key, "/") + + id, err := uuid.FromString(keySplit[len(keySplit)-1]) + if err != nil { + return nil, errors.Wrap(err, "failed to get UUID from string") + } + + out[id] = activity + } + + return out, nil +} + +// GetScalingActivity satisfies the GetScalingActivity function on the state.ScaleBackend +// interface. +func (s *ScaleBackend) GetScalingActivity(id uuid.UUID) (*state.ScalingActivity, error) { + kv, _, err := s.kv.Get(s.eventPath+id.String(), nil) + if err != nil { + return nil, err + } + + if kv == nil { + return nil, nil + } + + out := &state.ScalingActivity{} + + if err := json.Unmarshal(kv.Value, out); err != nil { + return nil, errors.Wrap(err, "failed to unmarshal Consul KV value") + } + + return out, nil +} + +// RunStateGarbageCollection satisfies the RunStateGarbageCollection function on the +// state.ScaleBackend interface. 
+func (s *ScaleBackend) RunStateGarbageCollection() { + + kv, _, err := s.kv.List(s.eventPath, nil) + if err != nil { + s.logger.Error().Err(err).Msg("GC failed to list events in Consul backend") + return + } + + if kv == nil { + return + } + + gc := helper.GenerateEventTimestamp() - s.gcThreshold + + for i := range kv { + + ss := &state.ScalingActivity{} + + if err := json.Unmarshal(kv[i].Value, ss); err != nil { + s.logger.Error().Err(err).Msg("GC failed to unmarshal event for inspection") + continue + } + + switch ss.Status { + case state.ScaleStatusCompleted, state.ScaleStatusFailed: + if ss.LastUpdate < gc { + // Unlike the in-memory, we currently delete keys which have passed the expiration + // threshold. Delete vs. re-create has not been benchmarked, but my initial opinion is + // that delete will be more efficient and is at least easier for the MVP. + if _, err := s.kv.Delete(kv[i].Key, nil); err != nil { + s.logger.Error(). + Str("key", kv[i].Key). + Err(err). + Msg("GC failed to delete stale event in Consul backend") + } + } + default: + continue + } + } +} + +// WriteRequest satisfies the WriteRequest function on the state.ScaleBackend interface. +func (s *ScaleBackend) WriteRequest(req *state.ScalingRequest) error { + ts := helper.GenerateEventTimestamp() + + entry := state.ScalingActivity{ + Events: []state.Event{{ + Timestamp: ts, + Message: state.ScaleStartMessage, + Source: state.ScaleChemtrailSource, + }}, + Direction: req.Direction, + LastUpdate: ts, + Status: state.ScaleStatusStarted, + Provider: req.Policy.Provider, + ProviderCfg: req.Policy.ProviderConfig, + } + + marshal, err := json.Marshal(entry) + if err != nil { + return err + } + + pair := &api.KVPair{ + Key: s.eventPath + req.ID.String(), + Value: marshal, + } + + _, err = s.kv.Put(pair, nil) + return err +} + +// WriteRequestEvent satisfies the WriteRequestEvent function on the state.ScaleBackend interface. 
+func (s *ScaleBackend) WriteRequestEvent(message *state.ScalingUpdate) error { + kv, _, err := s.kv.Get(s.eventPath+message.ID.String(), nil) + if err != nil { + return err + } + + // Adding an event to an activity requires that the initial state has already been written to Consul. + // A missing key is therefore an error and is reported as such. + if kv == nil { + return errors.New("scaling activity not found in Consul backend") + } + + event := &state.ScalingActivity{} + + if err := json.Unmarshal(kv.Value, event); err != nil { + return errors.Wrap(err, "failed to unmarshal Consul KV value") + } + + // Append the new event and refresh LastUpdate; the stored status is only overwritten when the update + // is not in-progress. NOTE(review): this Get-then-Put is not a check-and-set, so concurrent updates to one activity could overwrite each other; confirm callers serialise them. + detail := state.Event{ + Timestamp: message.Detail.Timestamp, + Message: message.Detail.Message, + Source: message.Detail.Source, + } + event.Events = append(event.Events, detail) + event.LastUpdate = message.Detail.Timestamp + + if message.Status != state.ScaleStatusInProgress { + event.Status = message.Status + } + + marshal, err := json.Marshal(event) + if err != nil { + return err + } + + pair := &api.KVPair{ + Key: s.eventPath + message.ID.String(), + Value: marshal, + } + + _, err = s.kv.Put(pair, nil) + return err +} diff --git a/pkg/state/scale/memory/memory.go b/pkg/state/scale/memory/memory.go index 5e4f32d..a14f6b5 100644 --- a/pkg/state/scale/memory/memory.go +++ b/pkg/state/scale/memory/memory.go @@ -85,8 +85,8 @@ func (s *ScaleBackend) WriteRequest(req *state.ScalingRequest) error { entry := state.ScalingActivity{ Events: []state.Event{{ Timestamp: ts, - Message: "scaling activity has started", - Source: "chemtrail", + Message: state.ScaleStartMessage, + Source: state.ScaleChemtrailSource, }}, Direction: req.Direction, LastUpdate: ts,