Add label alerts in other limiters
- concurrency-scheduler
- load-scheduler
- quota-scheduler
- scheduler
hasit committed Jan 26, 2024
1 parent 898a69e · commit 2828179
Showing 4 changed files with 197 additions and 60 deletions.
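
Both diffs below follow the same pattern (the first is the concurrency-scheduler actuator, the second the load scheduler): a dedicated `label_status` job group is created per limiter kind, a `labelstatus.LabelStatusFactory` mints one `LabelStatus` per configurable label key, each status is wired into the fx lifecycle, and the limiter flags the status when a flow arrives without the expected label. Here is a minimal sketch of that wiring, using only the `jobs` and `labelstatus` calls as they appear in this diff; `wireLabelStatus`, `getLabel`, and the `"example_limiter"` registry key are illustrative, not part of the commit, and the job-group start/stop ordering (handled by the fx driver in the real code) is simplified to an inline `Start`:

```go
package example

import (
	"go.uber.org/fx"

	"github.com/fluxninja/aperture/v2/pkg/jobs"
	"github.com/fluxninja/aperture/v2/pkg/labels"
	"github.com/fluxninja/aperture/v2/pkg/labelstatus"
	"github.com/fluxninja/aperture/v2/pkg/status"
)

// wireLabelStatus sets up one label status the way each limiter in this
// commit does: a "label_status" job group owns the reporting jobs, the
// factory creates a status per label key, and Setup ties the status to
// the fx lifecycle.
func wireLabelStatus(
	reg status.Registry,
	factory *labelstatus.LabelStatusFactory,
	lifecycle fx.Lifecycle,
	policyName, componentID string,
) (*labelstatus.LabelStatus, error) {
	labelStatusJobGroup, err := jobs.NewJobGroup(
		reg.Child("label_status", "example_limiter"), jobs.JobGroupConfig{}, nil)
	if err != nil {
		return nil, err
	}
	// In the actual diff, Start/Stop are called from the fx app's
	// OnStart/OnStop hooks; started inline here for brevity.
	if err := labelStatusJobGroup.Start(); err != nil {
		return nil, err
	}

	st := factory.New("limit_by_label_key", policyName, componentID)
	st.Setup(labelStatusJobGroup, lifecycle)
	return st, nil
}

// getLabel mirrors getLabelKey in the diff: when the configured label is
// absent from the flow, the status is flagged via SetMissing.
func getLabel(st *labelstatus.LabelStatus, flowLabels labels.Labels, labelKey string) (string, bool) {
	labelValue, found := flowLabels.Get(labelKey)
	if !found {
		st.SetMissing()
		return "", false
	}
	return labelKey + ":" + labelValue, true
}
```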
@@ -23,6 +23,7 @@ import (
 	etcdwatcher "github.com/fluxninja/aperture/v2/pkg/etcd/watcher"
 	"github.com/fluxninja/aperture/v2/pkg/jobs"
 	"github.com/fluxninja/aperture/v2/pkg/labels"
+	"github.com/fluxninja/aperture/v2/pkg/labelstatus"
 	"github.com/fluxninja/aperture/v2/pkg/log"
 	"github.com/fluxninja/aperture/v2/pkg/metrics"
 	"github.com/fluxninja/aperture/v2/pkg/notifiers"
@@ -71,13 +72,15 @@ func provideConcurrencySchedulerWatchers(
 }
 
 type concurrencySchedulerFactory struct {
-	engineAPI        iface.Engine
-	registry         status.Registry
-	decisionsWatcher notifiers.Watcher
-	distCache        *distcache.DistCache
-	auditJobGroup    *jobs.JobGroup
-	wsFactory        *workloadscheduler.Factory
-	agentGroupName   string
+	engineAPI           iface.Engine
+	registry            status.Registry
+	decisionsWatcher    notifiers.Watcher
+	distCache           *distcache.DistCache
+	auditJobGroup       *jobs.JobGroup
+	wsFactory           *workloadscheduler.Factory
+	agentGroupName      string
+	labelStatusJobGroup *jobs.JobGroup
+	labelStatusFactory  *labelstatus.LabelStatusFactory
 }
 
 // main fx app.
@@ -91,6 +94,7 @@ func setupConcurrencySchedulerFactory(
 	etcdClient *etcdclient.Client,
 	ai *agentinfo.AgentInfo,
 	wsFactory *workloadscheduler.Factory,
+	labelStatusFactory *labelstatus.LabelStatusFactory,
 ) error {
 	agentGroupName := ai.GetAgentGroup()
 	etcdPath := path.Join(paths.ConcurrencySchedulerDecisionsPath)
@@ -109,14 +113,22 @@
 		return err
 	}
 
+	labelStatusJobGroup, err := jobs.NewJobGroup(reg.Child("label_status", concurrencySchedulerStatusRoot), jobs.JobGroupConfig{}, nil)
+	if err != nil {
+		logger.Error().Err(err).Msg("Failed to create labels status job group")
+		return err
+	}
+
 	concurrencySchedulerFactory := &concurrencySchedulerFactory{
-		engineAPI:        e,
-		distCache:        distCache,
-		auditJobGroup:    auditJobGroup,
-		decisionsWatcher: decisionsWatcher,
-		agentGroupName:   agentGroupName,
-		registry:         reg,
-		wsFactory:        wsFactory,
+		engineAPI:           e,
+		distCache:           distCache,
+		auditJobGroup:       auditJobGroup,
+		decisionsWatcher:    decisionsWatcher,
+		agentGroupName:      agentGroupName,
+		registry:            reg,
+		wsFactory:           wsFactory,
+		labelStatusJobGroup: labelStatusJobGroup,
+		labelStatusFactory:  labelStatusFactory,
 	}
 
 	fxDriver, err := notifiers.NewFxDriver(reg, prometheusRegistry,
@@ -132,6 +144,10 @@ func setupConcurrencySchedulerFactory(
 			if err != nil {
 				return err
 			}
+			err = labelStatusJobGroup.Start()
+			if err != nil {
+				return err
+			}
 			err = decisionsWatcher.Start()
 			if err != nil {
 				return err
@@ -144,6 +160,10 @@ func setupConcurrencySchedulerFactory(
 			if err != nil {
 				merr = multierr.Append(merr, err)
 			}
+			err = labelStatusJobGroup.Stop()
+			if err != nil {
+				merr = multierr.Append(merr, err)
+			}
 			err = auditJobGroup.Stop()
 			if err != nil {
 				merr = multierr.Append(merr, err)
@@ -181,13 +201,19 @@ func (csFactory *concurrencySchedulerFactory) newConcurrencySchedulerOptions(
 	}
 
 	cs := &concurrencyScheduler{
-		Component: wrapperMessage.GetCommonAttributes(),
-		proto:     csProto,
-		csFactory: csFactory,
-		registry:  reg,
-		clock:     clockwork.NewRealClock(),
+		Component:           wrapperMessage.GetCommonAttributes(),
+		proto:               csProto,
+		csFactory:           csFactory,
+		registry:            reg,
+		clock:               clockwork.NewRealClock(),
+		labelStatusJobGroup: csFactory.labelStatusJobGroup,
 	}
 	cs.name = iface.ComponentKey(cs)
+	cs.limitByLabelKeyStatus = csFactory.labelStatusFactory.New("limit_by_label_key", cs.GetPolicyName(), cs.GetComponentId())
+	cs.tokensLabelKeyStatus = csFactory.labelStatusFactory.New("tokens_label_key", cs.GetPolicyName(), cs.GetComponentId())
+	cs.priorityLabelKeyStatus = csFactory.labelStatusFactory.New("priority_label_key", cs.GetPolicyName(), cs.GetComponentId())
+	cs.workloadLabelKeyStatus = csFactory.labelStatusFactory.New("workload_label_key", cs.GetPolicyName(), cs.GetComponentId())
+	cs.fairnessLabelKeyStatus = csFactory.labelStatusFactory.New("fairness_label_key", cs.GetPolicyName(), cs.GetComponentId())
 
 	return fx.Options(
 		fx.Invoke(
@@ -200,13 +226,19 @@ func (csFactory *concurrencySchedulerFactory) newConcurrencySchedulerOptions(
 type concurrencyScheduler struct {
 	schedulers sync.Map
 	iface.Component
-	registry         status.Registry
-	clock            clockwork.Clock
-	csFactory        *concurrencySchedulerFactory
-	limiter          concurrencylimiter.ConcurrencyLimiter
-	proto            *policylangv1.ConcurrencyScheduler
-	schedulerMetrics *workloadscheduler.SchedulerMetrics
-	name             string
+	registry               status.Registry
+	clock                  clockwork.Clock
+	csFactory              *concurrencySchedulerFactory
+	limiter                concurrencylimiter.ConcurrencyLimiter
+	proto                  *policylangv1.ConcurrencyScheduler
+	schedulerMetrics       *workloadscheduler.SchedulerMetrics
+	name                   string
+	labelStatusJobGroup    *jobs.JobGroup
+	limitByLabelKeyStatus  *labelstatus.LabelStatus
+	tokensLabelKeyStatus   *labelstatus.LabelStatus
+	priorityLabelKeyStatus *labelstatus.LabelStatus
+	workloadLabelKeyStatus *labelstatus.LabelStatus
+	fairnessLabelKeyStatus *labelstatus.LabelStatus
 }
 
 func (cs *concurrencyScheduler) setup(lifecycle fx.Lifecycle) error {
@@ -233,6 +265,12 @@ func (cs *concurrencyScheduler) setup(lifecycle fx.Lifecycle) error {
 	metricLabels[metrics.PolicyHashLabel] = cs.GetPolicyHash()
 	metricLabels[metrics.ComponentIDLabel] = cs.GetComponentId()
 
+	cs.limitByLabelKeyStatus.Setup(cs.csFactory.labelStatusJobGroup, lifecycle)
+	cs.tokensLabelKeyStatus.Setup(cs.csFactory.labelStatusJobGroup, lifecycle)
+	cs.priorityLabelKeyStatus.Setup(cs.csFactory.labelStatusJobGroup, lifecycle)
+	cs.workloadLabelKeyStatus.Setup(cs.csFactory.labelStatusJobGroup, lifecycle)
+	cs.fairnessLabelKeyStatus.Setup(cs.csFactory.labelStatusJobGroup, lifecycle)
+
 	lifecycle.Append(fx.Hook{
 		OnStart: func(context.Context) error {
 			var err error
@@ -333,6 +371,7 @@ func (cs *concurrencyScheduler) getLabelKey(labels labels.Labels) (string, bool)
 	} else {
 		labelValue, found := labels.Get(labelKey)
 		if !found {
+			cs.limitByLabelKeyStatus.SetMissing()
 			return "", false
 		}
 		label = labelKey + ":" + labelValue
@@ -376,7 +415,6 @@ func (cs *concurrencyScheduler) Decide(ctx context.Context, labels labels.Labels
 
 	var found bool
 	label, found = cs.getLabelKey(labels)
-
 	if !found {
 		reason = flowcontrolv1.LimiterDecision_LIMITER_REASON_KEY_NOT_FOUND
 		return returnDecision()
@@ -393,6 +431,10 @@
 			cs,
 			tokenCounter,
 			cs.schedulerMetrics,
+			cs.tokensLabelKeyStatus,
+			cs.priorityLabelKeyStatus,
+			cs.workloadLabelKeyStatus,
+			cs.fairnessLabelKeyStatus,
 		)
 		if err != nil {
 			log.Error().Err(err).Msg("Failed to create scheduler")
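The second file applies the same wiring to the load-scheduler actuator. The load scheduler has no `limit_by_label_key`, so only the four scheduler-level statuses (tokens, priority, workload, fairness) are created: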
@@ -19,7 +19,9 @@ import (
 	"github.com/fluxninja/aperture/v2/pkg/config"
 	etcdclient "github.com/fluxninja/aperture/v2/pkg/etcd/client"
 	etcdwatcher "github.com/fluxninja/aperture/v2/pkg/etcd/watcher"
+	"github.com/fluxninja/aperture/v2/pkg/jobs"
 	"github.com/fluxninja/aperture/v2/pkg/labels"
+	"github.com/fluxninja/aperture/v2/pkg/labelstatus"
 	"github.com/fluxninja/aperture/v2/pkg/metrics"
 	"github.com/fluxninja/aperture/v2/pkg/notifiers"
 	workloadscheduler "github.com/fluxninja/aperture/v2/pkg/policies/flowcontrol/actuators/workload-scheduler"
@@ -78,6 +80,8 @@ type loadSchedulerFactory struct {
 	tokenBucketAvailableTokensGaugeVec *prometheus.GaugeVec
 	wsFactory                          *workloadscheduler.Factory
 	agentGroupName                     string
+	labelStatusJobGroup                *jobs.JobGroup
+	labelStatusFactory                 *labelstatus.LabelStatusFactory
 }
 
 // setupLoadSchedulerFactory sets up the load scheduler module in the main fx app.
@@ -90,9 +94,12 @@ func setupLoadSchedulerFactory(
 	etcdClient *etcdclient.Client,
 	ai *agentinfo.AgentInfo,
 	wsFactory *workloadscheduler.Factory,
+	labelStatusFactory *labelstatus.LabelStatusFactory,
 ) error {
 	reg := registry.Child("component", "load_scheduler")
 
+	logger := reg.GetLogger()
+
 	agentGroup := ai.GetAgentGroup()
 
 	// Scope the sync to the agent group.
@@ -102,12 +109,20 @@
 		return err
 	}
 
+	labelStatusJobGroup, err := jobs.NewJobGroup(reg.Child("label_status", "load_scheduler"), jobs.JobGroupConfig{}, nil)
+	if err != nil {
+		logger.Error().Err(err).Msg("Failed to create labels status job group")
+		return err
+	}
+
 	lsFactory := &loadSchedulerFactory{
 		engineAPI:           e,
 		registry:            reg,
 		wsFactory:           wsFactory,
 		loadDecisionWatcher: loadDecisionWatcher,
 		agentGroupName:      ai.GetAgentGroup(),
+		labelStatusJobGroup: labelStatusJobGroup,
+		labelStatusFactory:  labelStatusFactory,
 	}
 
 	// Initialize and register the WFQ and Token Bucket Metric Vectors
@@ -169,6 +184,11 @@ func setupLoadSchedulerFactory(
 				return err
 			}
 
+			err = labelStatusJobGroup.Start()
+			if err != nil {
+				return err
+			}
+
 			err = lsFactory.loadDecisionWatcher.Start()
 			if err != nil {
 				return err
@@ -184,6 +204,11 @@ func setupLoadSchedulerFactory(
 				merr = multierr.Append(merr, err)
 			}
 
+			err = labelStatusJobGroup.Stop()
+			if err != nil {
+				merr = multierr.Append(merr, err)
+			}
+
 			if !prometheusRegistry.Unregister(lsFactory.tokenBucketLMGaugeVec) {
 				err := fmt.Errorf("failed to unregister " + metrics.TokenBucketLMMetricName)
 				merr = multierr.Append(merr, err)
@@ -236,7 +261,12 @@ func (lsFactory *loadSchedulerFactory) newLoadSchedulerOptions(
 		registry:             registry,
 		loadSchedulerFactory: lsFactory,
 		clock:                clockwork.NewRealClock(),
+		labelStatusJobGroup:  lsFactory.labelStatusJobGroup,
 	}
+	ls.tokensLabelKeyStatus = lsFactory.labelStatusFactory.New("tokens_label_key", ls.GetPolicyName(), ls.GetComponentId())
+	ls.priorityLabelKeyStatus = lsFactory.labelStatusFactory.New("priority_label_key", ls.GetPolicyName(), ls.GetComponentId())
+	ls.workloadLabelKeyStatus = lsFactory.labelStatusFactory.New("workload_label_key", ls.GetPolicyName(), ls.GetComponentId())
+	ls.fairnessLabelKeyStatus = lsFactory.labelStatusFactory.New("fairness_label_key", ls.GetPolicyName(), ls.GetComponentId())
 
 	return fx.Options(
 		fx.Invoke(
@@ -249,13 +279,18 @@ func (lsFactory *loadSchedulerFactory) newLoadSchedulerOptions(
 type loadScheduler struct {
 	// TODO: comment to self: why do we depend on Scheduler to implement Decide and Revert in this case?
 	iface.Component
-	scheduler            *workloadscheduler.Scheduler
-	registry             status.Registry
-	proto                *policylangv1.LoadScheduler
-	loadSchedulerFactory *loadSchedulerFactory
-	clock                clockwork.Clock
-	tokenBucket          *scheduler.LoadMultiplierTokenBucket
-	schedulerMetrics     *workloadscheduler.SchedulerMetrics
+	scheduler              *workloadscheduler.Scheduler
+	registry               status.Registry
+	proto                  *policylangv1.LoadScheduler
+	loadSchedulerFactory   *loadSchedulerFactory
+	clock                  clockwork.Clock
+	tokenBucket            *scheduler.LoadMultiplierTokenBucket
+	schedulerMetrics       *workloadscheduler.SchedulerMetrics
+	labelStatusJobGroup    *jobs.JobGroup
+	tokensLabelKeyStatus   *labelstatus.LabelStatus
+	priorityLabelKeyStatus *labelstatus.LabelStatus
+	workloadLabelKeyStatus *labelstatus.LabelStatus
+	fairnessLabelKeyStatus *labelstatus.LabelStatus
 }
 
 // Make sure LoadScheduler implements the iface.LoadScheduler.
@@ -342,6 +377,10 @@ func (ls *loadScheduler) setup(lifecycle fx.Lifecycle) error {
 			ls,
 			ls.tokenBucket,
 			ls.schedulerMetrics,
+			ls.tokensLabelKeyStatus,
+			ls.priorityLabelKeyStatus,
+			ls.workloadLabelKeyStatus,
+			ls.fairnessLabelKeyStatus,
 		)
 		if err != nil {
 			return retErr(err)
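In both actuators the four scheduler-level statuses are passed into the shared scheduler constructor alongside `schedulerMetrics`, which suggests the alert evaluation for those keys lives in the common workload-scheduler package rather than in each limiter; only the concurrency scheduler's `limit_by_label_key` status is flagged locally, in `getLabelKey`.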
[Diffs for the remaining two changed files, quota-scheduler and scheduler, are not shown.]
