From e667a8f9eda5ef8b74ec0d6e7dfca5a1e60f9ce6 Mon Sep 17 00:00:00 2001 From: Alvaro Aleman Date: Thu, 9 Jan 2025 21:02:38 -0500 Subject: [PATCH] :bug: Fix a bug in the priorityqueue metrics The priorityqueue needs to call `metrics.add` but only once an item was ready. When an item was initially added with `After` (so not ready) and then added again without it (which makes it ready), we didn't do that, resulting in potentially negative values for the queue depth metric. --- pkg/controller/priorityqueue/metrics.go | 10 ++------ pkg/controller/priorityqueue/priorityqueue.go | 3 +++ .../priorityqueue/priorityqueue_test.go | 25 ++++++++++++++++++- 3 files changed, 29 insertions(+), 9 deletions(-) diff --git a/pkg/controller/priorityqueue/metrics.go b/pkg/controller/priorityqueue/metrics.go index f3ac226eea..f6a2697a65 100644 --- a/pkg/controller/priorityqueue/metrics.go +++ b/pkg/controller/priorityqueue/metrics.go @@ -4,7 +4,6 @@ import ( "sync" "time" - "k8s.io/apimachinery/pkg/util/sets" "k8s.io/client-go/util/workqueue" "k8s.io/utils/clock" ) @@ -34,7 +33,6 @@ func newQueueMetrics[T comparable](mp workqueue.MetricsProvider, name string, cl workDuration: mp.NewWorkDurationMetric(name), unfinishedWorkSeconds: mp.NewUnfinishedWorkSecondsMetric(name), longestRunningProcessor: mp.NewLongestRunningProcessorSecondsMetric(name), - added: sets.Set[T]{}, addTimes: map[T]time.Time{}, processingStartTimes: map[T]time.Time{}, retries: mp.NewRetriesMetric(name), @@ -55,7 +53,6 @@ type defaultQueueMetrics[T comparable] struct { workDuration workqueue.HistogramMetric mapLock sync.RWMutex - added sets.Set[T] addTimes map[T]time.Time processingStartTimes map[T]time.Time @@ -73,13 +70,11 @@ func (m *defaultQueueMetrics[T]) add(item T) { } m.adds.Inc() + m.depth.Inc() m.mapLock.Lock() defer m.mapLock.Unlock() - if !m.added.Has(item) { - m.added.Insert(item) - m.depth.Inc() - } + if _, exists := m.addTimes[item]; !exists { m.addTimes[item] = m.clock.Now() } @@ -94,7 +89,6 @@ func (m *defaultQueueMetrics[T]) 
get(item T) { defer m.mapLock.Unlock() m.depth.Dec() - m.added.Delete(item) m.processingStartTimes[item] = m.clock.Now() if startTime, exists := m.addTimes[item]; exists { diff --git a/pkg/controller/priorityqueue/priorityqueue.go b/pkg/controller/priorityqueue/priorityqueue.go index a2e80d3065..2b3a8904d7 100644 --- a/pkg/controller/priorityqueue/priorityqueue.go +++ b/pkg/controller/priorityqueue/priorityqueue.go @@ -163,6 +163,9 @@ func (w *priorityqueue[T]) AddWithOpts(o AddOpts, items ...T) { } if item.readyAt != nil && (readyAt == nil || readyAt.Before(*item.readyAt)) { + if readyAt == nil { + w.metrics.add(key) + } item.readyAt = readyAt } diff --git a/pkg/controller/priorityqueue/priorityqueue_test.go b/pkg/controller/priorityqueue/priorityqueue_test.go index d272e42e2c..e431c993fb 100644 --- a/pkg/controller/priorityqueue/priorityqueue_test.go +++ b/pkg/controller/priorityqueue/priorityqueue_test.go @@ -379,6 +379,24 @@ var _ = Describe("Controllerworkqueue", func() { wg.Wait() } }) + + It("updates metrics correctly for an item that gets initially added with after and then without", func() { + q, metrics := newQueue() + defer q.ShutDown() + + q.AddWithOpts(AddOpts{After: time.Hour}, "foo") + Expect(q.Len()).To(Equal(0)) + metrics.mu.Lock() + Expect(metrics.depth["test"]).To(Equal(0)) + metrics.mu.Unlock() + + q.AddWithOpts(AddOpts{}, "foo") + + Expect(q.Len()).To(Equal(1)) + metrics.mu.Lock() + Expect(metrics.depth["test"]).To(Equal(1)) + metrics.mu.Unlock() + }) }) func BenchmarkAddGetDone(b *testing.B) { @@ -454,7 +471,7 @@ func TestFuzzPrioriorityQueue(t *testing.T) { handedOutLock := sync.Mutex{} wg := sync.WaitGroup{} - q, _ := newQueue() + q, metrics := newQueue() for range 10 { wg.Add(1) @@ -519,6 +536,12 @@ func TestFuzzPrioriorityQueue(t *testing.T) { if handedOut.Has(item) { t.Errorf("item %s got handed out more than once", item) } + + metrics.mu.Lock() + if metrics.depth["test"] < 0 { + t.Errorf("negative depth of %d", metrics.depth["test"]) + } + metrics.mu.Unlock() 
handedOut.Insert(item) }()