From e79d1c5c527491b05d7fc421e8b903208b8a5f1c Mon Sep 17 00:00:00 2001 From: Alan Clucas Date: Tue, 4 Feb 2025 21:10:12 +0000 Subject: [PATCH] fix: locking in metrics (#14144) Signed-off-by: Alan Clucas --- .../cronworkflow-deprecated-schedule.yaml | 3 +- util/telemetry/instrument.go | 8 +- util/telemetry/metrics.go | 40 ++++++-- util/telemetry/operators.go | 4 +- workflow/metrics/counter_log.go | 2 +- workflow/metrics/gauge_pod_phase.go | 4 +- workflow/metrics/gauge_workflow_condition.go | 4 +- workflow/metrics/gauge_workflow_phase.go | 4 +- workflow/metrics/leader.go | 4 +- workflow/metrics/metrics.go | 2 + workflow/metrics/metrics_custom.go | 97 ++++++++++++------- workflow/metrics/metrics_test.go | 25 ++--- workflow/metrics/work_queue.go | 29 +++--- 13 files changed, 137 insertions(+), 89 deletions(-) diff --git a/test/e2e/testdata/cronworkflow-deprecated-schedule.yaml b/test/e2e/testdata/cronworkflow-deprecated-schedule.yaml index 32951f84afb5..a99f2bb79cee 100644 --- a/test/e2e/testdata/cronworkflow-deprecated-schedule.yaml +++ b/test/e2e/testdata/cronworkflow-deprecated-schedule.yaml @@ -3,8 +3,7 @@ kind: CronWorkflow metadata: name: test-cron-deprecated-schedule spec: - schedules: - - "* * * * *" + schedule: "* * * * *" concurrencyPolicy: "Forbid" startingDeadlineSeconds: 0 workflowMetadata: diff --git a/util/telemetry/instrument.go b/util/telemetry/instrument.go index 6831dd5be804..ed12b748873f 100644 --- a/util/telemetry/instrument.go +++ b/util/telemetry/instrument.go @@ -17,7 +17,7 @@ type Instrument struct { } func (m *Metrics) preCreateCheck(name string) error { - if _, exists := m.AllInstruments[name]; exists { + if inst := m.GetInstrument(name); inst != nil { return fmt.Errorf("Instrument called %s already exists", name) } return nil @@ -69,8 +69,6 @@ func collectOptions(options ...instrumentOption) instrumentOptions { func (m *Metrics) CreateInstrument(instType instrumentType, name, desc, unit string, options ...instrumentOption) error { opts := collectOptions(options...) - m.Mutex.Lock() - defer m.Mutex.Unlock() err := m.preCreateCheck(name) if err != nil { return err @@ -137,11 +135,11 @@ func (m *Metrics) CreateInstrument(instType instrumentType, name, desc, unit str if err != nil { return err } - m.AllInstruments[name] = &Instrument{ + m.AddInstrument(name, &Instrument{ name: name, description: desc, otel: instPtr, - } + }) return nil } diff --git a/util/telemetry/metrics.go b/util/telemetry/metrics.go index a4583fe92aed..e6094588c867 100644 --- a/util/telemetry/metrics.go +++ b/util/telemetry/metrics.go @@ -29,15 +29,39 @@ type Config struct { } type Metrics struct { - // Ensures mutual exclusion in workflows map - Mutex sync.RWMutex - // Evil context for compatibility with legacy context free interfaces Ctx context.Context otelMeter *metric.Meter config *Config - AllInstruments map[string]*Instrument + // Ensures mutual exclusion in instruments + mutex sync.RWMutex + instruments map[string]*Instrument +} + +func (m *Metrics) AddInstrument(name string, inst *Instrument) { + m.mutex.Lock() + defer m.mutex.Unlock() + m.instruments[name] = inst +} + +func (m *Metrics) GetInstrument(name string) *Instrument { + m.mutex.RLock() + defer m.mutex.RUnlock() + inst, ok := m.instruments[name] + if !ok { + return nil + } + return inst +} + +// IterateROInstruments iterates over every instrument for Read-Only purposes +func (m *Metrics) IterateROInstruments(fn func(i *Instrument)) { + m.mutex.RLock() + defer m.mutex.RUnlock() + for _, i := range m.instruments { + fn(i) + } } func NewMetrics(ctx context.Context, serviceName, prometheusName string, config *Config, extraOpts ...metricsdk.Option) (*Metrics, error) { @@ -81,10 +105,10 @@ func NewMetrics(ctx context.Context, serviceName, prometheusName string, config meter := provider.Meter(serviceName) metrics := &Metrics{ - Ctx: ctx, - otelMeter: &meter, - config: config, - AllInstruments: make(map[string]*Instrument), + Ctx: ctx, + otelMeter: &meter, + config: config, + instruments: make(map[string]*Instrument), } return metrics, nil diff --git a/util/telemetry/operators.go b/util/telemetry/operators.go index f99f7426d5d6..3e87b33f69cf 100644 --- a/util/telemetry/operators.go +++ b/util/telemetry/operators.go @@ -10,7 +10,7 @@ import ( ) func (m *Metrics) AddInt(ctx context.Context, name string, val int64, attribs InstAttribs) { - if instrument, ok := m.AllInstruments[name]; ok { + if instrument := m.GetInstrument(name); instrument != nil { instrument.AddInt(ctx, val, attribs) } else { log.Errorf("Metrics addInt() to non-existent metric %s", name) @@ -29,7 +29,7 @@ func (i *Instrument) AddInt(ctx context.Context, val int64, attribs InstAttribs) } func (m *Metrics) Record(ctx context.Context, name string, val float64, attribs InstAttribs) { - if instrument, ok := m.AllInstruments[name]; ok { + if instrument := m.GetInstrument(name); instrument != nil { instrument.Record(ctx, val, attribs) } else { log.Errorf("Metrics record() to non-existent metric %s", name) diff --git a/workflow/metrics/counter_log.go b/workflow/metrics/counter_log.go index dd3a37061dba..3fedd827077e 100644 --- a/workflow/metrics/counter_log.go +++ b/workflow/metrics/counter_log.go @@ -16,7 +16,7 @@ func addLogCounter(ctx context.Context, m *Metrics) error { err := m.CreateBuiltinInstrument(telemetry.InstrumentLogMessages) name := telemetry.InstrumentLogMessages.Name() lm := logMetric{ - counter: m.AllInstruments[name], + counter: m.GetInstrument(name), } log.AddHook(lm) for _, level := range lm.Levels() { diff --git a/workflow/metrics/gauge_pod_phase.go b/workflow/metrics/gauge_pod_phase.go index ac401ca20ccf..83d7087bb009 100644 --- a/workflow/metrics/gauge_pod_phase.go +++ b/workflow/metrics/gauge_pod_phase.go @@ -26,9 +26,9 @@ func addPodPhaseGauge(ctx context.Context, m *Metrics) error { if m.callbacks.PodPhase != nil { ppGauge := podPhaseGauge{ callback: m.callbacks.PodPhase, - gauge: m.AllInstruments[name], + gauge: m.GetInstrument(name), } - return m.AllInstruments[name].RegisterCallback(m.Metrics, ppGauge.update) + return ppGauge.gauge.RegisterCallback(m.Metrics, ppGauge.update) } return nil } diff --git a/workflow/metrics/gauge_workflow_condition.go b/workflow/metrics/gauge_workflow_condition.go index 0c174f6456a0..0049bc1ca580 100644 --- a/workflow/metrics/gauge_workflow_condition.go +++ b/workflow/metrics/gauge_workflow_condition.go @@ -26,9 +26,9 @@ func addWorkflowConditionGauge(_ context.Context, m *Metrics) error { if m.callbacks.WorkflowCondition != nil { wfcGauge := workflowConditionGauge{ callback: m.callbacks.WorkflowCondition, - gauge: m.AllInstruments[telemetry.InstrumentWorkflowCondition.Name()], + gauge: m.GetInstrument(telemetry.InstrumentWorkflowCondition.Name()), } - return m.AllInstruments[telemetry.InstrumentWorkflowCondition.Name()].RegisterCallback(m.Metrics, wfcGauge.update) + return wfcGauge.gauge.RegisterCallback(m.Metrics, wfcGauge.update) } return nil // TODO init all phases? diff --git a/workflow/metrics/gauge_workflow_phase.go b/workflow/metrics/gauge_workflow_phase.go index 997d7088f084..4d9768c7b47d 100644 --- a/workflow/metrics/gauge_workflow_phase.go +++ b/workflow/metrics/gauge_workflow_phase.go @@ -26,9 +26,9 @@ func addWorkflowPhaseGauge(_ context.Context, m *Metrics) error { if m.callbacks.WorkflowPhase != nil { wfpGauge := workflowPhaseGauge{ callback: m.callbacks.WorkflowPhase, - gauge: m.AllInstruments[name], + gauge: m.GetInstrument(name), } - return m.AllInstruments[name].RegisterCallback(m.Metrics, wfpGauge.update) + return wfpGauge.gauge.RegisterCallback(m.Metrics, wfpGauge.update) } return nil // TODO init all phases? diff --git a/workflow/metrics/leader.go b/workflow/metrics/leader.go index e91dc6c9c8a9..6d202fa0b8de 100644 --- a/workflow/metrics/leader.go +++ b/workflow/metrics/leader.go @@ -26,9 +26,9 @@ func addIsLeader(ctx context.Context, m *Metrics) error { name := telemetry.InstrumentIsLeader.Name() lGauge := leaderGauge{ callback: m.callbacks.IsLeader, - gauge: m.AllInstruments[name], + gauge: m.GetInstrument(name), } - return m.AllInstruments[name].RegisterCallback(m.Metrics, lGauge.update) + return lGauge.gauge.RegisterCallback(m.Metrics, lGauge.update) } func (l *leaderGauge) update(_ context.Context, o metric.Observer) error { diff --git a/workflow/metrics/metrics.go b/workflow/metrics/metrics.go index 19243036be6e..73e5d22f57c6 100644 --- a/workflow/metrics/metrics.go +++ b/workflow/metrics/metrics.go @@ -2,6 +2,7 @@ package metrics import ( "context" + "sync" "github.com/argoproj/argo-workflows/v3/util/telemetry" @@ -12,6 +13,7 @@ type Metrics struct { *telemetry.Metrics callbacks Callbacks + realtimeMutex sync.Mutex realtimeWorkflows map[string][]realtimeTracker } diff --git a/workflow/metrics/metrics_custom.go b/workflow/metrics/metrics_custom.go index 77897a5d417c..65e5df1567ed 100644 --- a/workflow/metrics/metrics_custom.go +++ b/workflow/metrics/metrics_custom.go @@ -5,6 +5,7 @@ import ( "fmt" "reflect" "strconv" + "sync" "time" "go.opentelemetry.io/otel/metric" @@ -28,6 +29,27 @@ type customMetricValue struct { key string } +type customMetricUserData struct { + mutex sync.RWMutex + values map[string]*customMetricValue +} + +func newUserData() *customMetricUserData { + return &customMetricUserData{ + values: make(map[string]*customMetricValue), + } +} + +func (ud *customMetricUserData) GetValue(key string) *customMetricValue { + ud.mutex.RLock() + defer ud.mutex.RUnlock() + val, ok := ud.values[key] + if !ok { + return nil + } + return val +} + type realtimeTracker struct { inst *telemetry.Instrument key string @@ -41,27 +63,30 @@ func (cmv *customMetricValue) getLabels() telemetry.InstAttribs { return labels } -func customUserdata(i *telemetry.Instrument, requireSuccess bool) map[string]*customMetricValue { +func customUserData(i *telemetry.Instrument, requireSuccess bool) *customMetricUserData { switch val := i.GetUserdata().(type) { - case map[string]*customMetricValue: + case *customMetricUserData: return val default: if requireSuccess { panic(fmt.Errorf("internal error: unexpected userdata on custom metric %s", i.GetName())) } - return make(map[string]*customMetricValue) + return nil } } func getOrCreateValue(i *telemetry.Instrument, key string, labels []*wfv1.MetricLabel) *customMetricValue { - if value, ok := customUserdata(i, true)[key]; ok { + ud := customUserData(i, true) + ud.mutex.Lock() + defer ud.mutex.Unlock() + if value, ok := ud.values[key]; ok { return value } newValue := customMetricValue{ key: key, labels: labels, } - customUserdata(i, true)[key] = &newValue + ud.values[key] = &newValue return &newValue } @@ -74,7 +99,10 @@ type customInstrument struct { // For non-realtime we have to fake observability as prometheus provides // up/down and set on the same gauge type, which otel forbids. func (i *customInstrument) customCallback(_ context.Context, o metric.Observer) error { - for _, value := range customUserdata(i.Instrument, true) { + ud := customUserData(i.Instrument, true) + ud.mutex.RLock() + defer ud.mutex.RUnlock() + for _, value := range ud.values { if value.rtValueFunc != nil { i.ObserveFloat(o, value.rtValueFunc(), value.getLabels()) } else { @@ -84,35 +112,23 @@ func (i *customInstrument) customCallback(_ context.Context, o metric.Observer) return nil } -// func addCustomMetrics(_ context.Context, m *Metrics) error { -// m.customMetrics = make(map[string]*customMetric, 0) -// return nil -// } - // GetCustomMetric returns a custom (or any) metric from it's key // This is exported for legacy testing only func (m *Metrics) GetCustomMetric(key string) *telemetry.Instrument { - m.Mutex.RLock() - defer m.Mutex.RUnlock() - // It's okay to return nil metrics in this function - return m.AllInstruments[key] + return m.GetInstrument(key) } // CustomMetricExists returns if metric exists from its key // This is exported for testing only func (m *Metrics) CustomMetricExists(key string) bool { - m.Mutex.RLock() - defer m.Mutex.RUnlock() - - // It's okay to return nil metrics in this function - return m.AllInstruments[key] != nil + return m.GetCustomMetric(key) != nil } // TODO labels on custom metrics func (m *Metrics) matchExistingMetric(metricSpec *wfv1.Prometheus) (*telemetry.Instrument, error) { key := metricSpec.Name - if inst, ok := m.AllInstruments[key]; ok { + if inst := m.GetInstrument(key); inst != nil { if inst.GetDescription() != metricSpec.Help { return nil, fmt.Errorf("Help for metric %s is already set to %s, it cannot be changed", metricSpec.Name, inst.GetDescription()) } @@ -152,11 +168,11 @@ func (m *Metrics) ensureBaseMetric(metricSpec *wfv1.Prometheus, ownerKey string) return nil, err } m.attachCustomMetricToWorkflow(metricSpec, ownerKey) - inst := m.AllInstruments[metricSpec.Name] + inst := m.GetInstrument(metricSpec.Name) if inst == nil { return nil, fmt.Errorf("Failed to create new metric %s", metricSpec.Name) } - inst.SetUserdata(make(map[string]*customMetricValue)) + inst.SetUserdata(newUserData()) return inst, nil } @@ -211,6 +227,8 @@ func (m *Metrics) UpsertCustomMetric(ctx context.Context, metricSpec *wfv1.Prome func (m *Metrics) attachCustomMetricToWorkflow(metricSpec *wfv1.Prometheus, ownerKey string) { if metricSpec.IsRealtime() { + m.realtimeMutex.Lock() + defer m.realtimeMutex.Unlock() // Must move to run each workflowkey for key := range m.realtimeWorkflows { if key == ownerKey { @@ -218,7 +236,7 @@ func (m *Metrics) attachCustomMetricToWorkflow(metricSpec *wfv1.Prometheus, owne } } m.realtimeWorkflows[ownerKey] = append(m.realtimeWorkflows[ownerKey], realtimeTracker{ - inst: m.AllInstruments[metricSpec.Name], + inst: m.GetInstrument(metricSpec.Name), key: metricSpec.GetKey(), }) } @@ -242,7 +260,7 @@ func (m *Metrics) createCustomMetric(metricSpec *wfv1.Prometheus) error { if err != nil { return err } - inst := m.AllInstruments[metricSpec.Name] + inst := m.GetInstrument(metricSpec.Name) customInst := customInstrument{Instrument: inst} return inst.RegisterCallback(m.Metrics, customInst.customCallback) default: @@ -255,22 +273,25 @@ func (m *Metrics) createCustomGauge(metricSpec *wfv1.Prometheus) error { if err != nil { return err } - inst := m.AllInstruments[metricSpec.Name] + inst := m.GetInstrument(metricSpec.Name) customInst := customInstrument{Instrument: inst} return inst.RegisterCallback(m.Metrics, customInst.customCallback) } func (m *Metrics) runCustomGC(ttl time.Duration) { - m.Mutex.Lock() - defer m.Mutex.Unlock() - for _, baseMetric := range m.AllInstruments { - custom := customUserdata(baseMetric, false) - for key, value := range custom { + m.IterateROInstruments(func(baseMetric *telemetry.Instrument) { + ud := customUserData(baseMetric, false) + if ud == nil { + return + } + ud.mutex.Lock() + for key, value := range ud.values { if time.Since(value.lastUpdated) > ttl { - delete(custom, key) + delete(ud.values, key) } } - } + ud.mutex.Unlock() + }) } func (m *Metrics) customMetricsGC(ctx context.Context, ttl time.Duration) { @@ -291,16 +312,18 @@ func (m *Metrics) customMetricsGC(ctx context.Context, ttl time.Duration) { } func (m *Metrics) StopRealtimeMetricsForWfUID(key string) { - m.Mutex.Lock() - defer m.Mutex.Unlock() - + m.realtimeMutex.Lock() + defer m.realtimeMutex.Unlock() if _, exists := m.realtimeWorkflows[key]; !exists { return } realtimeMetrics := m.realtimeWorkflows[key] for _, metric := range realtimeMetrics { - delete(customUserdata(metric.inst, true), metric.key) + ud := customUserData(metric.inst, true) + ud.mutex.Lock() + delete(ud.values, metric.key) + ud.mutex.Unlock() } delete(m.realtimeWorkflows, key) diff --git a/workflow/metrics/metrics_test.go b/workflow/metrics/metrics_test.go index 31bbeb0ad99a..a2287ee4eb16 100644 --- a/workflow/metrics/metrics_test.go +++ b/workflow/metrics/metrics_test.go @@ -74,22 +74,22 @@ func TestMetricGC(t *testing.T) { baseCm := m.GetCustomMetric(key) assert.NotNil(t, baseCm) - cm := customUserdata(baseCm, true) - assert.Len(t, cm, 1) + cm := customUserData(baseCm, true) + assert.NotNil(t, cm) + assert.Len(t, cm.values, 1) // Ensure we get at least one TTL run timeoutTime := time.Now().Add(time.Second * 2) for time.Now().Before(timeoutTime) { // Break if we know our test will pass. - if len(cm) == 0 { + if len(cm.values) == 0 { break } // Sleep to prevent overloading test worker CPU. time.Sleep(100 * time.Millisecond) } - assert.Empty(t, cm) - + assert.Empty(t, cm.values) } func TestRealtimeMetricGC(t *testing.T) { @@ -151,12 +151,12 @@ func TestWorkflowQueueMetrics(t *testing.T) { wfQueue := m.RateLimiterWithBusyWorkers(m.Ctx, workqueue.DefaultTypedControllerRateLimiter[string](), "workflow_queue") defer wfQueue.ShutDown() - assert.NotNil(t, m.AllInstruments[telemetry.InstrumentQueueDepthGauge.Name()]) - assert.NotNil(t, m.AllInstruments[telemetry.InstrumentQueueLatency.Name()]) + assert.NotNil(t, m.GetInstrument(telemetry.InstrumentQueueDepthGauge.Name())) + assert.NotNil(t, m.GetInstrument(telemetry.InstrumentQueueLatency.Name())) wfQueue.Add("hello") - require.NotNil(t, m.AllInstruments[telemetry.InstrumentQueueAddsCount.Name()]) + require.NotNil(t, m.GetInstrument(telemetry.InstrumentQueueAddsCount.Name())) val, err := te.GetInt64CounterValue(telemetry.InstrumentQueueAddsCount.Name(), &attribs) require.NoError(t, err) assert.Equal(t, int64(1), val) @@ -201,13 +201,14 @@ func TestRealTimeMetricDeletion(t *testing.T) { m.StopRealtimeMetricsForWfUID("456") assert.Empty(t, m.realtimeWorkflows["456"]) - cm := customUserdata(baseCm, true) - assert.Len(t, cm, 1) + cm := customUserData(baseCm, true) + assert.NotNil(t, cm) + assert.Len(t, cm.values, 1) assert.Len(t, m.realtimeWorkflows["123"], 1) m.StopRealtimeMetricsForWfUID("123") assert.Empty(t, m.realtimeWorkflows["123"]) - assert.Empty(t, cm) + assert.Empty(t, cm.values) err = m.UpsertCustomMetric(ctx, &wfv1.Prometheus{ Name: key, @@ -221,6 +222,6 @@ func TestRealTimeMetricDeletion(t *testing.T) { }, "456", nil) require.NoError(t, err) - assert.Len(t, cm, 1) + assert.Len(t, cm.values, 1) assert.Len(t, m.realtimeWorkflows["456"], 1) } diff --git a/workflow/metrics/work_queue.go b/workflow/metrics/work_queue.go index 312a858961f3..029d2575a1e6 100644 --- a/workflow/metrics/work_queue.go +++ b/workflow/metrics/work_queue.go @@ -52,9 +52,10 @@ func addWorkQueueMetrics(_ context.Context, m *Metrics) error { return err } unfinishedCallback := queueUserdata{ - gauge: m.AllInstruments[telemetry.InstrumentQueueUnfinishedWork.Name()]} - m.AllInstruments[telemetry.InstrumentQueueUnfinishedWork.Name()].SetUserdata(&unfinishedCallback) - err = m.AllInstruments[telemetry.InstrumentQueueUnfinishedWork.Name()].RegisterCallback(m.Metrics, unfinishedCallback.update) + gauge: m.GetInstrument(telemetry.InstrumentQueueUnfinishedWork.Name()), + } + unfinishedCallback.gauge.SetUserdata(&unfinishedCallback) + err = unfinishedCallback.gauge.RegisterCallback(m.Metrics, unfinishedCallback.update) if err != nil { return err } @@ -64,10 +65,10 @@ func addWorkQueueMetrics(_ context.Context, m *Metrics) error { return err } longestRunningCallback := queueUserdata{ - gauge: m.AllInstruments[telemetry.InstrumentQueueLongestRunning.Name()], + gauge: m.GetInstrument(telemetry.InstrumentQueueLongestRunning.Name()), } - m.AllInstruments[telemetry.InstrumentQueueLongestRunning.Name()].SetUserdata(&longestRunningCallback) - err = m.AllInstruments[telemetry.InstrumentQueueLongestRunning.Name()].RegisterCallback(m.Metrics, longestRunningCallback.update) + longestRunningCallback.gauge.SetUserdata(&longestRunningCallback) + err = longestRunningCallback.gauge.RegisterCallback(m.Metrics, longestRunningCallback.update) if err != nil { return err } @@ -78,7 +79,7 @@ func (m *Metrics) RateLimiterWithBusyWorkers(ctx context.Context, workQueue work queue := workersBusyRateLimiterWorkQueue{ TypedRateLimitingInterface: workqueue.NewTypedRateLimitingQueueWithConfig(workQueue, workqueue.TypedRateLimitingQueueConfig[string]{Name: queueName}), workerType: queueName, - busyGauge: m.AllInstruments[telemetry.InstrumentWorkersBusyCount.Name()], + busyGauge: m.GetInstrument(telemetry.InstrumentWorkersBusyCount.Name()), ctx: ctx, } queue.newWorker(ctx) @@ -167,7 +168,7 @@ func (m *Metrics) NewDepthMetric(name string) workqueue.GaugeMetric { return queueMetric{ ctx: m.Ctx, name: name, - inst: m.AllInstruments[telemetry.InstrumentQueueDepthGauge.Name()], + inst: m.GetInstrument(telemetry.InstrumentQueueDepthGauge.Name()), } } @@ -175,7 +176,7 @@ func (m *Metrics) NewAddsMetric(name string) workqueue.CounterMetric { return queueMetric{ ctx: m.Ctx, name: name, - inst: m.AllInstruments[telemetry.InstrumentQueueAddsCount.Name()], + inst: m.GetInstrument(telemetry.InstrumentQueueAddsCount.Name()), } } @@ -183,7 +184,7 @@ func (m *Metrics) NewLatencyMetric(name string) workqueue.HistogramMetric { return queueMetric{ ctx: m.Ctx, name: name, - inst: m.AllInstruments[telemetry.InstrumentQueueLatency.Name()], + inst: m.GetInstrument(telemetry.InstrumentQueueLatency.Name()), } } @@ -191,7 +192,7 @@ func (m *Metrics) NewWorkDurationMetric(name string) workqueue.HistogramMetric { return queueMetric{ ctx: m.Ctx, name: name, - inst: m.AllInstruments[telemetry.InstrumentQueueDuration.Name()], + inst: m.GetInstrument(telemetry.InstrumentQueueDuration.Name()), } } @@ -199,7 +200,7 @@ func (m *Metrics) NewRetriesMetric(name string) workqueue.CounterMetric { return queueMetric{ ctx: m.Ctx, name: name, - inst: m.AllInstruments[telemetry.InstrumentQueueRetries.Name()], + inst: m.GetInstrument(telemetry.InstrumentQueueRetries.Name()), } } @@ -207,7 +208,7 @@ func (m *Metrics) NewUnfinishedWorkSecondsMetric(name string) workqueue.Settable metric := queueMetric{ ctx: m.Ctx, name: name, - inst: m.AllInstruments[telemetry.InstrumentQueueUnfinishedWork.Name()], + inst: m.GetInstrument(telemetry.InstrumentQueueUnfinishedWork.Name()), value: ptr.To(float64(0.0)), } ud := getQueueUserdata(metric.inst) @@ -219,7 +220,7 @@ func (m *Metrics) NewLongestRunningProcessorSecondsMetric(name string) workqueue metric := queueMetric{ ctx: m.Ctx, name: name, - inst: m.AllInstruments[telemetry.InstrumentQueueLongestRunning.Name()], + inst: m.GetInstrument(telemetry.InstrumentQueueLongestRunning.Name()), value: ptr.To(float64(0.0)), } ud := getQueueUserdata(metric.inst)