Skip to content

Commit

Permalink
🐛 Fix a bug in the priorityqueue metrics
Browse files Browse the repository at this point in the history
The priorityqueue needs to call `metrics.add` but only once an item was
ready. When an item was initially added with `After`, so not ready, then
without which makes it ready, we didn't do that, resulting in
potentially negative values for the queue depth metric.
  • Loading branch information
alvaroaleman committed Jan 10, 2025
1 parent 1de5a3e commit e667a8f
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 9 deletions.
10 changes: 2 additions & 8 deletions pkg/controller/priorityqueue/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"sync"
"time"

"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/util/workqueue"
"k8s.io/utils/clock"
)
Expand Down Expand Up @@ -34,7 +33,6 @@ func newQueueMetrics[T comparable](mp workqueue.MetricsProvider, name string, cl
workDuration: mp.NewWorkDurationMetric(name),
unfinishedWorkSeconds: mp.NewUnfinishedWorkSecondsMetric(name),
longestRunningProcessor: mp.NewLongestRunningProcessorSecondsMetric(name),
added: sets.Set[T]{},
addTimes: map[T]time.Time{},
processingStartTimes: map[T]time.Time{},
retries: mp.NewRetriesMetric(name),
Expand All @@ -55,7 +53,6 @@ type defaultQueueMetrics[T comparable] struct {
workDuration workqueue.HistogramMetric

mapLock sync.RWMutex
added sets.Set[T]
addTimes map[T]time.Time
processingStartTimes map[T]time.Time

Expand All @@ -73,13 +70,11 @@ func (m *defaultQueueMetrics[T]) add(item T) {
}

m.adds.Inc()
m.depth.Inc()

m.mapLock.Lock()
defer m.mapLock.Unlock()
if !m.added.Has(item) {
m.added.Insert(item)
m.depth.Inc()
}

if _, exists := m.addTimes[item]; !exists {
m.addTimes[item] = m.clock.Now()
}
Expand All @@ -94,7 +89,6 @@ func (m *defaultQueueMetrics[T]) get(item T) {
defer m.mapLock.Unlock()

m.depth.Dec()
m.added.Delete(item)

m.processingStartTimes[item] = m.clock.Now()
if startTime, exists := m.addTimes[item]; exists {
Expand Down
3 changes: 3 additions & 0 deletions pkg/controller/priorityqueue/priorityqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,9 @@ func (w *priorityqueue[T]) AddWithOpts(o AddOpts, items ...T) {
}

if item.readyAt != nil && (readyAt == nil || readyAt.Before(*item.readyAt)) {
if readyAt == nil {
w.metrics.add(key)
}
item.readyAt = readyAt
}

Expand Down
25 changes: 24 additions & 1 deletion pkg/controller/priorityqueue/priorityqueue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -379,6 +379,23 @@ var _ = Describe("Controllerworkqueue", func() {
wg.Wait()
}
})

It("updates metrics correctly for an item that gets initially added with after and then without", func() {
q, metrics := newQueue()
defer q.ShutDown()

q.AddWithOpts(AddOpts{After: time.Hour}, "foo")
Expect(q.Len()).To(Equal(0))
metrics.mu.Lock()
Expect(metrics.depth["test"]).To(Equal(0))
metrics.mu.Unlock()

q.AddWithOpts(AddOpts{}, "foo")

Expect(q.Len()).To(Equal(1))
metrics.mu.Lock()
Expect(metrics.depth["test"]).To(Equal(1))
})
})

func BenchmarkAddGetDone(b *testing.B) {
Expand Down Expand Up @@ -454,7 +471,7 @@ func TestFuzzPrioriorityQueue(t *testing.T) {
handedOutLock := sync.Mutex{}

wg := sync.WaitGroup{}
q, _ := newQueue()
q, metrics := newQueue()

for range 10 {
wg.Add(1)
Expand Down Expand Up @@ -519,6 +536,12 @@ func TestFuzzPrioriorityQueue(t *testing.T) {
if handedOut.Has(item) {
t.Errorf("item %s got handed out more than once", item)
}

metrics.mu.Lock()
if metrics.depth["test"] < 0 {
t.Errorf("negative depth of %d", metrics.depth["test"])
}
metrics.mu.Unlock()
handedOut.Insert(item)
}()

Expand Down

0 comments on commit e667a8f

Please sign in to comment.