From 2828179fb2f87676277249044e5700e77086504b Mon Sep 17 00:00:00 2001 From: Hasit Mistry Date: Fri, 26 Jan 2024 15:53:13 -0800 Subject: [PATCH] Add label alerts in other limiters - concurrency-scheduler - load-scheduler - quota-scheduler - scheduler --- .../concurrency-scheduler.go | 96 +++++++++++++------ .../load-scheduler/load-scheduler.go | 53 ++++++++-- .../quota-scheduler/quota-scheduler.go | 61 +++++++++--- .../actuators/workload-scheduler/scheduler.go | 47 ++++++--- 4 files changed, 197 insertions(+), 60 deletions(-) diff --git a/pkg/policies/flowcontrol/actuators/concurrency-scheduler/concurrency-scheduler.go b/pkg/policies/flowcontrol/actuators/concurrency-scheduler/concurrency-scheduler.go index dee41ed00a..0aae11e56a 100644 --- a/pkg/policies/flowcontrol/actuators/concurrency-scheduler/concurrency-scheduler.go +++ b/pkg/policies/flowcontrol/actuators/concurrency-scheduler/concurrency-scheduler.go @@ -23,6 +23,7 @@ import ( etcdwatcher "github.com/fluxninja/aperture/v2/pkg/etcd/watcher" "github.com/fluxninja/aperture/v2/pkg/jobs" "github.com/fluxninja/aperture/v2/pkg/labels" + "github.com/fluxninja/aperture/v2/pkg/labelstatus" "github.com/fluxninja/aperture/v2/pkg/log" "github.com/fluxninja/aperture/v2/pkg/metrics" "github.com/fluxninja/aperture/v2/pkg/notifiers" @@ -71,13 +72,15 @@ func provideConcurrencySchedulerWatchers( } type concurrencySchedulerFactory struct { - engineAPI iface.Engine - registry status.Registry - decisionsWatcher notifiers.Watcher - distCache *distcache.DistCache - auditJobGroup *jobs.JobGroup - wsFactory *workloadscheduler.Factory - agentGroupName string + engineAPI iface.Engine + registry status.Registry + decisionsWatcher notifiers.Watcher + distCache *distcache.DistCache + auditJobGroup *jobs.JobGroup + wsFactory *workloadscheduler.Factory + agentGroupName string + labelStatusJobGroup *jobs.JobGroup + labelStatusFactory *labelstatus.LabelStatusFactory } // main fx app. 
@@ -91,6 +94,7 @@ func setupConcurrencySchedulerFactory( etcdClient *etcdclient.Client, ai *agentinfo.AgentInfo, wsFactory *workloadscheduler.Factory, + labelStatusFactory *labelstatus.LabelStatusFactory, ) error { agentGroupName := ai.GetAgentGroup() etcdPath := path.Join(paths.ConcurrencySchedulerDecisionsPath) @@ -109,14 +113,22 @@ func setupConcurrencySchedulerFactory( return err } + labelStatusJobGroup, err := jobs.NewJobGroup(reg.Child("label_status", concurrencySchedulerStatusRoot), jobs.JobGroupConfig{}, nil) + if err != nil { + logger.Error().Err(err).Msg("Failed to create labels status job group") + return err + } + concurrencySchedulerFactory := &concurrencySchedulerFactory{ - engineAPI: e, - distCache: distCache, - auditJobGroup: auditJobGroup, - decisionsWatcher: decisionsWatcher, - agentGroupName: agentGroupName, - registry: reg, - wsFactory: wsFactory, + engineAPI: e, + distCache: distCache, + auditJobGroup: auditJobGroup, + decisionsWatcher: decisionsWatcher, + agentGroupName: agentGroupName, + registry: reg, + wsFactory: wsFactory, + labelStatusJobGroup: labelStatusJobGroup, + labelStatusFactory: labelStatusFactory, } fxDriver, err := notifiers.NewFxDriver(reg, prometheusRegistry, @@ -132,6 +144,10 @@ func setupConcurrencySchedulerFactory( if err != nil { return err } + err = labelStatusJobGroup.Start() + if err != nil { + return err + } err = decisionsWatcher.Start() if err != nil { return err @@ -144,6 +160,10 @@ func setupConcurrencySchedulerFactory( if err != nil { merr = multierr.Append(merr, err) } + err = labelStatusJobGroup.Stop() + if err != nil { + merr = multierr.Append(merr, err) + } err = auditJobGroup.Stop() if err != nil { merr = multierr.Append(merr, err) @@ -181,13 +201,19 @@ func (csFactory *concurrencySchedulerFactory) newConcurrencySchedulerOptions( } cs := &concurrencyScheduler{ - Component: wrapperMessage.GetCommonAttributes(), - proto: csProto, - csFactory: csFactory, - registry: reg, - clock: clockwork.NewRealClock(), + Component: wrapperMessage.GetCommonAttributes(), + proto: csProto, + csFactory: csFactory, + registry: reg, + clock: clockwork.NewRealClock(), + labelStatusJobGroup: csFactory.labelStatusJobGroup, } cs.name = iface.ComponentKey(cs) + cs.limitByLabelKeyStatus = csFactory.labelStatusFactory.New("limit_by_label_key", cs.GetPolicyName(), cs.GetComponentId()) + cs.tokensLabelKeyStatus = csFactory.labelStatusFactory.New("tokens_label_key", cs.GetPolicyName(), cs.GetComponentId()) + cs.priorityLabelKeyStatus = csFactory.labelStatusFactory.New("priority_label_key", cs.GetPolicyName(), cs.GetComponentId()) + cs.workloadLabelKeyStatus = csFactory.labelStatusFactory.New("workload_label_key", cs.GetPolicyName(), cs.GetComponentId()) + cs.fairnessLabelKeyStatus = csFactory.labelStatusFactory.New("fairness_label_key", cs.GetPolicyName(), cs.GetComponentId()) return fx.Options( fx.Invoke( @@ -200,13 +226,19 @@ func (csFactory *concurrencySchedulerFactory) newConcurrencySchedulerOptions( type concurrencyScheduler struct { schedulers sync.Map iface.Component - registry status.Registry - clock clockwork.Clock - csFactory *concurrencySchedulerFactory - limiter concurrencylimiter.ConcurrencyLimiter - proto *policylangv1.ConcurrencyScheduler - schedulerMetrics *workloadscheduler.SchedulerMetrics - name string + registry status.Registry + clock clockwork.Clock + csFactory *concurrencySchedulerFactory + limiter concurrencylimiter.ConcurrencyLimiter + proto *policylangv1.ConcurrencyScheduler + schedulerMetrics *workloadscheduler.SchedulerMetrics + name 
string + labelStatusJobGroup *jobs.JobGroup + limitByLabelKeyStatus *labelstatus.LabelStatus + tokensLabelKeyStatus *labelstatus.LabelStatus + priorityLabelKeyStatus *labelstatus.LabelStatus + workloadLabelKeyStatus *labelstatus.LabelStatus + fairnessLabelKeyStatus *labelstatus.LabelStatus } func (cs *concurrencyScheduler) setup(lifecycle fx.Lifecycle) error { @@ -233,6 +265,12 @@ func (cs *concurrencyScheduler) setup(lifecycle fx.Lifecycle) error { metricLabels[metrics.PolicyHashLabel] = cs.GetPolicyHash() metricLabels[metrics.ComponentIDLabel] = cs.GetComponentId() + cs.limitByLabelKeyStatus.Setup(cs.csFactory.labelStatusJobGroup, lifecycle) + cs.tokensLabelKeyStatus.Setup(cs.csFactory.labelStatusJobGroup, lifecycle) + cs.priorityLabelKeyStatus.Setup(cs.csFactory.labelStatusJobGroup, lifecycle) + cs.workloadLabelKeyStatus.Setup(cs.csFactory.labelStatusJobGroup, lifecycle) + cs.fairnessLabelKeyStatus.Setup(cs.csFactory.labelStatusJobGroup, lifecycle) + lifecycle.Append(fx.Hook{ OnStart: func(context.Context) error { var err error @@ -333,6 +371,7 @@ func (cs *concurrencyScheduler) getLabelKey(labels labels.Labels) (string, bool) } else { labelValue, found := labels.Get(labelKey) if !found { + cs.limitByLabelKeyStatus.SetMissing() return "", false } label = labelKey + ":" + labelValue @@ -376,7 +415,6 @@ func (cs *concurrencyScheduler) Decide(ctx context.Context, labels labels.Labels var found bool label, found = cs.getLabelKey(labels) - if !found { reason = flowcontrolv1.LimiterDecision_LIMITER_REASON_KEY_NOT_FOUND return returnDecision() @@ -393,6 +431,10 @@ func (cs *concurrencyScheduler) Decide(ctx context.Context, labels labels.Labels cs, tokenCounter, cs.schedulerMetrics, + cs.tokensLabelKeyStatus, + cs.priorityLabelKeyStatus, + cs.workloadLabelKeyStatus, + cs.fairnessLabelKeyStatus, ) if err != nil { log.Error().Err(err).Msg("Failed to create scheduler") diff --git a/pkg/policies/flowcontrol/actuators/load-scheduler/load-scheduler.go b/pkg/policies/flowcontrol/actuators/load-scheduler/load-scheduler.go index 50ee28562a..f67a589654 100644 --- a/pkg/policies/flowcontrol/actuators/load-scheduler/load-scheduler.go +++ b/pkg/policies/flowcontrol/actuators/load-scheduler/load-scheduler.go @@ -19,7 +19,9 @@ import ( "github.com/fluxninja/aperture/v2/pkg/config" etcdclient "github.com/fluxninja/aperture/v2/pkg/etcd/client" etcdwatcher "github.com/fluxninja/aperture/v2/pkg/etcd/watcher" + "github.com/fluxninja/aperture/v2/pkg/jobs" "github.com/fluxninja/aperture/v2/pkg/labels" + "github.com/fluxninja/aperture/v2/pkg/labelstatus" "github.com/fluxninja/aperture/v2/pkg/metrics" "github.com/fluxninja/aperture/v2/pkg/notifiers" workloadscheduler "github.com/fluxninja/aperture/v2/pkg/policies/flowcontrol/actuators/workload-scheduler" @@ -78,6 +80,8 @@ type loadSchedulerFactory struct { tokenBucketAvailableTokensGaugeVec *prometheus.GaugeVec wsFactory *workloadscheduler.Factory agentGroupName string + labelStatusJobGroup *jobs.JobGroup + labelStatusFactory *labelstatus.LabelStatusFactory } // setupLoadSchedulerFactory sets up the load scheduler module in the main fx app. @@ -90,9 +94,12 @@ func setupLoadSchedulerFactory( etcdClient *etcdclient.Client, ai *agentinfo.AgentInfo, wsFactory *workloadscheduler.Factory, + labelStatusFactory *labelstatus.LabelStatusFactory, ) error { reg := registry.Child("component", "load_scheduler") + logger := reg.GetLogger() + agentGroup := ai.GetAgentGroup() // Scope the sync to the agent group. 
@@ -102,12 +109,20 @@ func setupLoadSchedulerFactory( return err } + labelStatusJobGroup, err := jobs.NewJobGroup(reg.Child("label_status", "load_scheduler"), jobs.JobGroupConfig{}, nil) + if err != nil { + logger.Error().Err(err).Msg("Failed to create labels status job group") + return err + } + lsFactory := &loadSchedulerFactory{ engineAPI: e, registry: reg, wsFactory: wsFactory, loadDecisionWatcher: loadDecisionWatcher, agentGroupName: ai.GetAgentGroup(), + labelStatusJobGroup: labelStatusJobGroup, + labelStatusFactory: labelStatusFactory, } // Initialize and register the WFQ and Token Bucket Metric Vectors @@ -169,6 +184,11 @@ func setupLoadSchedulerFactory( return err } + err = labelStatusJobGroup.Start() + if err != nil { + return err + } + err = lsFactory.loadDecisionWatcher.Start() if err != nil { return err @@ -184,6 +204,11 @@ func setupLoadSchedulerFactory( merr = multierr.Append(merr, err) } + err = labelStatusJobGroup.Stop() + if err != nil { + merr = multierr.Append(merr, err) + } + if !prometheusRegistry.Unregister(lsFactory.tokenBucketLMGaugeVec) { err := fmt.Errorf("failed to unregister " + metrics.TokenBucketLMMetricName) merr = multierr.Append(merr, err) @@ -236,7 +261,12 @@ func (lsFactory *loadSchedulerFactory) newLoadSchedulerOptions( registry: registry, loadSchedulerFactory: lsFactory, clock: clockwork.NewRealClock(), + labelStatusJobGroup: lsFactory.labelStatusJobGroup, } + ls.tokensLabelKeyStatus = lsFactory.labelStatusFactory.New("tokens_label_key", ls.GetPolicyName(), ls.GetComponentId()) + ls.priorityLabelKeyStatus = lsFactory.labelStatusFactory.New("priority_label_key", ls.GetPolicyName(), ls.GetComponentId()) + ls.workloadLabelKeyStatus = lsFactory.labelStatusFactory.New("workload_label_key", ls.GetPolicyName(), ls.GetComponentId()) + ls.fairnessLabelKeyStatus = lsFactory.labelStatusFactory.New("fairness_label_key", ls.GetPolicyName(), ls.GetComponentId()) return fx.Options( fx.Invoke( @@ -249,13 +279,18 @@ func (lsFactory *loadSchedulerFactory) newLoadSchedulerOptions( type loadScheduler struct { // TODO: comment to self: why do we depend on Scheduler to implement Decide and Revert in this case? iface.Component - scheduler *workloadscheduler.Scheduler - registry status.Registry - proto *policylangv1.LoadScheduler - loadSchedulerFactory *loadSchedulerFactory - clock clockwork.Clock - tokenBucket *scheduler.LoadMultiplierTokenBucket - schedulerMetrics *workloadscheduler.SchedulerMetrics + scheduler *workloadscheduler.Scheduler + registry status.Registry + proto *policylangv1.LoadScheduler + loadSchedulerFactory *loadSchedulerFactory + clock clockwork.Clock + tokenBucket *scheduler.LoadMultiplierTokenBucket + schedulerMetrics *workloadscheduler.SchedulerMetrics + labelStatusJobGroup *jobs.JobGroup + tokensLabelKeyStatus *labelstatus.LabelStatus + priorityLabelKeyStatus *labelstatus.LabelStatus + workloadLabelKeyStatus *labelstatus.LabelStatus + fairnessLabelKeyStatus *labelstatus.LabelStatus } // Make sure LoadScheduler implements the iface.LoadScheduler. 
@@ -342,6 +377,10 @@ func (ls *loadScheduler) setup(lifecycle fx.Lifecycle) error {
 		ls,
 		ls.tokenBucket,
 		ls.schedulerMetrics,
+		ls.tokensLabelKeyStatus,
+		ls.priorityLabelKeyStatus,
+		ls.workloadLabelKeyStatus,
+		ls.fairnessLabelKeyStatus,
 	)
 	if err != nil {
 		return retErr(err)
diff --git a/pkg/policies/flowcontrol/actuators/quota-scheduler/quota-scheduler.go b/pkg/policies/flowcontrol/actuators/quota-scheduler/quota-scheduler.go
index 368bc00aa5..ac5afc093b 100644
--- a/pkg/policies/flowcontrol/actuators/quota-scheduler/quota-scheduler.go
+++ b/pkg/policies/flowcontrol/actuators/quota-scheduler/quota-scheduler.go
@@ -25,6 +25,7 @@ import (
 	etcdwatcher "github.com/fluxninja/aperture/v2/pkg/etcd/watcher"
 	"github.com/fluxninja/aperture/v2/pkg/jobs"
 	"github.com/fluxninja/aperture/v2/pkg/labels"
+	"github.com/fluxninja/aperture/v2/pkg/labelstatus"
 	"github.com/fluxninja/aperture/v2/pkg/log"
 	"github.com/fluxninja/aperture/v2/pkg/metrics"
 	"github.com/fluxninja/aperture/v2/pkg/notifiers"
@@ -81,6 +82,8 @@ type quotaSchedulerFactory struct {
 	rateLimiterJobGroup *jobs.JobGroup
 	wsFactory           *workloadscheduler.Factory
 	agentGroupName      string
+	labelStatusJobGroup *jobs.JobGroup
+	labelStatusFactory  *labelstatus.LabelStatusFactory
 }
 
 // main fx app.
@@ -94,6 +97,7 @@ func setupQuotaSchedulerFactory(
 	etcdClient *etcdclient.Client,
 	ai *agentinfo.AgentInfo,
 	wsFactory *workloadscheduler.Factory,
+	labelStatusFactory *labelstatus.LabelStatusFactory,
 ) error {
 	agentGroupName := ai.GetAgentGroup()
 	etcdPath := path.Join(paths.QuotaSchedulerDecisionsPath)
@@ -118,6 +122,12 @@ func setupQuotaSchedulerFactory(
 		return err
 	}
 
+	labelStatusJobGroup, err := jobs.NewJobGroup(reg.Child("label_status", "quota_scheduler"), jobs.JobGroupConfig{}, nil)
+	if err != nil {
+		logger.Error().Err(err).Msg("Failed to create labels status job group")
+		return err
+	}
+
 	quotaSchedulerFactory := &quotaSchedulerFactory{
 		engineAPI:        e,
 		distCache:        distCache,
@@ -127,6 +137,8 @@ func setupQuotaSchedulerFactory(
 		agentGroupName:   agentGroupName,
 		registry:         reg,
 		wsFactory:        wsFactory,
+		labelStatusJobGroup: labelStatusJobGroup,
+		labelStatusFactory:  labelStatusFactory,
 	}
 
 	fxDriver, err := notifiers.NewFxDriver(reg, prometheusRegistry,
@@ -146,6 +158,11 @@ func setupQuotaSchedulerFactory(
 			if err != nil {
 				return err
 			}
+
+			err = labelStatusJobGroup.Start()
+			if err != nil {
+				return err
+			}
 			err = decisionsWatcher.Start()
 			if err != nil {
 				return err
@@ -158,6 +175,10 @@ func setupQuotaSchedulerFactory(
 			if err != nil {
 				merr = multierr.Append(merr, err)
 			}
+			err = labelStatusJobGroup.Stop()
+			if err != nil {
+				merr = multierr.Append(merr, err)
+			}
 			err = rateLimiterJobGroup.Stop()
 			if err != nil {
 				merr = multierr.Append(merr, err)
@@ -199,13 +220,18 @@ func (qsFactory *quotaSchedulerFactory) newQuotaSchedulerOptions(
 	}
 
 	qs := &quotaScheduler{
-		Component: wrapperMessage.GetCommonAttributes(),
-		proto:     qsProto,
-		qsFactory: qsFactory,
-		registry:  reg,
-		clock:     clockwork.NewRealClock(),
+		Component:           wrapperMessage.GetCommonAttributes(),
+		proto:               qsProto,
+		qsFactory:           qsFactory,
+		registry:            reg,
+		clock:               clockwork.NewRealClock(),
+		labelStatusJobGroup: qsFactory.labelStatusJobGroup,
 	}
 	qs.name = iface.ComponentKey(qs)
+	qs.tokensLabelKeyStatus = qsFactory.labelStatusFactory.New("tokens_label_key", qs.GetPolicyName(), qs.GetComponentId())
+	qs.priorityLabelKeyStatus = qsFactory.labelStatusFactory.New("priority_label_key", qs.GetPolicyName(), qs.GetComponentId())
+	qs.workloadLabelKeyStatus = qsFactory.labelStatusFactory.New("workload_label_key",
qs.GetPolicyName(), qs.GetComponentId()) + qs.fairnessLabelKeyStatus = qsFactory.labelStatusFactory.New("fairness_label_key", qs.GetPolicyName(), qs.GetComponentId()) return fx.Options( fx.Invoke( @@ -218,14 +244,19 @@ func (qsFactory *quotaSchedulerFactory) newQuotaSchedulerOptions( type quotaScheduler struct { schedulers sync.Map iface.Component - registry status.Registry - limiter ratelimiter.RateLimiter - clock clockwork.Clock - qsFactory *quotaSchedulerFactory - inner *globaltokenbucket.GlobalTokenBucket - proto *policylangv1.QuotaScheduler - schedulerMetrics *workloadscheduler.SchedulerMetrics - name string + registry status.Registry + limiter ratelimiter.RateLimiter + clock clockwork.Clock + qsFactory *quotaSchedulerFactory + inner *globaltokenbucket.GlobalTokenBucket + proto *policylangv1.QuotaScheduler + schedulerMetrics *workloadscheduler.SchedulerMetrics + name string + labelStatusJobGroup *jobs.JobGroup + tokensLabelKeyStatus *labelstatus.LabelStatus + priorityLabelKeyStatus *labelstatus.LabelStatus + workloadLabelKeyStatus *labelstatus.LabelStatus + fairnessLabelKeyStatus *labelstatus.LabelStatus } func (qs *quotaScheduler) setup(lifecycle fx.Lifecycle) error { @@ -433,6 +464,10 @@ func (qs *quotaScheduler) Decide(ctx context.Context, labels labels.Labels) *flo qs, tokenBucket, qs.schedulerMetrics, + qs.tokensLabelKeyStatus, + qs.priorityLabelKeyStatus, + qs.workloadLabelKeyStatus, + qs.fairnessLabelKeyStatus, ) if err != nil { log.Error().Err(err).Msg("Failed to create scheduler") diff --git a/pkg/policies/flowcontrol/actuators/workload-scheduler/scheduler.go b/pkg/policies/flowcontrol/actuators/workload-scheduler/scheduler.go index bb87936be0..b8aaf76014 100644 --- a/pkg/policies/flowcontrol/actuators/workload-scheduler/scheduler.go +++ b/pkg/policies/flowcontrol/actuators/workload-scheduler/scheduler.go @@ -18,6 +18,7 @@ import ( policylangv1 "github.com/fluxninja/aperture/api/v2/gen/proto/go/aperture/policy/language/v1" "github.com/fluxninja/aperture/v2/pkg/config" "github.com/fluxninja/aperture/v2/pkg/labels" + "github.com/fluxninja/aperture/v2/pkg/labelstatus" "github.com/fluxninja/aperture/v2/pkg/log" "github.com/fluxninja/aperture/v2/pkg/metrics" multimatcher "github.com/fluxninja/aperture/v2/pkg/multi-matcher" @@ -431,15 +432,19 @@ func (sm *SchedulerMetrics) appendWorkloadLabel(workloadLabel string) prometheus // Scheduler implements load scheduler on the flowcontrol side. type Scheduler struct { - component iface.Component - scheduler scheduler.Scheduler - registry status.Registry - proto *policylangv1.Scheduler - defaultWorkload *workload - workloadMultiMatcher *multiMatcher - tokensByWorkloadIndex map[string]float64 - metrics *SchedulerMetrics - mutex sync.RWMutex + component iface.Component + scheduler scheduler.Scheduler + registry status.Registry + proto *policylangv1.Scheduler + defaultWorkload *workload + workloadMultiMatcher *multiMatcher + tokensByWorkloadIndex map[string]float64 + metrics *SchedulerMetrics + mutex sync.RWMutex + tokensLabelKeyStatus *labelstatus.LabelStatus + priorityLabelKeyStatus *labelstatus.LabelStatus + workloadLabelKeyStatus *labelstatus.LabelStatus + fairnessLabelKeyStatus *labelstatus.LabelStatus } // NewScheduler returns fx options for the load scheduler fx app. 
@@ -450,6 +455,10 @@ func (wsFactory *Factory) NewScheduler( component iface.Component, tokenManger scheduler.TokenManager, schedulerMetrics *SchedulerMetrics, + tokensLabelKeyStatus *labelstatus.LabelStatus, + priorityLabelKeyStatus *labelstatus.LabelStatus, + workloadLabelKeyStatus *labelstatus.LabelStatus, + fairnessLabelKeyStatus *labelstatus.LabelStatus, ) (*Scheduler, error) { initPreemptMetrics := func(workloadLabel string) error { if schedulerMetrics == nil { @@ -579,8 +588,11 @@ func (s *Scheduler) Decide(ctx context.Context, labels labels.Labels) (*flowcont } if s.proto.WorkloadLabelKey != "" { - if val, ok := labels.Get(s.proto.WorkloadLabelKey); ok { + val, ok := labels.Get(s.proto.WorkloadLabelKey) + if ok { matchedWorkloadLabel = val + } else { + s.workloadLabelKeyStatus.SetMissing() } } @@ -589,21 +601,27 @@ func (s *Scheduler) Decide(ctx context.Context, labels labels.Labels) (*flowcont } if s.proto.TokensLabelKey != "" { - if val, ok := labels.Get(s.proto.TokensLabelKey); ok { + val, ok := labels.Get(s.proto.TokensLabelKey) + if ok { if parsedTokens, err := strconv.ParseFloat(val, 64); err == nil { tokens = parsedTokens + } else { + s.tokensLabelKeyStatus.SetMissing() } } } if s.proto.PriorityLabelKey != "" { - if val, ok := labels.Get(s.proto.PriorityLabelKey); ok { + val, ok := labels.Get(s.proto.PriorityLabelKey) + if ok { if parsedPriority, err := strconv.ParseFloat(val, 64); err == nil { if parsedPriority > 0 { priority = parsedPriority invPriority = 1 / parsedPriority } } + } else { + s.priorityLabelKeyStatus.SetMissing() } } @@ -648,8 +666,11 @@ func (s *Scheduler) Decide(ctx context.Context, labels labels.Labels) (*flowcont var fairnessLabel string if s.proto.FairnessLabelKey != "" { - if val, ok := labels.Get(s.proto.FairnessLabelKey); ok { + val, ok := labels.Get(s.proto.FairnessLabelKey) + if ok { fairnessLabel = val + } else { + s.fairnessLabelKeyStatus.SetMissing() } }
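
Note on the pattern used throughout this patch (not part of the diff itself): each scheduler factory now builds one labelstatus.LabelStatus per configured label key (limit_by, tokens, priority, workload, fairness) via labelstatus.LabelStatusFactory.New, registers it on a dedicated "label_status" job group and the fx lifecycle, and the Decide paths call SetMissing() when a configured key cannot be resolved from the flow labels, presumably so a background job can surface that as a policy status/alert. The pkg/labelstatus package itself is not shown in this diff, so the sketch below is a minimal, self-contained illustration of that record-then-report flow; LabelStatus, MissingWithin, and decide here are hypothetical stand-ins, not the actual Aperture API.

package main

import (
	"fmt"
	"sync"
	"time"
)

// LabelStatus is a hypothetical stand-in for the per-label-key status object
// that labelstatus.LabelStatusFactory.New creates in the patch above. It only
// remembers when a configured label key was last missing from flow labels.
type LabelStatus struct {
	mu          sync.Mutex
	lastMissing time.Time
}

// SetMissing records that the configured label key was not found on a flow,
// mirroring the calls added in the schedulers' Decide paths.
func (ls *LabelStatus) SetMissing() {
	ls.mu.Lock()
	defer ls.mu.Unlock()
	ls.lastMissing = time.Now()
}

// MissingWithin reports whether the key was marked missing inside the window;
// in the patch, a job running in the "label_status" job group would do this
// kind of check and raise the alert/status for the policy component.
func (ls *LabelStatus) MissingWithin(window time.Duration) bool {
	ls.mu.Lock()
	defer ls.mu.Unlock()
	return !ls.lastMissing.IsZero() && time.Since(ls.lastMissing) < window
}

// decide mimics the shape of Scheduler.Decide after the patch: each configured
// label key is looked up on the flow labels and its status is marked when absent.
func decide(flowLabels map[string]string, tokensKey, priorityKey string, tokensStatus, priorityStatus *LabelStatus) {
	if tokensKey != "" {
		if _, ok := flowLabels[tokensKey]; !ok {
			tokensStatus.SetMissing()
		}
	}
	if priorityKey != "" {
		if _, ok := flowLabels[priorityKey]; !ok {
			priorityStatus.SetMissing()
		}
	}
}

func main() {
	tokensStatus := &LabelStatus{}
	priorityStatus := &LabelStatus{}

	// A flow that carries a priority label but no tokens label.
	decide(map[string]string{"priority": "10"}, "tokens", "priority", tokensStatus, priorityStatus)

	fmt.Println("tokens label missing:", tokensStatus.MissingWithin(time.Minute))     // true
	fmt.Println("priority label missing:", priorityStatus.MissingWithin(time.Minute)) // false
}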