Skip to content

Commit

Permalink
fix: correctly publish the hedgedmetrics counter with delta (#4078)
Browse files Browse the repository at this point in the history
* fix: correctly publish the hedgedmetrics counter with delta

* fix and update CHANGELOG.md

* fix datarace in the test

* replace locking with atomic

* remove locking and unnecessary code

Signed-off-by: Joe Elliott <[email protected]>

* i don't care

Signed-off-by: Joe Elliott <[email protected]>

---------

Signed-off-by: Joe Elliott <[email protected]>
Co-authored-by: Joe Elliott <[email protected]>
  • Loading branch information
electron0zero and joe-elliott authored Sep 13, 2024
1 parent 8ae5b28 commit 85950ca
Show file tree
Hide file tree
Showing 3 changed files with 140 additions and 8 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
## main / unreleased

* [BUGFIX] Replace hedged requests roundtrips total with a counter. [#4063](https://github.com/grafana/tempo/pull/4063) [#4078](https://github.com/grafana/tempo/pull/4078) (@galalen)
* [CHANGE] TraceByID: don't allow concurrent_shards greater than query_shards. [#4074](https://github.com/grafana/tempo/pull/4074) (@electron0zero)
* **BREAKING CHANGE** tempo-query is no longer a jaeger instance with grpcPlugin. Its now a standalone server. Serving a grpc api for jaeger on `0.0.0.0:7777` by default. [#3840](https://github.com/grafana/tempo/issues/3840) (@frzifus)
* [CHANGE] **BREAKING CHANGE** The dynamic injection of X-Scope-OrgID header for metrics generator remote-writes is changed. If the header is aleady set in per-tenant overrides or global tempo configuration, then it is honored and not overwritten. [#4021](https://github.com/grafana/tempo/pull/4021) (@mdisibio)
Expand Down Expand Up @@ -96,7 +97,6 @@
* [BUGFIX] Correct block end time when the ingested traces are outside the ingestion slack [#3954](https://github.com/grafana/tempo/pull/3954) (@javiermolinar)
* [BUGFIX] Fix race condition where a streaming response could be marshalled while being modified in the combiner resulting in a panic. [#3961](https://github.com/grafana/tempo/pull/3961) (@joe-elliott)
* [BUGFIX] Pass search options to the backend for SearchTagValuesBlocksV2 requests [#3971](https://github.com/grafana/tempo/pull/3971) (@javiermolinar)
* [BUGFIX] Replace hedged requests roundtrips total with a counter. [#4063](https://github.com/grafana/tempo/pull/4063) (@galalen)

## v2.5.0

Expand Down
37 changes: 30 additions & 7 deletions pkg/hedgedmetrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,17 +11,40 @@ const (
hedgedMetricsPublishDuration = 10 * time.Second
)

// PublishHedgedMetrics flushes metrics from hedged requests every 10 seconds
type diffCounter struct {
previous uint64
counter prometheus.Counter
}

func (d *diffCounter) addAbsoluteToCounter(value uint64) {
diff := float64(value - d.previous)
if value < d.previous {
diff = float64(value)
}
d.counter.Add(diff)
d.previous = value
}

// statsProvider defines the interface that wraps hedgedhttp.Stats for ease of testing
type statsProvider interface {
Snapshot() hedgedhttp.StatsSnapshot
}

// Publish flushes metrics from hedged requests every tickerDur
func Publish(s *hedgedhttp.Stats, counter prometheus.Counter) {
ticker := time.NewTicker(hedgedMetricsPublishDuration)
publishWithDuration(s, counter, hedgedMetricsPublishDuration)
}

func publishWithDuration(s statsProvider, counter prometheus.Counter, duration time.Duration) {
ticker := time.NewTicker(duration)
diff := &diffCounter{previous: 0, counter: counter}

go func() {
for range ticker.C {
snap := s.Snapshot()
hedgedRequests := int64(snap.ActualRoundTrips) - int64(snap.RequestedRoundTrips)
if hedgedRequests < 0 {
hedgedRequests = 0
}
counter.Add(float64(hedgedRequests))

hedgedRequests := snap.ActualRoundTrips - snap.RequestedRoundTrips
diff.addAbsoluteToCounter(hedgedRequests)
}
}()
}
109 changes: 109 additions & 0 deletions pkg/hedgedmetrics/metrics_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
package hedgedmetrics

import (
"sync"
"testing"
"time"

"github.com/cristalhq/hedgedhttp"
"github.com/grafana/tempo/pkg/util/test"
"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/require"
)

func TestDiffCounter(t *testing.T) {
ctr := prometheus.NewCounter(prometheus.CounterOpts{Name: test.RandomString()})
dc := &diffCounter{previous: 0, counter: ctr}

dc.addAbsoluteToCounter(5)
require.Equal(t, 5.0, ctrVal(t, ctr))

dc.addAbsoluteToCounter(7)
require.Equal(t, 7.0, ctrVal(t, ctr))

dc.addAbsoluteToCounter(57)
require.Equal(t, 57.0, ctrVal(t, ctr))
}

/* Fails in CI. potentially due to an architectural difference and prom internals.
func TestDiffCounterOverflow(t *testing.T) {
ctr := prometheus.NewCounter(prometheus.CounterOpts{Name: test.RandomString()})
dc := &diffCounter{previous: 0, counter: ctr}
// start with something large
bigNum := math.MaxUint64 - uint64(1)
dc.addAbsoluteToCounter(bigNum)
require.Equal(t, float64(bigNum), ctrVal(t, ctr))
// then wrap the uint64s
dc.addAbsoluteToCounter(2)
require.Equal(t, 1.0, ctrVal(t, ctr)) // this is one off b/c of the internal counter wrap in prom. this could be corrected for but doesn't matter
dc.addAbsoluteToCounter(7)
require.Equal(t, 6.0, ctrVal(t, ctr))
}
*/

// MockStatsProvider is StatsProvider for testing
type MockStatsProvider struct {
mu sync.Mutex
actualRoundTrips uint64
requestedRoundTrips uint64
}

func (m *MockStatsProvider) Snapshot() hedgedhttp.StatsSnapshot {
m.mu.Lock()
defer m.mu.Unlock()
return hedgedhttp.StatsSnapshot{
ActualRoundTrips: m.actualRoundTrips,
RequestedRoundTrips: m.requestedRoundTrips,
}
}

func (m *MockStatsProvider) SetStats(actual, requested uint64) {
m.mu.Lock()
defer m.mu.Unlock()
m.actualRoundTrips = actual
m.requestedRoundTrips = requested
}

func TestPublish(t *testing.T) {
ctr := prometheus.NewCounter(prometheus.CounterOpts{Name: test.RandomString()})
stats := &MockStatsProvider{}

publishWithDuration(stats, ctr, 10*time.Millisecond)

require.Equal(t, 0.0, ctrVal(t, ctr))

// Set initial stats values
stats.SetStats(5, 5)
time.Sleep(30 * time.Millisecond)
require.Equal(t, 0.0, ctrVal(t, ctr))

stats.SetStats(15, 10)
time.Sleep(30 * time.Millisecond)
require.Equal(t, 5.0, ctrVal(t, ctr))

stats.SetStats(28, 20)
time.Sleep(30 * time.Millisecond)
require.Equal(t, 8.0, ctrVal(t, ctr))

stats.SetStats(38, 25)
time.Sleep(30 * time.Millisecond)
require.Equal(t, 13.0, ctrVal(t, ctr))

time.Sleep(30 * time.Millisecond)

// counter doesn't increase if stats stay same
time.Sleep(30 * time.Millisecond)
require.Equal(t, 13.0, ctrVal(t, ctr))
}

func ctrVal(t *testing.T, ctr prometheus.Counter) float64 {
t.Helper()

val, err := test.GetCounterValue(ctr)
require.NoError(t, err)

return val
}

0 comments on commit 85950ca

Please sign in to comment.