From dadcfb1b642999853d79ee8caaf8e4eb8ae8f1c7 Mon Sep 17 00:00:00 2001 From: Hasit Mistry Date: Tue, 23 Jan 2024 15:35:12 -0800 Subject: [PATCH 1/8] Add support for alerting if labels are missing - SetStatus now accepts labels that go along with the alert. Amongst the labels, a special label called "severity" controls the severity of the alert --- pkg/jobs/job-tracker.go | 2 +- pkg/jobs/job.go | 7 +- pkg/labelstatus/labelstatus.go | 112 +++++++++++++++++ pkg/notifiers/fx-driver.go | 16 +-- pkg/otelcollector/otelcollector.go | 2 +- pkg/platform/platform.go | 4 +- pkg/platform/readiness.go | 4 +- .../actuators/podscaler/pod-scaler.go | 6 +- .../controlplane/components/alerter.go | 3 +- pkg/policies/controlplane/policy-factory.go | 4 +- pkg/policies/controlplane/runtime/circuit.go | 2 +- .../concurrency-limiter.go | 113 ++++++++++++------ .../concurrency-scheduler.go | 4 +- .../load-scheduler/load-scheduler.go | 14 +-- .../quota-scheduler/quota-scheduler.go | 4 +- .../actuators/rate-limiter/rate-limiter.go | 4 +- .../flowcontrol/actuators/sampler/sampler.go | 4 +- .../resources/fluxmeter/flux-meter.go | 2 +- pkg/status/registry.go | 30 +++-- pkg/status/status_test.go | 10 +- pkg/watchdog/watchdog.go | 2 +- test/aperture_suite_test.go | 4 +- 22 files changed, 260 insertions(+), 93 deletions(-) create mode 100644 pkg/labelstatus/labelstatus.go diff --git a/pkg/jobs/job-tracker.go b/pkg/jobs/job-tracker.go index 259ea32223..3caae00179 100644 --- a/pkg/jobs/job-tracker.go +++ b/pkg/jobs/job-tracker.go @@ -69,7 +69,7 @@ func (gt *groupTracker) updateStatus(job Job, s *statusv1.Status) error { return errExistingJob } - tracker.statusRegistry.SetStatus(s) + tracker.statusRegistry.SetStatus(s, nil) return nil } diff --git a/pkg/jobs/job.go b/pkg/jobs/job.go index 62b0d927e8..59902e30dc 100644 --- a/pkg/jobs/job.go +++ b/pkg/jobs/job.go @@ -10,10 +10,11 @@ import ( "google.golang.org/protobuf/proto" "google.golang.org/protobuf/types/known/wrapperspb" + "github.com/reugn/go-quartz/quartz" + jobsconfig "github.com/fluxninja/aperture/v2/pkg/jobs/config" panichandler "github.com/fluxninja/aperture/v2/pkg/panic-handler" "github.com/fluxninja/aperture/v2/pkg/status" - "github.com/reugn/go-quartz/quartz" ) // JobCallback is the callback function that is called after a job is executed. @@ -133,11 +134,11 @@ func (executor *jobExecutor) doJob(ctx context.Context) (proto.Message, error) { select { case <-timerCh: s := status.NewStatus(wrapperspb.String("Timeout"), errors.New("job execution timeout")) - executor.livenessRegistry.SetStatus(s) + executor.livenessRegistry.SetStatus(s, nil) timer.Reset(time.Second * 1) case <-jobCh: s := status.NewStatus(wrapperspb.String("OK"), nil) - executor.livenessRegistry.SetStatus(s) + executor.livenessRegistry.SetStatus(s, nil) timer.Stop() return msg, err } diff --git a/pkg/labelstatus/labelstatus.go b/pkg/labelstatus/labelstatus.go new file mode 100644 index 0000000000..3b76591eef --- /dev/null +++ b/pkg/labelstatus/labelstatus.go @@ -0,0 +1,112 @@ +package labelstatus + +import ( + "context" + "errors" + "sync" + "time" + + "go.uber.org/fx" + "google.golang.org/protobuf/proto" + + "github.com/fluxninja/aperture/v2/pkg/alerts" + "github.com/fluxninja/aperture/v2/pkg/config" + "github.com/fluxninja/aperture/v2/pkg/jobs" + "github.com/fluxninja/aperture/v2/pkg/status" +) + +// LabelStatusFactory is a factory for creating LabelStatus. 
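// Illustrative sketch (not part of the patch): how a consumer is expected to use
// this factory, assuming the aperture jobs, fx and labels packages are imported;
// the label key, policy name and component ID below are placeholders.
func exampleLabelStatusUsage(lsf *LabelStatusFactory, jg *jobs.JobGroup, lc fx.Lifecycle, flowLabels labels.Labels) {
	ls := lsf.New("tokens_label_key", "example-policy", "example-component")
	// Setup registers the periodic job that publishes this label's status.
	ls.Setup(jg, lc)
	if _, ok := flowLabels.Get("tokens_label_key"); !ok {
		// Record the miss; the periodic job turns it into a status and alert.
		ls.SetMissing()
	}
}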
+type LabelStatusFactory struct { + registry status.Registry +} + +// LabelStatusModule is an fx module for providing LabelStatusFactory. +func LabelStatusModule() fx.Option { + return fx.Options( + fx.Provide(NewLabelStatusFactory), + ) +} + +// NewLabelStatusFactory creates a new LabelStatusFactory. +func NewLabelStatusFactory(statusRegistry status.Registry) *LabelStatusFactory { + return &LabelStatusFactory{ + registry: statusRegistry.Child("label", "status"), + } +} + +// New creates a new LabelStatus. +func (lsf *LabelStatusFactory) New(labelKey string, policyName string, componentID string) *LabelStatus { + reg := lsf.registry.Child("label", labelKey) + return &LabelStatus{ + registry: reg, + labelKey: labelKey, + policyName: policyName, + componentID: componentID, + } +} + +// LabelStatus holds the status of the labels. +type LabelStatus struct { + lock sync.RWMutex + registry status.Registry + timestamp time.Time + labelKey string + policyName string + componentID string +} + +// Setup sets up the LabelsStatus's lifecycle hooks. +func (ls *LabelStatus) Setup(jobGroup *jobs.JobGroup, lifecycle fx.Lifecycle) { + lifecycle.Append(fx.Hook{ + OnStart: func(context.Context) error { + job := jobs.NewBasicJob("", ls.setLookupStatus) + err := jobGroup.RegisterJob(job, jobs.JobConfig{ + ExecutionPeriod: config.MakeDuration(10 * time.Second), + }) + if err != nil { + return err + } + return nil + }, + OnStop: func(context.Context) error { + err := jobGroup.DeregisterJob("") + if err != nil { + return err + } + return nil + }, + }) +} + +// SetMissing sets the status to missing with current timestamp. +func (ls *LabelStatus) SetMissing() { + ls.lock.Lock() + defer ls.lock.Unlock() + ls.timestamp = time.Now() +} + +func (ls *LabelStatus) setLookupStatus(ctx context.Context) (proto.Message, error) { + ls.lock.Lock() + defer ls.lock.Unlock() + + if ls.timestamp.IsZero() { + return nil, nil + } + + labels := map[string]string{ + "policy": ls.policyName, + "component": ls.componentID, + } + + if time.Since(ls.timestamp) >= 5*time.Minute { + labels["severity"] = alerts.SeverityInfo.String() + ls.registry.SetStatus(nil, labels) + return nil, nil + } else { + labels["severity"] = alerts.SeverityCrit.String() + s := status.NewStatus(nil, errors.New("label "+ls.labelKey+"missing")) + ls.registry.SetStatus(s, labels) + } + + return nil, nil +} diff --git a/pkg/notifiers/fx-driver.go b/pkg/notifiers/fx-driver.go index de004813c7..98b0fb362e 100644 --- a/pkg/notifiers/fx-driver.go +++ b/pkg/notifiers/fx-driver.go @@ -60,7 +60,7 @@ func (fr *fxRunner) processEvent(event Event, unmarshaller config.Unmarshaller) } func (fr *fxRunner) initApp(key Key, unmarshaller config.Unmarshaller) error { - fr.fxRunnerStatusRegistry.SetStatus(status.NewStatus(wrapperspb.String("policy runner initializing"), nil)) + fr.fxRunnerStatusRegistry.SetStatus(status.NewStatus(wrapperspb.String("policy runner initializing"), nil), nil) logger := fr.fxRunnerStatusRegistry.GetLogger() if fr.app == nil && unmarshaller != nil { @@ -98,34 +98,34 @@ func (fr *fxRunner) initApp(key Key, unmarshaller config.Unmarshaller) error { if err = fr.app.Err(); err != nil { visualize, _ := fx.VisualizeError(err) logger.Error().Err(err).Str("visualize", visualize).Msg("fx.New failed") - fr.fxRunnerStatusRegistry.SetStatus(status.NewStatus(nil, err)) + fr.fxRunnerStatusRegistry.SetStatus(status.NewStatus(nil, err), nil) if deinitErr := fr.deinitApp(); deinitErr != nil { logger.Error().Err(deinitErr).Msg("Failed to deinitialize application after start 
failure") } return err } - fr.fxRunnerStatusRegistry.SetStatus(status.NewStatus(wrapperspb.String("policy runner starting"), nil)) + fr.fxRunnerStatusRegistry.SetStatus(status.NewStatus(wrapperspb.String("policy runner starting"), nil), nil) ctx, cancel := context.WithTimeout(context.Background(), fr.app.StartTimeout()) defer cancel() if err = fr.app.Start(ctx); err != nil { logger.Error().Err(err).Msg("Could not start application") - fr.fxRunnerStatusRegistry.SetStatus(status.NewStatus(nil, err)) + fr.fxRunnerStatusRegistry.SetStatus(status.NewStatus(nil, err), nil) if deinitErr := fr.deinitApp(); deinitErr != nil { logger.Error().Err(deinitErr).Msg("Failed to deinitialize application after start failure") } return err } - fr.fxRunnerStatusRegistry.SetStatus(status.NewStatus(wrapperspb.String("policy runner started"), nil)) + fr.fxRunnerStatusRegistry.SetStatus(status.NewStatus(wrapperspb.String("policy runner started"), nil), nil) } else { - fr.fxRunnerStatusRegistry.SetStatus(status.NewStatus(nil, errors.New("fxRunner is not initialized"))) + fr.fxRunnerStatusRegistry.SetStatus(status.NewStatus(nil, errors.New("fxRunner is not initialized")), nil) } return nil } func (fr *fxRunner) deinitApp() error { - fr.fxRunnerStatusRegistry.SetStatus(status.NewStatus(wrapperspb.String("policy runner stopping"), nil)) + fr.fxRunnerStatusRegistry.SetStatus(status.NewStatus(wrapperspb.String("policy runner stopping"), nil), nil) logger := fr.fxRunnerStatusRegistry.GetLogger() if fr.app != nil { ctx, cancel := context.WithTimeout(context.Background(), fr.app.StopTimeout()) @@ -138,7 +138,7 @@ func (fr *fxRunner) deinitApp() error { return err } } - fr.fxRunnerStatusRegistry.SetStatus(status.NewStatus(wrapperspb.String("policy runner stopped"), nil)) + fr.fxRunnerStatusRegistry.SetStatus(status.NewStatus(wrapperspb.String("policy runner stopped"), nil), nil) return nil } diff --git a/pkg/otelcollector/otelcollector.go b/pkg/otelcollector/otelcollector.go index a4ddd3271c..3630db0577 100644 --- a/pkg/otelcollector/otelcollector.go +++ b/pkg/otelcollector/otelcollector.go @@ -163,5 +163,5 @@ func otelState(otelService *otelcol.Collector) (proto.Message, error) { func setReadinessStatus(statusRegistry status.Registry, msg proto.Message, err error) { statusRegistry.Child("system", "readiness"). Child("component", "otel-collector"). - SetStatus(status.NewStatus(msg, err)) + SetStatus(status.NewStatus(msg, err), nil) } diff --git a/pkg/platform/platform.go b/pkg/platform/platform.go index d043a00162..81b5d82ab2 100644 --- a/pkg/platform/platform.go +++ b/pkg/platform/platform.go @@ -164,12 +164,12 @@ func Run(app *fx.App) { defer stop(app) - platform.statusRegistry.Child("system", readinessStatusPath).Child("component", platformStatusPath).SetStatus(status.NewStatus(wrapperspb.String("platform running"), nil)) + platform.statusRegistry.Child("system", readinessStatusPath).Child("component", platformStatusPath).SetStatus(status.NewStatus(wrapperspb.String("platform running"), nil), nil) // Wait for os.Signal signal := <-app.Done() log.Info().Str("signal", signal.String()).Msg("Received signal. 
Stopping application") - platform.statusRegistry.Child("system", readinessStatusPath).Child("component", platformStatusPath).SetStatus(status.NewStatus(nil, errors.New("platform stopping"))) + platform.statusRegistry.Child("system", readinessStatusPath).Child("component", platformStatusPath).SetStatus(status.NewStatus(nil, errors.New("platform stopping")), nil) } func stop(app *fx.App) { diff --git a/pkg/platform/readiness.go b/pkg/platform/readiness.go index 27566dcf0a..1ed346ca18 100644 --- a/pkg/platform/readiness.go +++ b/pkg/platform/readiness.go @@ -30,13 +30,13 @@ func platformReadinessStatus(in platformReadinessStatusIn) error { OnStart: func(context.Context) error { platform.statusRegistry.Child("system", readinessStatusPath). Child("component", platformStatusPath). - SetStatus(status.NewStatus(nil, errors.New("platform starting"))) + SetStatus(status.NewStatus(nil, errors.New("platform starting")), nil) return nil }, OnStop: func(context.Context) error { platform.statusRegistry.Child("system", readinessStatusPath). Child("component", platformStatusPath). - SetStatus(status.NewStatus(nil, errors.New("platform stopped"))) + SetStatus(status.NewStatus(nil, errors.New("platform stopped")), nil) return nil }, }) diff --git a/pkg/policies/autoscale/kubernetes/actuators/podscaler/pod-scaler.go b/pkg/policies/autoscale/kubernetes/actuators/podscaler/pod-scaler.go index f3ae21b7bd..f1e708bcab 100644 --- a/pkg/policies/autoscale/kubernetes/actuators/podscaler/pod-scaler.go +++ b/pkg/policies/autoscale/kubernetes/actuators/podscaler/pod-scaler.go @@ -162,7 +162,7 @@ func (psFactory *podScalerFactory) newPodScalerOptions( wrapperMessage := &policysyncv1.PodScalerWrapper{} err := unmarshaller.Unmarshal(wrapperMessage) if err != nil || wrapperMessage.PodScaler == nil { - reg.SetStatus(status.NewStatus(nil, err)) + reg.SetStatus(status.NewStatus(nil, err), nil) logger.Warn().Err(err).Msg("Failed to unmarshal pod scaler") return fx.Options(), err } @@ -296,14 +296,14 @@ func (ps *podScaler) setup( logger.Error().Err(err).Msg("Failed to remove control point notifier") merr = multierror.Append(merr, err) } - ps.registry.SetStatus(status.NewStatus(nil, merr)) + ps.registry.SetStatus(status.NewStatus(nil, merr), nil) ps.cancel() ps.etcdClient.Delete(ps.statusEtcdPath) if err != nil { logger.Error().Err(err).Msg("Failed to delete scale status") merr = multierr.Append(merr, err) } - ps.registry.SetStatus(status.NewStatus(nil, merr)) + ps.registry.SetStatus(status.NewStatus(nil, merr), nil) return merr }, }) diff --git a/pkg/policies/controlplane/components/alerter.go b/pkg/policies/controlplane/components/alerter.go index 07590f5679..890ba4f8b0 100644 --- a/pkg/policies/controlplane/components/alerter.go +++ b/pkg/policies/controlplane/components/alerter.go @@ -82,8 +82,7 @@ func (a *Alerter) Execute(inPortReadings runtime.PortToReading, circuitAPI runti } // DynamicConfigUpdate is a no-op for Alerter. 
-func (a *Alerter) DynamicConfigUpdate(event notifiers.Event, unmarshaller config.Unmarshaller) { -} +func (a *Alerter) DynamicConfigUpdate(event notifiers.Event, unmarshaller config.Unmarshaller) {} func (a *Alerter) createAlert() *alerts.Alert { newAlert := alerts.NewAlert( diff --git a/pkg/policies/controlplane/policy-factory.go b/pkg/policies/controlplane/policy-factory.go index 44b8d39658..c7a10cfa86 100644 --- a/pkg/policies/controlplane/policy-factory.go +++ b/pkg/policies/controlplane/policy-factory.go @@ -146,7 +146,7 @@ func (factory *PolicyFactory) provideControllerPolicyFxOptions( var wrapperMessage policysyncv1.PolicyWrapper err := unmarshaller.Unmarshal(&wrapperMessage) if err != nil || wrapperMessage.Policy == nil { - registry.SetStatus(status.NewStatus(nil, err)) + registry.SetStatus(status.NewStatus(nil, err), nil) registry.GetLogger().Error().Err(err).Msg("Failed to unmarshal policy config wrapper") return fx.Options(), err } @@ -156,7 +156,7 @@ func (factory *PolicyFactory) provideControllerPolicyFxOptions( registry, ) if err != nil { - registry.SetStatus(status.NewStatus(nil, err)) + registry.SetStatus(status.NewStatus(nil, err), nil) registry.GetLogger().Warn().Err(err).Msg("Failed to create policy options") return fx.Options(), err } diff --git a/pkg/policies/controlplane/runtime/circuit.go b/pkg/policies/controlplane/runtime/circuit.go index 40f993d5af..1aec57b3c9 100644 --- a/pkg/policies/controlplane/runtime/circuit.go +++ b/pkg/policies/controlplane/runtime/circuit.go @@ -282,7 +282,7 @@ func (circuit *Circuit) Execute(tickInfo TickInfo) error { } signalStatus := status.NewStatus(signalInfo, nil) - reg.SetStatus(signalStatus) + reg.SetStatus(signalStatus, nil) }() // Populate with last run's looped signal diff --git a/pkg/policies/flowcontrol/actuators/concurrency-limiter/concurrency-limiter.go b/pkg/policies/flowcontrol/actuators/concurrency-limiter/concurrency-limiter.go index 72d43612cd..7a2b99d23b 100644 --- a/pkg/policies/flowcontrol/actuators/concurrency-limiter/concurrency-limiter.go +++ b/pkg/policies/flowcontrol/actuators/concurrency-limiter/concurrency-limiter.go @@ -2,7 +2,6 @@ package concurrencylimiter import ( "context" - "errors" "fmt" "path" "strconv" @@ -22,7 +21,9 @@ import ( concurrencylimiter "github.com/fluxninja/aperture/v2/pkg/dmap-funcs/concurrency-limiter" etcdclient "github.com/fluxninja/aperture/v2/pkg/etcd/client" etcdwatcher "github.com/fluxninja/aperture/v2/pkg/etcd/watcher" + "github.com/fluxninja/aperture/v2/pkg/jobs" "github.com/fluxninja/aperture/v2/pkg/labels" + "github.com/fluxninja/aperture/v2/pkg/labelstatus" "github.com/fluxninja/aperture/v2/pkg/log" "github.com/fluxninja/aperture/v2/pkg/metrics" "github.com/fluxninja/aperture/v2/pkg/notifiers" @@ -72,12 +73,14 @@ func provideConcurrencyLimiterWatchers( } type concurrencyLimiterFactory struct { - engineAPI iface.Engine - registry status.Registry - distCache *distcache.DistCache - decisionsWatcher notifiers.Watcher - counterVector *prometheus.CounterVec - agentGroupName string + engineAPI iface.Engine + registry status.Registry + distCache *distcache.DistCache + decisionsWatcher notifiers.Watcher + counterVector *prometheus.CounterVec + agentGroupName string + labelStatusJobGroup *jobs.JobGroup + labelStatusFactory *labelstatus.LabelStatusFactory } // main fx app. 
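// Illustrative sketch (not part of the patch): for the new *labelstatus.LabelStatusFactory
// parameter below to be injectable, the agent app graph is assumed to include the new
// module alongside the existing providers (labelstatus import assumed), roughly:
func exampleLabelStatusWiring() fx.Option {
	return fx.Options(
		labelstatus.LabelStatusModule(), // provides *labelstatus.LabelStatusFactory
		fx.Invoke(setupConcurrencyLimiterFactory),
	)
}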
@@ -90,6 +93,7 @@ func setupConcurrencyLimiterFactory( prometheusRegistry *prometheus.Registry, etcdClient *etcdclient.Client, ai *agentinfo.AgentInfo, + labelStatusFactory *labelstatus.LabelStatusFactory, ) error { agentGroupName := ai.GetAgentGroup() etcdPath := path.Join(paths.ConcurrencyLimiterDecisionsPath) @@ -99,19 +103,28 @@ func setupConcurrencyLimiterFactory( } reg := statusRegistry.Child("component", concurrencyLimiterStatusRoot) + logger := reg.GetLogger() counterVector := prometheus.NewCounterVec(prometheus.CounterOpts{ Name: metrics.ConcurrencyLimiterCounterTotalMetricName, Help: "A counter measuring the number of times Concurrency Limiter was triggered", }, metricLabelKeys) + labelStatusJobGroup, err := jobs.NewJobGroup(reg.Child("label_status", concurrencyLimiterStatusRoot), jobs.JobGroupConfig{}, nil) + if err != nil { + logger.Error().Err(err).Msg("Failed to create labels status job group") + return err + } + concurrencyLimiterFactory := &concurrencyLimiterFactory{ - engineAPI: e, - distCache: distCache, - decisionsWatcher: decisionsWatcher, - agentGroupName: agentGroupName, - registry: reg, - counterVector: counterVector, + engineAPI: e, + distCache: distCache, + decisionsWatcher: decisionsWatcher, + agentGroupName: agentGroupName, + registry: reg, + counterVector: counterVector, + labelStatusJobGroup: labelStatusJobGroup, + labelStatusFactory: labelStatusFactory, } fxDriver, err := notifiers.NewFxDriver( @@ -130,6 +143,10 @@ func setupConcurrencyLimiterFactory( if err != nil { return err } + err = labelStatusJobGroup.Start() + if err != nil { + return err + } err = decisionsWatcher.Start() if err != nil { return err @@ -142,6 +159,10 @@ func setupConcurrencyLimiterFactory( if err != nil { merr = multierr.Append(merr, err) } + err = labelStatusJobGroup.Stop() + if err != nil { + merr = multierr.Append(merr, err) + } if !prometheusRegistry.Unregister(concurrencyLimiterFactory.counterVector) { err2 := fmt.Errorf("failed to unregister metric") merr = multierr.Append(merr, err2) @@ -159,22 +180,26 @@ func setupConcurrencyLimiterFactory( // per component fx app. func (clFactory *concurrencyLimiterFactory) newConcurrencyLimiterOptions(key notifiers.Key, unmarshaller config.Unmarshaller, reg status.Registry) (fx.Option, error) { logger := clFactory.registry.GetLogger() + wrapperMessage := &policysyncv1.ConcurrencyLimiterWrapper{} err := unmarshaller.Unmarshal(wrapperMessage) if err != nil || wrapperMessage.ConcurrencyLimiter == nil { - reg.SetStatus(status.NewStatus(nil, err)) + reg.SetStatus(status.NewStatus(nil, err), nil) logger.Warn().Err(err).Msg("Failed to unmarshal concurrency limiter config") return fx.Options(), err } clProto := wrapperMessage.ConcurrencyLimiter cl := &concurrencyLimiter{ - Component: wrapperMessage.GetCommonAttributes(), - clProto: clProto, - clFactory: clFactory, - registry: reg, + Component: wrapperMessage.GetCommonAttributes(), + clProto: clProto, + clFactory: clFactory, + registry: reg, + labelStatusJobGroup: clFactory.labelStatusJobGroup, } cl.name = iface.ComponentKey(cl) + cl.tokensLabelKeyStatus = clFactory.labelStatusFactory.New("tokens_label_key", cl.GetPolicyName(), cl.GetComponentId()) + cl.limitByLabelKeyStatus = clFactory.labelStatusFactory.New("limit_by_label_key", cl.GetPolicyName(), cl.GetComponentId()) return fx.Options( fx.Invoke( @@ -186,11 +211,14 @@ func (clFactory *concurrencyLimiterFactory) newConcurrencyLimiterOptions(key not // concurrencyLimiter implements concurrency limiter on the data plane side. 
type concurrencyLimiter struct { iface.Component - registry status.Registry - clFactory *concurrencyLimiterFactory - limiter concurrencylimiter.ConcurrencyLimiter - clProto *policylangv1.ConcurrencyLimiter - name string + registry status.Registry + clFactory *concurrencyLimiterFactory + limiter concurrencylimiter.ConcurrencyLimiter + clProto *policylangv1.ConcurrencyLimiter + name string + labelStatusJobGroup *jobs.JobGroup + tokensLabelKeyStatus *labelstatus.LabelStatus + limitByLabelKeyStatus *labelstatus.LabelStatus } // Make sure concurrencyLimiter implements iface.Limiter. @@ -222,9 +250,14 @@ func (cl *concurrencyLimiter) setup(lifecycle fx.Lifecycle) error { metricLabels[metrics.PolicyHashLabel] = cl.GetPolicyHash() metricLabels[metrics.ComponentIDLabel] = cl.GetComponentId() + // setup labels status + cl.tokensLabelKeyStatus.Setup(cl.clFactory.labelStatusJobGroup, lifecycle) + cl.limitByLabelKeyStatus.Setup(cl.clFactory.labelStatusJobGroup, lifecycle) + lifecycle.Append(fx.Hook{ OnStart: func(context.Context) error { - // var err error + var err error + inner, err := concurrencylimiter.NewGlobalTokenCounter( cl.clFactory.distCache, cl.name, @@ -272,7 +305,7 @@ func (cl *concurrencyLimiter) setup(lifecycle fx.Lifecycle) error { if deleted == 0 { logger.Warn().Msg("Could not delete concurrency limiter counter from its metric vector. No traffic to generate metrics?") } - cl.registry.SetStatus(status.NewStatus(nil, merr)) + cl.registry.SetStatus(status.NewStatus(nil, merr), nil) return merr }, @@ -297,10 +330,13 @@ func (cl *concurrencyLimiter) Decide(ctx context.Context, labels labels.Labels) deniedResponseStatusCode = rParams.GetDeniedResponseStatusCode() tokensLabelKey := rParams.GetTokensLabelKey() if tokensLabelKey != "" { - if val, ok := labels.Get(tokensLabelKey); ok { + val, ok := labels.Get(tokensLabelKey) + if ok { if parsedTokens, err := strconv.ParseFloat(val, 64); err == nil { tokens = parsedTokens } + } else { + cl.tokensLabelKeyStatus.SetMissing() } } } @@ -367,8 +403,8 @@ func (cl *concurrencyLimiter) takeIfAvailable( return label, true, 0, 0, 0, "" } - label, err := cl.getLimitLabelFromLabels(labels) - if err != nil { + label, found := cl.getLimitLabelFromLabels(labels) + if !found { return label, true, 0, 0, 0, "" } @@ -426,17 +462,20 @@ func (cl *concurrencyLimiter) GetRampMode() bool { return false } -// getLabelFromLabels returns the label value from labels. -func (cl *concurrencyLimiter) getLimitLabelFromLabels(labels labels.Labels) (label string, err error) { +// getLabelFromLabels returns the label value from labels and bool indicating if the label was found. 
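// Illustrative note with a sketch (not part of the patch): the (value, ok) return
// below replaces the old error return because a missing limit_by label is treated
// as an expected condition — the alert now flows through limitByLabelKeyStatus.
// The caller in takeIfAvailable simply lets the flow through:
//
//	label, found := cl.getLimitLabelFromLabels(flowLabels)
//	if !found {
//		return label, true, 0, 0, 0, "" // allow; LabelStatus reports the missing key
//	}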
+func (cl *concurrencyLimiter) getLimitLabelFromLabels(labels labels.Labels) (label string, ok bool) { labelKey := cl.clProto.Parameters.GetLimitByLabelKey() if labelKey == "" { label = "default" - } else { - labelValue, found := labels.Get(labelKey) - if !found { - return "", errors.New("limit label not found") - } - label = labelKey + ":" + labelValue + return label, true + } + + labelValue, found := labels.Get(labelKey) + if !found { + cl.limitByLabelKeyStatus.SetMissing() + return "", false } - return label, nil + + label = labelKey + ":" + labelValue + return label, true } diff --git a/pkg/policies/flowcontrol/actuators/concurrency-scheduler/concurrency-scheduler.go b/pkg/policies/flowcontrol/actuators/concurrency-scheduler/concurrency-scheduler.go index 5e3847edb9..dee41ed00a 100644 --- a/pkg/policies/flowcontrol/actuators/concurrency-scheduler/concurrency-scheduler.go +++ b/pkg/policies/flowcontrol/actuators/concurrency-scheduler/concurrency-scheduler.go @@ -168,7 +168,7 @@ func (csFactory *concurrencySchedulerFactory) newConcurrencySchedulerOptions( wrapperMessage := &policysyncv1.ConcurrencySchedulerWrapper{} err := unmarshaller.Unmarshal(wrapperMessage) if err != nil || wrapperMessage.ConcurrencyScheduler == nil { - reg.SetStatus(status.NewStatus(nil, err)) + reg.SetStatus(status.NewStatus(nil, err), nil) logger.Warn().Err(err).Msg("Failed to unmarshal concurrency scheduler config") return fx.Options(), err } @@ -312,7 +312,7 @@ func (cs *concurrencyScheduler) setup(lifecycle fx.Lifecycle) error { merr = multierr.Append(merr, err) } - cs.registry.SetStatus(status.NewStatus(nil, merr)) + cs.registry.SetStatus(status.NewStatus(nil, merr), nil) return merr }, diff --git a/pkg/policies/flowcontrol/actuators/load-scheduler/load-scheduler.go b/pkg/policies/flowcontrol/actuators/load-scheduler/load-scheduler.go index fa11c58171..50ee28562a 100644 --- a/pkg/policies/flowcontrol/actuators/load-scheduler/load-scheduler.go +++ b/pkg/policies/flowcontrol/actuators/load-scheduler/load-scheduler.go @@ -220,7 +220,7 @@ func (lsFactory *loadSchedulerFactory) newLoadSchedulerOptions( err := unmarshaller.Unmarshal(wrapperMessage) loadSchedulerProto := wrapperMessage.LoadScheduler if err != nil || loadSchedulerProto == nil { - registry.SetStatus(status.NewStatus(nil, err)) + registry.SetStatus(status.NewStatus(nil, err), nil) logger.Warn().Err(err).Msg("Failed to unmarshal load scheduler config wrapper") return fx.Options(), err } @@ -295,7 +295,7 @@ func (ls *loadScheduler) setup(lifecycle fx.Lifecycle) error { lifecycle.Append(fx.Hook{ OnStart: func(context.Context) error { retErr := func(err error) error { - ls.registry.SetStatus(status.NewStatus(nil, err)) + ls.registry.SetStatus(status.NewStatus(nil, err), nil) return err } @@ -397,7 +397,7 @@ func (ls *loadScheduler) setup(lifecycle fx.Lifecycle) error { errMulti = multierr.Append(errMulti, errors.New("failed to delete "+metrics.TokenBucketAvailableMetricName+" gauge from its metric vector")) } - ls.registry.SetStatus(status.NewStatus(nil, errMulti)) + ls.registry.SetStatus(status.NewStatus(nil, errMulti), nil) return errMulti }, }) @@ -433,7 +433,7 @@ func (ls *loadScheduler) decisionUpdateCallback(event notifiers.Event, unmarshal if err != nil { statusMsg := "Failed to unmarshal config wrapper" logger.Warn().Err(err).Msg(statusMsg) - ls.registry.SetStatus(status.NewStatus(nil, err)) + ls.registry.SetStatus(status.NewStatus(nil, err), nil) return } @@ -441,7 +441,7 @@ func (ls *loadScheduler) decisionUpdateCallback(event notifiers.Event, 
unmarshal if loadDecision == nil { statusMsg := "load decision is nil" logger.Error().Msg(statusMsg) - ls.registry.SetStatus(status.NewStatus(nil, fmt.Errorf("failed to get load decision from LoadDecisionWrapper: %s", statusMsg))) + ls.registry.SetStatus(status.NewStatus(nil, fmt.Errorf("failed to get load decision from LoadDecisionWrapper: %s", statusMsg)), nil) return } @@ -449,7 +449,7 @@ func (ls *loadScheduler) decisionUpdateCallback(event notifiers.Event, unmarshal if commonAttributes == nil { statusMsg := "common attributes is nil" logger.Error().Msg(statusMsg) - ls.registry.SetStatus(status.NewStatus(nil, fmt.Errorf("failed to get common attributes from LoadDecisionWrapper: %s", statusMsg))) + ls.registry.SetStatus(status.NewStatus(nil, fmt.Errorf("failed to get common attributes from LoadDecisionWrapper: %s", statusMsg)), nil) return } @@ -458,7 +458,7 @@ func (ls *loadScheduler) decisionUpdateCallback(event notifiers.Event, unmarshal err = errors.New("policy id mismatch") statusMsg := fmt.Sprintf("Expected policy hash: %s, Got: %s", ls.GetPolicyHash(), commonAttributes.PolicyHash) logger.Warn().Err(err).Msg(statusMsg) - ls.registry.SetStatus(status.NewStatus(nil, err)) + ls.registry.SetStatus(status.NewStatus(nil, err), nil) return } diff --git a/pkg/policies/flowcontrol/actuators/quota-scheduler/quota-scheduler.go b/pkg/policies/flowcontrol/actuators/quota-scheduler/quota-scheduler.go index 60a956f050..368bc00aa5 100644 --- a/pkg/policies/flowcontrol/actuators/quota-scheduler/quota-scheduler.go +++ b/pkg/policies/flowcontrol/actuators/quota-scheduler/quota-scheduler.go @@ -186,7 +186,7 @@ func (qsFactory *quotaSchedulerFactory) newQuotaSchedulerOptions( wrapperMessage := &policysyncv1.QuotaSchedulerWrapper{} err := unmarshaller.Unmarshal(wrapperMessage) if err != nil || wrapperMessage.QuotaScheduler == nil { - reg.SetStatus(status.NewStatus(nil, err)) + reg.SetStatus(status.NewStatus(nil, err), nil) logger.Warn().Err(err).Msg("Failed to unmarshal quota scheduler config") return fx.Options(), err } @@ -350,7 +350,7 @@ func (qs *quotaScheduler) setup(lifecycle fx.Lifecycle) error { merr = multierr.Append(merr, err) } - qs.registry.SetStatus(status.NewStatus(nil, merr)) + qs.registry.SetStatus(status.NewStatus(nil, merr), nil) return merr }, diff --git a/pkg/policies/flowcontrol/actuators/rate-limiter/rate-limiter.go b/pkg/policies/flowcontrol/actuators/rate-limiter/rate-limiter.go index 5c33705b05..cad6a7731c 100644 --- a/pkg/policies/flowcontrol/actuators/rate-limiter/rate-limiter.go +++ b/pkg/policies/flowcontrol/actuators/rate-limiter/rate-limiter.go @@ -198,7 +198,7 @@ func (rlFactory *rateLimiterFactory) newRateLimiterOptions(key notifiers.Key, un wrapperMessage := &policysyncv1.RateLimiterWrapper{} err := unmarshaller.Unmarshal(wrapperMessage) if err != nil || wrapperMessage.RateLimiter == nil { - reg.SetStatus(status.NewStatus(nil, err)) + reg.SetStatus(status.NewStatus(nil, err), nil) logger.Warn().Err(err).Msg("Failed to unmarshal rate limiter config") return fx.Options(), err } @@ -327,7 +327,7 @@ func (rl *rateLimiter) setup(lifecycle fx.Lifecycle) error { if deleted == 0 { logger.Warn().Msg("Could not delete rate limiter counter from its metric vector. 
No traffic to generate metrics?") } - rl.registry.SetStatus(status.NewStatus(nil, merr)) + rl.registry.SetStatus(status.NewStatus(nil, merr), nil) return merr }, diff --git a/pkg/policies/flowcontrol/actuators/sampler/sampler.go b/pkg/policies/flowcontrol/actuators/sampler/sampler.go index 8a390fe1cc..325f71ace3 100644 --- a/pkg/policies/flowcontrol/actuators/sampler/sampler.go +++ b/pkg/policies/flowcontrol/actuators/sampler/sampler.go @@ -169,7 +169,7 @@ func (frf *samplerFactory) newSamplerOptions( wrapperMessage := &policysyncv1.SamplerWrapper{} err := unmarshaller.Unmarshal(wrapperMessage) if err != nil || wrapperMessage.Sampler == nil { - reg.SetStatus(status.NewStatus(nil, err)) + reg.SetStatus(status.NewStatus(nil, err), nil) logger.Warn().Err(err).Msg("Failed to unmarshal sampler config") return fx.Options(), err } @@ -269,7 +269,7 @@ func (fr *sampler) setup(lifecycle fx.Lifecycle) error { if deleted == 0 { logger.Warn().Msg("Could not delete sampler counter from its metric vector. No traffic to generate metrics?") } - fr.registry.SetStatus(status.NewStatus(nil, merr)) + fr.registry.SetStatus(status.NewStatus(nil, merr), nil) return merr }, diff --git a/pkg/policies/flowcontrol/resources/fluxmeter/flux-meter.go b/pkg/policies/flowcontrol/resources/fluxmeter/flux-meter.go index 4417a95b3f..bf45515245 100644 --- a/pkg/policies/flowcontrol/resources/fluxmeter/flux-meter.go +++ b/pkg/policies/flowcontrol/resources/fluxmeter/flux-meter.go @@ -120,7 +120,7 @@ func (fluxMeterFactory *fluxMeterFactory) newFluxMeterOptions( wrapperMessage := &policysyncv1.FluxMeterWrapper{} err := unmarshaller.Unmarshal(wrapperMessage) if err != nil || wrapperMessage.FluxMeter == nil { - reg.SetStatus(status.NewStatus(nil, err)) + reg.SetStatus(status.NewStatus(nil, err), nil) logger.Warn().Err(err).Msg("Failed to unmarshal flux meter config wrapper") return fx.Options(), err } diff --git a/pkg/status/registry.go b/pkg/status/registry.go index e60769fbd7..a8245d8154 100644 --- a/pkg/status/registry.go +++ b/pkg/status/registry.go @@ -15,7 +15,7 @@ import ( // Registry . type Registry interface { GetStatus() *statusv1.Status - SetStatus(*statusv1.Status) + SetStatus(*statusv1.Status, map[string]string) SetGroupStatus(*statusv1.GroupStatus) GetGroupStatus() *statusv1.GroupStatus Child(key, value string) Registry @@ -177,27 +177,43 @@ func (r *registry) GetStatus() *statusv1.Status { } // SetStatus sets the status of the Registry. 
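// Illustrative sketch (not part of the patch): a caller-side view of the new
// signature — the "severity" entry is consumed to pick the alert severity and the
// remaining entries are attached to the alert as labels; an alert is only raised
// when the status carries an error. Names and values below are placeholders.
func exampleSetStatusWithSeverity(reg status.Registry) {
	reg.SetStatus(
		status.NewStatus(nil, errors.New("tokens label missing")),
		map[string]string{
			"severity":  alerts.SeverityCrit.String(),
			"policy":    "example-policy",
			"component": "example-component",
		},
	)
}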
-func (r *registry) SetStatus(status *statusv1.Status) { +func (r *registry) SetStatus(status *statusv1.Status, labels map[string]string) { r.mu.Lock() defer r.mu.Unlock() r.status = status if r.status != nil && r.status.Error != nil { - r.alerter.AddAlert(r.createAlert(r.status.Error)) + r.alerter.AddAlert(r.createAlert(r.status.Error, labels)) } } -func (r *registry) createAlert(err *statusv1.Status_Error) *alerts.Alert { +func (r *registry) createAlert(err *statusv1.Status_Error, labels map[string]string) *alerts.Alert { resolve := time.Duration(time.Second * alertResolveTimeout) - newAlert := alerts.NewAlert( + + severity := alerts.SeverityInfo.String() + if labels != nil { + s, ok := labels["severity"] + if ok { + severity = s + delete(labels, "severity") + } + } + + alertOpts := []alerts.AlertOption{ alerts.WithName(err.String()), - alerts.WithSeverity(alerts.ParseSeverity("info")), + alerts.WithSeverity(alerts.ParseSeverity(severity)), alerts.WithAlertChannels([]string{alertChannel}), alerts.WithResolveTimeout(resolve), alerts.WithGeneratorURL( fmt.Sprintf("http://%s%s", info.GetHostInfo().Hostname, r.uri), ), - ) + } + + for k, v := range labels { + alertOpts = append(alertOpts, alerts.WithLabel(k, v)) + } + + newAlert := alerts.NewAlert(alertOpts...) return newAlert } diff --git a/pkg/status/status_test.go b/pkg/status/status_test.go index d79c02c51f..427f2a4155 100644 --- a/pkg/status/status_test.go +++ b/pkg/status/status_test.go @@ -35,10 +35,10 @@ var _ = Describe("Status Registry", func() { }) It("returns updated status information", func() { test_status := NewStatus(nil, errors.New("test status")) - rootRegistry.SetStatus(test_status) + rootRegistry.SetStatus(test_status, nil) Expect(rootRegistry.GetStatus()).To(Equal(test_status)) Expect(rootRegistry.HasError()).To(BeTrue()) - rootRegistry.SetStatus(nil) + rootRegistry.SetStatus(nil, nil) Expect(rootRegistry.GetStatus()).To(Equal(&statusv1.Status{})) Expect(rootRegistry.HasError()).To(BeFalse()) @@ -98,7 +98,7 @@ var _ = Describe("Status Registry", func() { }) It("returns updated status information", func() { test_status1 := NewStatus(nil, errors.New("")) - rootRegistry.SetStatus(test_status1) + rootRegistry.SetStatus(test_status1, nil) Expect(rootRegistry.GetStatus()).To(Equal(test_status1)) Expect(rootRegistry.HasError()).To(BeTrue()) @@ -107,7 +107,7 @@ var _ = Describe("Status Registry", func() { Status: test_status2, Groups: make(map[string]*statusv1.GroupStatus), } - child1.SetStatus(test_status1) + child1.SetStatus(test_status1, nil) child1.SetGroupStatus(test_groupstatus1) Expect(child1.HasError()).To(BeFalse()) Expect(child1.GetStatus()).To(Equal(test_status2)) @@ -128,7 +128,7 @@ var _ = Describe("Status Registry", func() { rootRegistry.SetGroupStatus(test_groupstatus2) rootGroupStatus := rootRegistry.GetGroupStatus().Status Expect(rootGroupStatus).To(Equal(test_status3)) - rootRegistry.SetStatus(nil) + rootRegistry.SetStatus(nil, nil) Expect(rootRegistry.HasError()).To(BeTrue()) }) It("creates multiple child registries then detaches them", func() { diff --git a/pkg/watchdog/watchdog.go b/pkg/watchdog/watchdog.go index 15b162266c..d22aea6ae2 100644 --- a/pkg/watchdog/watchdog.go +++ b/pkg/watchdog/watchdog.go @@ -175,7 +175,7 @@ func (w *watchdog) start() error { if e != nil { log.Autosample().Warn().Err(e).Msg("Heap check failed") } - w.heapStatusRegistry.SetStatus(status.NewStatus(details, nil)) + w.heapStatusRegistry.SetStatus(status.NewStatus(details, nil), nil) } case <-w.sentinel.ctx.Done(): return diff 
--git a/test/aperture_suite_test.go b/test/aperture_suite_test.go index 2c14688b6f..0cee34c10a 100644 --- a/test/aperture_suite_test.go +++ b/test/aperture_suite_test.go @@ -196,7 +196,7 @@ var _ = BeforeSuite(func() { Expect(err).NotTo(HaveOccurred()) etcdWatcher.Start() - registry.Child("system", "readiness").Child("component", "platform").SetStatus(status.NewStatus(wrapperspb.String("platform running"), nil)) + registry.Child("system", "readiness").Child("component", "platform").SetStatus(status.NewStatus(wrapperspb.String("platform running"), nil), nil) project = "staging" Eventually(func() bool { @@ -206,7 +206,7 @@ var _ = BeforeSuite(func() { }) var _ = AfterSuite(func() { - registry.Child("system", "readiness").Child("component", "platform").SetStatus(status.NewStatus(nil, errors.New("platform stopping"))) + registry.Child("system", "readiness").Child("component", "platform").SetStatus(status.NewStatus(nil, errors.New("platform stopping")), nil) stopCtx, cancel := context.WithTimeout(context.Background(), 60*time.Second) defer cancel() From d16d28c03c242ac1fffd51e6f2594a045bbcd6f5 Mon Sep 17 00:00:00 2001 From: Hasit Mistry Date: Tue, 23 Jan 2024 21:44:58 -0800 Subject: [PATCH 2/8] Add support for label alerts in rate limiter --- .../actuators/rate-limiter/rate-limiter.go | 56 +++++++++++++++---- 1 file changed, 45 insertions(+), 11 deletions(-) diff --git a/pkg/policies/flowcontrol/actuators/rate-limiter/rate-limiter.go b/pkg/policies/flowcontrol/actuators/rate-limiter/rate-limiter.go index cad6a7731c..b1bd30af71 100644 --- a/pkg/policies/flowcontrol/actuators/rate-limiter/rate-limiter.go +++ b/pkg/policies/flowcontrol/actuators/rate-limiter/rate-limiter.go @@ -25,6 +25,7 @@ import ( etcdwatcher "github.com/fluxninja/aperture/v2/pkg/etcd/watcher" "github.com/fluxninja/aperture/v2/pkg/jobs" "github.com/fluxninja/aperture/v2/pkg/labels" + "github.com/fluxninja/aperture/v2/pkg/labelstatus" "github.com/fluxninja/aperture/v2/pkg/log" "github.com/fluxninja/aperture/v2/pkg/metrics" "github.com/fluxninja/aperture/v2/pkg/notifiers" @@ -82,6 +83,8 @@ type rateLimiterFactory struct { decisionsWatcher notifiers.Watcher counterVector *prometheus.CounterVec agentGroupName string + labelStatusJobGroup *jobs.JobGroup + labelStatusFactory *labelstatus.LabelStatusFactory } // main fx app. 
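// Illustrative sketch (not part of the patch): the time-based behaviour the rate
// limiter inherits from pkg/labelstatus — SetMissing only stamps a timestamp; the
// periodic job publishes a crit-severity error status until the stamp is more
// than five minutes old, after which the status is cleared. Names are placeholders.
func exampleMissingLabelTimeline(factory *labelstatus.LabelStatusFactory, jg *jobs.JobGroup, lc fx.Lifecycle) {
	st := factory.New("limit_by_label_key", "example-policy", "example-component")
	st.Setup(jg, lc)
	st.SetMissing() // t0: a request arrived without the configured label
	// Within one job period (~10s): the status registry carries the error
	// "label limit_by_label_key missing" with a "crit" severity label.
	// Once time.Since(t0) >= 5 minutes: the periodic job clears the status again.
}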
@@ -94,6 +97,7 @@ func setupRateLimiterFactory( prometheusRegistry *prometheus.Registry, etcdClient *etcdclient.Client, ai *agentinfo.AgentInfo, + labelStatusFactory *labelstatus.LabelStatusFactory, ) error { agentGroupName := ai.GetAgentGroup() etcdPath := path.Join(paths.RateLimiterDecisionsPath) @@ -118,6 +122,12 @@ func setupRateLimiterFactory( return err } + labelStatusJobGroup, err := jobs.NewJobGroup(reg.Child("label_status", rateLimiterStatusRoot), jobs.JobGroupConfig{}, nil) + if err != nil { + logger.Error().Err(err).Msg("Failed to create labels status job group") + return err + } + counterVector := prometheus.NewCounterVec(prometheus.CounterOpts{ Name: metrics.RateLimiterCounterTotalMetricName, Help: "A counter measuring the number of times Rate Limiter was triggered", @@ -132,6 +142,8 @@ func setupRateLimiterFactory( agentGroupName: agentGroupName, registry: reg, counterVector: counterVector, + labelStatusJobGroup: labelStatusJobGroup, + labelStatusFactory: labelStatusFactory, } fxDriver, err := notifiers.NewFxDriver( @@ -158,6 +170,10 @@ func setupRateLimiterFactory( if err != nil { return err } + err = labelStatusJobGroup.Start() + if err != nil { + return err + } err = decisionsWatcher.Start() if err != nil { return err @@ -170,6 +186,10 @@ func setupRateLimiterFactory( if err != nil { merr = multierr.Append(merr, err) } + err = labelStatusJobGroup.Stop() + if err != nil { + merr = multierr.Append(merr, err) + } err = rateLimiterJobGroup.Stop() if err != nil { merr = multierr.Append(merr, err) @@ -205,12 +225,15 @@ func (rlFactory *rateLimiterFactory) newRateLimiterOptions(key notifiers.Key, un rlProto := wrapperMessage.RateLimiter rl := &rateLimiter{ - Component: wrapperMessage.GetCommonAttributes(), - rlProto: rlProto, - rlFactory: rlFactory, - registry: reg, + Component: wrapperMessage.GetCommonAttributes(), + rlProto: rlProto, + rlFactory: rlFactory, + registry: reg, + labelStatusJobGroup: rlFactory.labelStatusJobGroup, } rl.name = iface.ComponentKey(rl) + rl.tokensLabelKeyStatus = rlFactory.labelStatusFactory.New("tokens_label_key", rl.GetPolicyName(), rl.GetComponentId()) + rl.limitByLabelKeyStatus = rlFactory.labelStatusFactory.New("limit_by_label_key", rl.GetPolicyName(), rl.GetComponentId()) return fx.Options( fx.Invoke( @@ -222,12 +245,15 @@ func (rlFactory *rateLimiterFactory) newRateLimiterOptions(key notifiers.Key, un // rateLimiter implements rate limiter on the data plane side. type rateLimiter struct { iface.Component - registry status.Registry - rlFactory *rateLimiterFactory - limiter ratelimiter.RateLimiter - inner *globaltokenbucket.GlobalTokenBucket - rlProto *policylangv1.RateLimiter - name string + registry status.Registry + rlFactory *rateLimiterFactory + limiter ratelimiter.RateLimiter + inner *globaltokenbucket.GlobalTokenBucket + rlProto *policylangv1.RateLimiter + name string + labelStatusJobGroup *jobs.JobGroup + tokensLabelKeyStatus *labelstatus.LabelStatus + limitByLabelKeyStatus *labelstatus.LabelStatus } // Make sure rateLimiter implements iface.Limiter. 
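// Illustrative sketch (not part of the patch): the guard that the Decide and
// takeIfAvailable changes below repeat inline — only a key that is configured and
// absent from the flow labels counts as missing. labelOrMissing is a hypothetical
// helper, shown only to make the pattern explicit.
func labelOrMissing(flowLabels labels.Labels, key string, st *labelstatus.LabelStatus) (string, bool) {
	if key == "" {
		return "", false // nothing configured, nothing to report
	}
	val, ok := flowLabels.Get(key)
	if !ok {
		st.SetMissing() // configured but not present on this flow
	}
	return val, ok
}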
@@ -259,6 +285,10 @@ func (rl *rateLimiter) setup(lifecycle fx.Lifecycle) error { metricLabels[metrics.PolicyHashLabel] = rl.GetPolicyHash() metricLabels[metrics.ComponentIDLabel] = rl.GetComponentId() + // setup labels status + rl.tokensLabelKeyStatus.Setup(rl.rlFactory.labelStatusJobGroup, lifecycle) + rl.limitByLabelKeyStatus.Setup(rl.rlFactory.labelStatusJobGroup, lifecycle) + lifecycle.Append(fx.Hook{ OnStart: func(context.Context) error { var err error @@ -352,10 +382,13 @@ func (rl *rateLimiter) Decide(ctx context.Context, labels labels.Labels) *flowco deniedResponseStatusCode = rParams.GetDeniedResponseStatusCode() tokensLabelKey := rParams.GetTokensLabelKey() if tokensLabelKey != "" { - if val, ok := labels.Get(tokensLabelKey); ok { + val, ok := labels.Get(tokensLabelKey) + if ok { if parsedTokens, err := strconv.ParseFloat(val, 64); err == nil { tokens = parsedTokens } + } else { + rl.tokensLabelKeyStatus.SetMissing() } } } @@ -426,6 +459,7 @@ func (rl *rateLimiter) takeIfAvailable( } else { labelValue, found := labels.Get(labelKey) if !found { + rl.limitByLabelKeyStatus.SetMissing() return "", true, 0, 0, 0 } label = labelKey + ":" + labelValue From 2828179fb2f87676277249044e5700e77086504b Mon Sep 17 00:00:00 2001 From: Hasit Mistry Date: Fri, 26 Jan 2024 15:53:13 -0800 Subject: [PATCH 3/8] Add label alerts in other limiters - concurrency-scheduler - load-scheduler - quota-scheduler - scheduler --- .../concurrency-scheduler.go | 96 +++++++++++++------ .../load-scheduler/load-scheduler.go | 53 ++++++++-- .../quota-scheduler/quota-scheduler.go | 61 +++++++++--- .../actuators/workload-scheduler/scheduler.go | 47 ++++++--- 4 files changed, 197 insertions(+), 60 deletions(-) diff --git a/pkg/policies/flowcontrol/actuators/concurrency-scheduler/concurrency-scheduler.go b/pkg/policies/flowcontrol/actuators/concurrency-scheduler/concurrency-scheduler.go index dee41ed00a..0aae11e56a 100644 --- a/pkg/policies/flowcontrol/actuators/concurrency-scheduler/concurrency-scheduler.go +++ b/pkg/policies/flowcontrol/actuators/concurrency-scheduler/concurrency-scheduler.go @@ -23,6 +23,7 @@ import ( etcdwatcher "github.com/fluxninja/aperture/v2/pkg/etcd/watcher" "github.com/fluxninja/aperture/v2/pkg/jobs" "github.com/fluxninja/aperture/v2/pkg/labels" + "github.com/fluxninja/aperture/v2/pkg/labelstatus" "github.com/fluxninja/aperture/v2/pkg/log" "github.com/fluxninja/aperture/v2/pkg/metrics" "github.com/fluxninja/aperture/v2/pkg/notifiers" @@ -71,13 +72,15 @@ func provideConcurrencySchedulerWatchers( } type concurrencySchedulerFactory struct { - engineAPI iface.Engine - registry status.Registry - decisionsWatcher notifiers.Watcher - distCache *distcache.DistCache - auditJobGroup *jobs.JobGroup - wsFactory *workloadscheduler.Factory - agentGroupName string + engineAPI iface.Engine + registry status.Registry + decisionsWatcher notifiers.Watcher + distCache *distcache.DistCache + auditJobGroup *jobs.JobGroup + wsFactory *workloadscheduler.Factory + agentGroupName string + labelStatusJobGroup *jobs.JobGroup + labelStatusFactory *labelstatus.LabelStatusFactory } // main fx app. 
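// Illustrative sketch (not part of the patch): each actuator factory in this
// series repeats the same label-status job-group lifecycle; schematically, with
// error handling trimmed and the status root name a placeholder:
func exampleLabelStatusJobGroup(reg status.Registry, lifecycle fx.Lifecycle) error {
	labelStatusJobGroup, err := jobs.NewJobGroup(reg.Child("label_status", "example_status_root"), jobs.JobGroupConfig{}, nil)
	if err != nil {
		return err
	}
	lifecycle.Append(fx.Hook{
		OnStart: func(context.Context) error { return labelStatusJobGroup.Start() },
		OnStop:  func(context.Context) error { return labelStatusJobGroup.Stop() },
	})
	return nil
}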
@@ -91,6 +94,7 @@ func setupConcurrencySchedulerFactory( etcdClient *etcdclient.Client, ai *agentinfo.AgentInfo, wsFactory *workloadscheduler.Factory, + labelStatusFactory *labelstatus.LabelStatusFactory, ) error { agentGroupName := ai.GetAgentGroup() etcdPath := path.Join(paths.ConcurrencySchedulerDecisionsPath) @@ -109,14 +113,22 @@ func setupConcurrencySchedulerFactory( return err } + labelStatusJobGroup, err := jobs.NewJobGroup(reg.Child("label_status", concurrencySchedulerStatusRoot), jobs.JobGroupConfig{}, nil) + if err != nil { + logger.Error().Err(err).Msg("Failed to create labels status job group") + return err + } + concurrencySchedulerFactory := &concurrencySchedulerFactory{ - engineAPI: e, - distCache: distCache, - auditJobGroup: auditJobGroup, - decisionsWatcher: decisionsWatcher, - agentGroupName: agentGroupName, - registry: reg, - wsFactory: wsFactory, + engineAPI: e, + distCache: distCache, + auditJobGroup: auditJobGroup, + decisionsWatcher: decisionsWatcher, + agentGroupName: agentGroupName, + registry: reg, + wsFactory: wsFactory, + labelStatusJobGroup: labelStatusJobGroup, + labelStatusFactory: labelStatusFactory, } fxDriver, err := notifiers.NewFxDriver(reg, prometheusRegistry, @@ -132,6 +144,10 @@ func setupConcurrencySchedulerFactory( if err != nil { return err } + err = labelStatusJobGroup.Start() + if err != nil { + return err + } err = decisionsWatcher.Start() if err != nil { return err @@ -144,6 +160,10 @@ func setupConcurrencySchedulerFactory( if err != nil { merr = multierr.Append(merr, err) } + err = labelStatusJobGroup.Stop() + if err != nil { + merr = multierr.Append(merr, err) + } err = auditJobGroup.Stop() if err != nil { merr = multierr.Append(merr, err) @@ -181,13 +201,19 @@ func (csFactory *concurrencySchedulerFactory) newConcurrencySchedulerOptions( } cs := &concurrencyScheduler{ - Component: wrapperMessage.GetCommonAttributes(), - proto: csProto, - csFactory: csFactory, - registry: reg, - clock: clockwork.NewRealClock(), + Component: wrapperMessage.GetCommonAttributes(), + proto: csProto, + csFactory: csFactory, + registry: reg, + clock: clockwork.NewRealClock(), + labelStatusJobGroup: csFactory.labelStatusJobGroup, } cs.name = iface.ComponentKey(cs) + cs.limitByLabelKeyStatus = csFactory.labelStatusFactory.New("limit_by_label_key", cs.GetPolicyName(), cs.GetComponentId()) + cs.tokensLabelKeyStatus = csFactory.labelStatusFactory.New("tokens_label_key", cs.GetPolicyName(), cs.GetComponentId()) + cs.priorityLabelKeyStatus = csFactory.labelStatusFactory.New("priority_label_key", cs.GetPolicyName(), cs.GetComponentId()) + cs.workloadLabelKeyStatus = csFactory.labelStatusFactory.New("workload_label_key", cs.GetPolicyName(), cs.GetComponentId()) + cs.fairnessLabelKeyStatus = csFactory.labelStatusFactory.New("fairness_label_key", cs.GetPolicyName(), cs.GetComponentId()) return fx.Options( fx.Invoke( @@ -200,13 +226,19 @@ func (csFactory *concurrencySchedulerFactory) newConcurrencySchedulerOptions( type concurrencyScheduler struct { schedulers sync.Map iface.Component - registry status.Registry - clock clockwork.Clock - csFactory *concurrencySchedulerFactory - limiter concurrencylimiter.ConcurrencyLimiter - proto *policylangv1.ConcurrencyScheduler - schedulerMetrics *workloadscheduler.SchedulerMetrics - name string + registry status.Registry + clock clockwork.Clock + csFactory *concurrencySchedulerFactory + limiter concurrencylimiter.ConcurrencyLimiter + proto *policylangv1.ConcurrencyScheduler + schedulerMetrics *workloadscheduler.SchedulerMetrics + name 
string + labelStatusJobGroup *jobs.JobGroup + limitByLabelKeyStatus *labelstatus.LabelStatus + tokensLabelKeyStatus *labelstatus.LabelStatus + priorityLabelKeyStatus *labelstatus.LabelStatus + workloadLabelKeyStatus *labelstatus.LabelStatus + fairnessLabelKeyStatus *labelstatus.LabelStatus } func (cs *concurrencyScheduler) setup(lifecycle fx.Lifecycle) error { @@ -233,6 +265,12 @@ func (cs *concurrencyScheduler) setup(lifecycle fx.Lifecycle) error { metricLabels[metrics.PolicyHashLabel] = cs.GetPolicyHash() metricLabels[metrics.ComponentIDLabel] = cs.GetComponentId() + cs.limitByLabelKeyStatus.Setup(cs.csFactory.labelStatusJobGroup, lifecycle) + cs.tokensLabelKeyStatus.Setup(cs.csFactory.labelStatusJobGroup, lifecycle) + cs.priorityLabelKeyStatus.Setup(cs.csFactory.labelStatusJobGroup, lifecycle) + cs.workloadLabelKeyStatus.Setup(cs.csFactory.labelStatusJobGroup, lifecycle) + cs.fairnessLabelKeyStatus.Setup(cs.csFactory.labelStatusJobGroup, lifecycle) + lifecycle.Append(fx.Hook{ OnStart: func(context.Context) error { var err error @@ -333,6 +371,7 @@ func (cs *concurrencyScheduler) getLabelKey(labels labels.Labels) (string, bool) } else { labelValue, found := labels.Get(labelKey) if !found { + cs.limitByLabelKeyStatus.SetMissing() return "", false } label = labelKey + ":" + labelValue @@ -376,7 +415,6 @@ func (cs *concurrencyScheduler) Decide(ctx context.Context, labels labels.Labels var found bool label, found = cs.getLabelKey(labels) - if !found { reason = flowcontrolv1.LimiterDecision_LIMITER_REASON_KEY_NOT_FOUND return returnDecision() @@ -393,6 +431,10 @@ func (cs *concurrencyScheduler) Decide(ctx context.Context, labels labels.Labels cs, tokenCounter, cs.schedulerMetrics, + cs.tokensLabelKeyStatus, + cs.priorityLabelKeyStatus, + cs.workloadLabelKeyStatus, + cs.fairnessLabelKeyStatus, ) if err != nil { log.Error().Err(err).Msg("Failed to create scheduler") diff --git a/pkg/policies/flowcontrol/actuators/load-scheduler/load-scheduler.go b/pkg/policies/flowcontrol/actuators/load-scheduler/load-scheduler.go index 50ee28562a..f67a589654 100644 --- a/pkg/policies/flowcontrol/actuators/load-scheduler/load-scheduler.go +++ b/pkg/policies/flowcontrol/actuators/load-scheduler/load-scheduler.go @@ -19,7 +19,9 @@ import ( "github.com/fluxninja/aperture/v2/pkg/config" etcdclient "github.com/fluxninja/aperture/v2/pkg/etcd/client" etcdwatcher "github.com/fluxninja/aperture/v2/pkg/etcd/watcher" + "github.com/fluxninja/aperture/v2/pkg/jobs" "github.com/fluxninja/aperture/v2/pkg/labels" + "github.com/fluxninja/aperture/v2/pkg/labelstatus" "github.com/fluxninja/aperture/v2/pkg/metrics" "github.com/fluxninja/aperture/v2/pkg/notifiers" workloadscheduler "github.com/fluxninja/aperture/v2/pkg/policies/flowcontrol/actuators/workload-scheduler" @@ -78,6 +80,8 @@ type loadSchedulerFactory struct { tokenBucketAvailableTokensGaugeVec *prometheus.GaugeVec wsFactory *workloadscheduler.Factory agentGroupName string + labelStatusJobGroup *jobs.JobGroup + labelStatusFactory *labelstatus.LabelStatusFactory } // setupLoadSchedulerFactory sets up the load scheduler module in the main fx app. @@ -90,9 +94,12 @@ func setupLoadSchedulerFactory( etcdClient *etcdclient.Client, ai *agentinfo.AgentInfo, wsFactory *workloadscheduler.Factory, + labelStatusFactory *labelstatus.LabelStatusFactory, ) error { reg := registry.Child("component", "load_scheduler") + logger := reg.GetLogger() + agentGroup := ai.GetAgentGroup() // Scope the sync to the agent group. 
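// Illustrative sketch (not part of the patch): the four statuses added to the load
// scheduler correspond to the optional Scheduler label keys; a miss never rejects
// a flow, it only means the default value is used, for example:
//
//	if s.proto.PriorityLabelKey != "" {
//		if _, ok := flowLabels.Get(s.proto.PriorityLabelKey); !ok {
//			s.priorityLabelKeyStatus.SetMissing() // the flow keeps the default priority
//		}
//	}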
@@ -102,12 +109,20 @@ func setupLoadSchedulerFactory( return err } + labelStatusJobGroup, err := jobs.NewJobGroup(reg.Child("label_status", "load_scheduler"), jobs.JobGroupConfig{}, nil) + if err != nil { + logger.Error().Err(err).Msg("Failed to create labels status job group") + return err + } + lsFactory := &loadSchedulerFactory{ engineAPI: e, registry: reg, wsFactory: wsFactory, loadDecisionWatcher: loadDecisionWatcher, agentGroupName: ai.GetAgentGroup(), + labelStatusJobGroup: labelStatusJobGroup, + labelStatusFactory: labelStatusFactory, } // Initialize and register the WFQ and Token Bucket Metric Vectors @@ -169,6 +184,11 @@ func setupLoadSchedulerFactory( return err } + err = labelStatusJobGroup.Start() + if err != nil { + return err + } + err = lsFactory.loadDecisionWatcher.Start() if err != nil { return err @@ -184,6 +204,11 @@ func setupLoadSchedulerFactory( merr = multierr.Append(merr, err) } + err = labelStatusJobGroup.Stop() + if err != nil { + merr = multierr.Append(merr, err) + } + if !prometheusRegistry.Unregister(lsFactory.tokenBucketLMGaugeVec) { err := fmt.Errorf("failed to unregister " + metrics.TokenBucketLMMetricName) merr = multierr.Append(merr, err) @@ -236,7 +261,12 @@ func (lsFactory *loadSchedulerFactory) newLoadSchedulerOptions( registry: registry, loadSchedulerFactory: lsFactory, clock: clockwork.NewRealClock(), + labelStatusJobGroup: lsFactory.labelStatusJobGroup, } + ls.tokensLabelKeyStatus = lsFactory.labelStatusFactory.New("tokens_label_key", ls.GetPolicyName(), ls.GetComponentId()) + ls.priorityLabelKeyStatus = lsFactory.labelStatusFactory.New("priority_label_key", ls.GetPolicyName(), ls.GetComponentId()) + ls.workloadLabelKeyStatus = lsFactory.labelStatusFactory.New("workload_label_key", ls.GetPolicyName(), ls.GetComponentId()) + ls.fairnessLabelKeyStatus = lsFactory.labelStatusFactory.New("fairness_label_key", ls.GetPolicyName(), ls.GetComponentId()) return fx.Options( fx.Invoke( @@ -249,13 +279,18 @@ func (lsFactory *loadSchedulerFactory) newLoadSchedulerOptions( type loadScheduler struct { // TODO: comment to self: why do we depend on Scheduler to implement Decide and Revert in this case? iface.Component - scheduler *workloadscheduler.Scheduler - registry status.Registry - proto *policylangv1.LoadScheduler - loadSchedulerFactory *loadSchedulerFactory - clock clockwork.Clock - tokenBucket *scheduler.LoadMultiplierTokenBucket - schedulerMetrics *workloadscheduler.SchedulerMetrics + scheduler *workloadscheduler.Scheduler + registry status.Registry + proto *policylangv1.LoadScheduler + loadSchedulerFactory *loadSchedulerFactory + clock clockwork.Clock + tokenBucket *scheduler.LoadMultiplierTokenBucket + schedulerMetrics *workloadscheduler.SchedulerMetrics + labelStatusJobGroup *jobs.JobGroup + tokensLabelKeyStatus *labelstatus.LabelStatus + priorityLabelKeyStatus *labelstatus.LabelStatus + workloadLabelKeyStatus *labelstatus.LabelStatus + fairnessLabelKeyStatus *labelstatus.LabelStatus } // Make sure LoadScheduler implements the iface.LoadScheduler. 
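// Illustrative note with a sketch (not part of the patch): the label statuses set
// up here surface under the factory's status-registry branch, roughly
//
//	<factory registry>.Child("label", "status").Child("label", "priority_label_key")
//
// and, because the periodic job publishes an error status together with a
// "severity" label, they also fan out as alerts through the registry's alerter.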
@@ -342,6 +377,10 @@ func (ls *loadScheduler) setup(lifecycle fx.Lifecycle) error { ls, ls.tokenBucket, ls.schedulerMetrics, + ls.tokensLabelKeyStatus, + ls.priorityLabelKeyStatus, + ls.workloadLabelKeyStatus, + ls.fairnessLabelKeyStatus, ) if err != nil { return retErr(err) diff --git a/pkg/policies/flowcontrol/actuators/quota-scheduler/quota-scheduler.go b/pkg/policies/flowcontrol/actuators/quota-scheduler/quota-scheduler.go index 368bc00aa5..ac5afc093b 100644 --- a/pkg/policies/flowcontrol/actuators/quota-scheduler/quota-scheduler.go +++ b/pkg/policies/flowcontrol/actuators/quota-scheduler/quota-scheduler.go @@ -25,6 +25,7 @@ import ( etcdwatcher "github.com/fluxninja/aperture/v2/pkg/etcd/watcher" "github.com/fluxninja/aperture/v2/pkg/jobs" "github.com/fluxninja/aperture/v2/pkg/labels" + "github.com/fluxninja/aperture/v2/pkg/labelstatus" "github.com/fluxninja/aperture/v2/pkg/log" "github.com/fluxninja/aperture/v2/pkg/metrics" "github.com/fluxninja/aperture/v2/pkg/notifiers" @@ -81,6 +82,8 @@ type quotaSchedulerFactory struct { rateLimiterJobGroup *jobs.JobGroup wsFactory *workloadscheduler.Factory agentGroupName string + labelStatusJobGroup *jobs.JobGroup + labelStatusFactory *labelstatus.LabelStatusFactory } // main fx app. @@ -94,6 +97,7 @@ func setupQuotaSchedulerFactory( etcdClient *etcdclient.Client, ai *agentinfo.AgentInfo, wsFactory *workloadscheduler.Factory, + labelStatusFactory *labelstatus.LabelStatusFactory, ) error { agentGroupName := ai.GetAgentGroup() etcdPath := path.Join(paths.QuotaSchedulerDecisionsPath) @@ -118,6 +122,12 @@ func setupQuotaSchedulerFactory( return err } + labelStatusJobGroup, err := jobs.NewJobGroup(reg.Child("label_status", "load_scheduler"), jobs.JobGroupConfig{}, nil) + if err != nil { + logger.Error().Err(err).Msg("Failed to create labels status job group") + return err + } + quotaSchedulerFactory := "aSchedulerFactory{ engineAPI: e, distCache: distCache, @@ -127,6 +137,8 @@ func setupQuotaSchedulerFactory( agentGroupName: agentGroupName, registry: reg, wsFactory: wsFactory, + labelStatusJobGroup: labelStatusJobGroup, + labelStatusFactory: labelStatusFactory, } fxDriver, err := notifiers.NewFxDriver(reg, prometheusRegistry, @@ -146,6 +158,11 @@ func setupQuotaSchedulerFactory( if err != nil { return err } + + err = labelStatusJobGroup.Start() + if err != nil { + return err + } err = decisionsWatcher.Start() if err != nil { return err @@ -158,6 +175,10 @@ func setupQuotaSchedulerFactory( if err != nil { merr = multierr.Append(merr, err) } + err = labelStatusJobGroup.Stop() + if err != nil { + merr = multierr.Append(merr, err) + } err = rateLimiterJobGroup.Stop() if err != nil { merr = multierr.Append(merr, err) @@ -199,13 +220,18 @@ func (qsFactory *quotaSchedulerFactory) newQuotaSchedulerOptions( } qs := "aScheduler{ - Component: wrapperMessage.GetCommonAttributes(), - proto: qsProto, - qsFactory: qsFactory, - registry: reg, - clock: clockwork.NewRealClock(), + Component: wrapperMessage.GetCommonAttributes(), + proto: qsProto, + qsFactory: qsFactory, + registry: reg, + clock: clockwork.NewRealClock(), + labelStatusJobGroup: qsFactory.labelStatusJobGroup, } qs.name = iface.ComponentKey(qs) + qs.tokensLabelKeyStatus = qsFactory.labelStatusFactory.New("tokens_label_key", qs.GetPolicyName(), qs.GetComponentId()) + qs.priorityLabelKeyStatus = qsFactory.labelStatusFactory.New("priority_label_key", qs.GetPolicyName(), qs.GetComponentId()) + qs.workloadLabelKeyStatus = qsFactory.labelStatusFactory.New("workload_label_key", 
qs.GetPolicyName(), qs.GetComponentId()) + qs.fairnessLabelKeyStatus = qsFactory.labelStatusFactory.New("fairness_label_key", qs.GetPolicyName(), qs.GetComponentId()) return fx.Options( fx.Invoke( @@ -218,14 +244,19 @@ func (qsFactory *quotaSchedulerFactory) newQuotaSchedulerOptions( type quotaScheduler struct { schedulers sync.Map iface.Component - registry status.Registry - limiter ratelimiter.RateLimiter - clock clockwork.Clock - qsFactory *quotaSchedulerFactory - inner *globaltokenbucket.GlobalTokenBucket - proto *policylangv1.QuotaScheduler - schedulerMetrics *workloadscheduler.SchedulerMetrics - name string + registry status.Registry + limiter ratelimiter.RateLimiter + clock clockwork.Clock + qsFactory *quotaSchedulerFactory + inner *globaltokenbucket.GlobalTokenBucket + proto *policylangv1.QuotaScheduler + schedulerMetrics *workloadscheduler.SchedulerMetrics + name string + labelStatusJobGroup *jobs.JobGroup + tokensLabelKeyStatus *labelstatus.LabelStatus + priorityLabelKeyStatus *labelstatus.LabelStatus + workloadLabelKeyStatus *labelstatus.LabelStatus + fairnessLabelKeyStatus *labelstatus.LabelStatus } func (qs *quotaScheduler) setup(lifecycle fx.Lifecycle) error { @@ -433,6 +464,10 @@ func (qs *quotaScheduler) Decide(ctx context.Context, labels labels.Labels) *flo qs, tokenBucket, qs.schedulerMetrics, + qs.tokensLabelKeyStatus, + qs.priorityLabelKeyStatus, + qs.workloadLabelKeyStatus, + qs.fairnessLabelKeyStatus, ) if err != nil { log.Error().Err(err).Msg("Failed to create scheduler") diff --git a/pkg/policies/flowcontrol/actuators/workload-scheduler/scheduler.go b/pkg/policies/flowcontrol/actuators/workload-scheduler/scheduler.go index bb87936be0..b8aaf76014 100644 --- a/pkg/policies/flowcontrol/actuators/workload-scheduler/scheduler.go +++ b/pkg/policies/flowcontrol/actuators/workload-scheduler/scheduler.go @@ -18,6 +18,7 @@ import ( policylangv1 "github.com/fluxninja/aperture/api/v2/gen/proto/go/aperture/policy/language/v1" "github.com/fluxninja/aperture/v2/pkg/config" "github.com/fluxninja/aperture/v2/pkg/labels" + "github.com/fluxninja/aperture/v2/pkg/labelstatus" "github.com/fluxninja/aperture/v2/pkg/log" "github.com/fluxninja/aperture/v2/pkg/metrics" multimatcher "github.com/fluxninja/aperture/v2/pkg/multi-matcher" @@ -431,15 +432,19 @@ func (sm *SchedulerMetrics) appendWorkloadLabel(workloadLabel string) prometheus // Scheduler implements load scheduler on the flowcontrol side. type Scheduler struct { - component iface.Component - scheduler scheduler.Scheduler - registry status.Registry - proto *policylangv1.Scheduler - defaultWorkload *workload - workloadMultiMatcher *multiMatcher - tokensByWorkloadIndex map[string]float64 - metrics *SchedulerMetrics - mutex sync.RWMutex + component iface.Component + scheduler scheduler.Scheduler + registry status.Registry + proto *policylangv1.Scheduler + defaultWorkload *workload + workloadMultiMatcher *multiMatcher + tokensByWorkloadIndex map[string]float64 + metrics *SchedulerMetrics + mutex sync.RWMutex + tokensLabelKeyStatus *labelstatus.LabelStatus + priorityLabelKeyStatus *labelstatus.LabelStatus + workloadLabelKeyStatus *labelstatus.LabelStatus + fairnessLabelKeyStatus *labelstatus.LabelStatus } // NewScheduler returns fx options for the load scheduler fx app. 
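(Reviewer sketch, not part of the patch: the next hunks thread the four LabelStatus handles into the workload scheduler so its Decide path can report absent flow labels. The placeholder below shows that reporting pattern for a single key; reportMissingTokens and schedProto are illustrative names only.)

// Sketch only: placeholder for the per-key checks Decide performs below.
package example

import (
	policylangv1 "github.com/fluxninja/aperture/api/v2/gen/proto/go/aperture/policy/language/v1"

	"github.com/fluxninja/aperture/v2/pkg/labels"
	"github.com/fluxninja/aperture/v2/pkg/labelstatus"
)

// reportMissingTokens stamps the tokens label as missing when the configured
// key is absent from the flow labels; the periodic job registered by
// LabelStatus.Setup turns a recent stamp into a critical alerting status and
// lets it age out after five minutes.
func reportMissingTokens(
	schedProto *policylangv1.Scheduler,
	flowLabels labels.Labels,
	tokensStatus *labelstatus.LabelStatus,
) {
	if key := schedProto.TokensLabelKey; key != "" {
		if _, ok := flowLabels.Get(key); !ok {
			tokensStatus.SetMissing()
		}
	}
}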
@@ -450,6 +455,10 @@ func (wsFactory *Factory) NewScheduler( component iface.Component, tokenManger scheduler.TokenManager, schedulerMetrics *SchedulerMetrics, + tokensLabelKeyStatus *labelstatus.LabelStatus, + priorityLabelKeyStatus *labelstatus.LabelStatus, + workloadLabelKeyStatus *labelstatus.LabelStatus, + fairnessLabelKeyStatus *labelstatus.LabelStatus, ) (*Scheduler, error) { initPreemptMetrics := func(workloadLabel string) error { if schedulerMetrics == nil { @@ -579,8 +588,11 @@ func (s *Scheduler) Decide(ctx context.Context, labels labels.Labels) (*flowcont } if s.proto.WorkloadLabelKey != "" { - if val, ok := labels.Get(s.proto.WorkloadLabelKey); ok { + val, ok := labels.Get(s.proto.WorkloadLabelKey) + if ok { matchedWorkloadLabel = val + } else { + s.workloadLabelKeyStatus.SetMissing() } } @@ -589,21 +601,27 @@ func (s *Scheduler) Decide(ctx context.Context, labels labels.Labels) (*flowcont } if s.proto.TokensLabelKey != "" { - if val, ok := labels.Get(s.proto.TokensLabelKey); ok { + val, ok := labels.Get(s.proto.TokensLabelKey) + if ok { if parsedTokens, err := strconv.ParseFloat(val, 64); err == nil { tokens = parsedTokens + } else { + s.tokensLabelKeyStatus.SetMissing() } } } if s.proto.PriorityLabelKey != "" { - if val, ok := labels.Get(s.proto.PriorityLabelKey); ok { + val, ok := labels.Get(s.proto.PriorityLabelKey) + if ok { if parsedPriority, err := strconv.ParseFloat(val, 64); err == nil { if parsedPriority > 0 { priority = parsedPriority invPriority = 1 / parsedPriority } } + } else { + s.priorityLabelKeyStatus.SetMissing() } } @@ -648,8 +666,11 @@ func (s *Scheduler) Decide(ctx context.Context, labels labels.Labels) (*flowcont var fairnessLabel string if s.proto.FairnessLabelKey != "" { - if val, ok := labels.Get(s.proto.FairnessLabelKey); ok { + val, ok := labels.Get(s.proto.FairnessLabelKey) + if ok { fairnessLabel = val + } else { + s.fairnessLabelKeyStatus.SetMissing() } } From 840671be1fbec52c4fa6cf183e7df4926aa45325 Mon Sep 17 00:00:00 2001 From: Hasit Mistry Date: Fri, 26 Jan 2024 15:58:22 -0800 Subject: [PATCH 4/8] Remove severity label if timestamp is older than 5 minutes --- pkg/labelstatus/labelstatus.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/pkg/labelstatus/labelstatus.go b/pkg/labelstatus/labelstatus.go index 3b76591eef..4ad0122ecd 100644 --- a/pkg/labelstatus/labelstatus.go +++ b/pkg/labelstatus/labelstatus.go @@ -99,12 +99,11 @@ func (ls *LabelStatus) setLookupStatus(ctx context.Context) (proto.Message, erro } if time.Since(ls.timestamp) >= 5*time.Minute { - labels["severity"] = alerts.SeverityInfo.String() ls.registry.SetStatus(nil, labels) return nil, nil } else { labels["severity"] = alerts.SeverityCrit.String() - s := status.NewStatus(nil, errors.New("label "+ls.labelKey+"missing")) + s := status.NewStatus(nil, errors.New("label "+ls.labelKey+" missing")) ls.registry.SetStatus(s, labels) } From 8b5d4abe65799f7a0e0b51b0034c7a2ebac19f53 Mon Sep 17 00:00:00 2001 From: Hasit Mistry Date: Fri, 26 Jan 2024 16:11:28 -0800 Subject: [PATCH 5/8] Update lock to use RLock and RUnlock in setLookupStatus function --- pkg/labelstatus/labelstatus.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/labelstatus/labelstatus.go b/pkg/labelstatus/labelstatus.go index 4ad0122ecd..d385eb3af0 100644 --- a/pkg/labelstatus/labelstatus.go +++ b/pkg/labelstatus/labelstatus.go @@ -86,8 +86,8 @@ func (ls *LabelStatus) SetMissing() { } func (ls *LabelStatus) setLookupStatus(ctx context.Context) (proto.Message, error) { - 
ls.lock.Lock() - defer ls.lock.Unlock() + ls.lock.RLock() + defer ls.lock.RUnlock() if ls.timestamp.IsZero() { return nil, nil From 88dd526c0b3b932bee42ca2e9224b0a169d1989b Mon Sep 17 00:00:00 2001 From: Hasit Mistry Date: Sun, 28 Jan 2024 20:30:05 -0800 Subject: [PATCH 6/8] Refactor label status and concurrency scheduler modules --- pkg/labelstatus/labelstatus.go | 18 +++++++++--------- .../concurrency-scheduler.go | 10 ++++++---- pkg/policies/flowcontrol/provide.go | 2 ++ 3 files changed, 17 insertions(+), 13 deletions(-) diff --git a/pkg/labelstatus/labelstatus.go b/pkg/labelstatus/labelstatus.go index d385eb3af0..9a3cabef01 100644 --- a/pkg/labelstatus/labelstatus.go +++ b/pkg/labelstatus/labelstatus.go @@ -15,18 +15,18 @@ import ( "github.com/fluxninja/aperture/v2/pkg/status" ) -// LabelStatusFactory is a factory for creating LabelStatus. -type LabelStatusFactory struct { - registry status.Registry -} - -// LabelStatusModule is an fx module for providing LabelStatusFactory. -func LabelStatusModule() fx.Option { +// Module is an fx module for providing LabelStatusFactory. +func Module() fx.Option { return fx.Options( fx.Provide(NewLabelStatusFactory), ) } +// LabelStatusFactory is a factory for creating LabelStatus. +type LabelStatusFactory struct { + registry status.Registry +} + // NewLabelStatusFactory creates a new LabelStatusFactory. func NewLabelStatusFactory(statusRegistry status.Registry) *LabelStatusFactory { return &LabelStatusFactory{ @@ -59,7 +59,7 @@ type LabelStatus struct { func (ls *LabelStatus) Setup(jobGroup *jobs.JobGroup, lifecycle fx.Lifecycle) { lifecycle.Append(fx.Hook{ OnStart: func(context.Context) error { - job := jobs.NewBasicJob("", ls.setLookupStatus) + job := jobs.NewBasicJob("label-status", ls.setLookupStatus) err := jobGroup.RegisterJob(job, jobs.JobConfig{ ExecutionPeriod: config.MakeDuration(10 * time.Second), }) @@ -69,7 +69,7 @@ func (ls *LabelStatus) Setup(jobGroup *jobs.JobGroup, lifecycle fx.Lifecycle) { return nil }, OnStop: func(context.Context) error { - err := jobGroup.DeregisterJob("") + err := jobGroup.DeregisterJob("label-status") if err != nil { return err } diff --git a/pkg/policies/flowcontrol/actuators/concurrency-scheduler/concurrency-scheduler.go b/pkg/policies/flowcontrol/actuators/concurrency-scheduler/concurrency-scheduler.go index 0aae11e56a..b8b24b0873 100644 --- a/pkg/policies/flowcontrol/actuators/concurrency-scheduler/concurrency-scheduler.go +++ b/pkg/policies/flowcontrol/actuators/concurrency-scheduler/concurrency-scheduler.go @@ -115,7 +115,7 @@ func setupConcurrencySchedulerFactory( labelStatusJobGroup, err := jobs.NewJobGroup(reg.Child("label_status", concurrencySchedulerStatusRoot), jobs.JobGroupConfig{}, nil) if err != nil { - logger.Error().Err(err).Msg("Failed to create labels status job group") + logger.Error().Err(err).Msg("Failed to create label_status job group") return err } @@ -243,10 +243,12 @@ type concurrencyScheduler struct { func (cs *concurrencyScheduler) setup(lifecycle fx.Lifecycle) error { logger := cs.registry.GetLogger() - etcdKey := paths.AgentComponentKey(cs.csFactory.agentGroupName, + + etcdKey := paths.AgentComponentKey( + cs.csFactory.agentGroupName, cs.GetPolicyName(), - cs.GetComponentId()) - // decision notifier + cs.GetComponentId(), + ) decisionUnmarshaller, err := config.NewProtobufUnmarshaller(nil) if err != nil { return err diff --git a/pkg/policies/flowcontrol/provide.go b/pkg/policies/flowcontrol/provide.go index e3c702c5c2..ae69ab7d14 100644 --- a/pkg/policies/flowcontrol/provide.go 
+++ b/pkg/policies/flowcontrol/provide.go @@ -4,6 +4,7 @@ import ( "go.uber.org/fx" agentinfo "github.com/fluxninja/aperture/v2/pkg/agent-info" + "github.com/fluxninja/aperture/v2/pkg/labelstatus" "github.com/fluxninja/aperture/v2/pkg/policies/flowcontrol/actuators" "github.com/fluxninja/aperture/v2/pkg/policies/flowcontrol/iface" "github.com/fluxninja/aperture/v2/pkg/policies/flowcontrol/resources/classifier" @@ -22,6 +23,7 @@ func Module() fx.Option { servicegetter.Module, EngineModule(), CacheModule(), + labelstatus.Module(), ) } From 83e6eb1dc6e461392069187179e05ae1a6fbc17b Mon Sep 17 00:00:00 2001 From: Hasit Mistry Date: Sun, 28 Jan 2024 21:01:11 -0800 Subject: [PATCH 7/8] Add job name to label status setup --- pkg/labelstatus/labelstatus.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/pkg/labelstatus/labelstatus.go b/pkg/labelstatus/labelstatus.go index 9a3cabef01..cf9f1e39d0 100644 --- a/pkg/labelstatus/labelstatus.go +++ b/pkg/labelstatus/labelstatus.go @@ -3,6 +3,7 @@ package labelstatus import ( "context" "errors" + "fmt" "sync" "time" @@ -57,9 +58,10 @@ type LabelStatus struct { // Setup sets up the LabelsStatus's lifecycle hooks. func (ls *LabelStatus) Setup(jobGroup *jobs.JobGroup, lifecycle fx.Lifecycle) { + jobName := fmt.Sprintf("label-status-%s-%s-%s", ls.policyName, ls.componentID, ls.labelKey) lifecycle.Append(fx.Hook{ OnStart: func(context.Context) error { - job := jobs.NewBasicJob("label-status", ls.setLookupStatus) + job := jobs.NewBasicJob(jobName, ls.setLookupStatus) err := jobGroup.RegisterJob(job, jobs.JobConfig{ ExecutionPeriod: config.MakeDuration(10 * time.Second), }) @@ -69,7 +71,7 @@ func (ls *LabelStatus) Setup(jobGroup *jobs.JobGroup, lifecycle fx.Lifecycle) { return nil }, OnStop: func(context.Context) error { - err := jobGroup.DeregisterJob("label-status") + err := jobGroup.DeregisterJob(jobName) if err != nil { return err } From 717ca31c4c3c9bdae6a135d7da57cbdba5b70655 Mon Sep 17 00:00:00 2001 From: Hasit Mistry Date: Mon, 29 Jan 2024 14:00:54 -0800 Subject: [PATCH 8/8] Add new fields to LabelStatusFactory and Factory structs --- pkg/labelstatus/labelstatus.go | 5 +++-- .../actuators/workload-scheduler/scheduler.go | 12 ++++++++---- 2 files changed, 11 insertions(+), 6 deletions(-) diff --git a/pkg/labelstatus/labelstatus.go b/pkg/labelstatus/labelstatus.go index cf9f1e39d0..3e88465647 100644 --- a/pkg/labelstatus/labelstatus.go +++ b/pkg/labelstatus/labelstatus.go @@ -43,6 +43,7 @@ func (lsf *LabelStatusFactory) New(labelKey string, policyName string, component labelKey: labelKey, policyName: policyName, componentID: componentID, + timestamp: time.Time{}, } } @@ -96,8 +97,8 @@ func (ls *LabelStatus) setLookupStatus(ctx context.Context) (proto.Message, erro } labels := map[string]string{ - "policy": ls.policyName, - "component": ls.componentID, + "policy_name": ls.policyName, + "component_id": ls.componentID, } if time.Since(ls.timestamp) >= 5*time.Minute { diff --git a/pkg/policies/flowcontrol/actuators/workload-scheduler/scheduler.go b/pkg/policies/flowcontrol/actuators/workload-scheduler/scheduler.go index b8aaf76014..57603938fe 100644 --- a/pkg/policies/flowcontrol/actuators/workload-scheduler/scheduler.go +++ b/pkg/policies/flowcontrol/actuators/workload-scheduler/scheduler.go @@ -530,10 +530,14 @@ func (wsFactory *Factory) NewScheduler( Name: metrics.DefaultWorkloadIndex, }, }, - registry: registry, - workloadMultiMatcher: mm, - component: component, - metrics: schedulerMetrics, + registry: registry, + 
workloadMultiMatcher: mm, + component: component, + metrics: schedulerMetrics, + tokensLabelKeyStatus: tokensLabelKeyStatus, + priorityLabelKeyStatus: priorityLabelKeyStatus, + workloadLabelKeyStatus: workloadLabelKeyStatus, + fairnessLabelKeyStatus: fairnessLabelKeyStatus, } var wfqMetrics *scheduler.WFQMetrics