fix: counter samples being downsampled when series is new #4236

Merged 3 commits on Oct 25, 2024
26 changes: 14 additions & 12 deletions CHANGELOG.md
@@ -1,20 +1,9 @@
## main / unreleased
* [ENHANCEMENT] The span multiplier now also sources its value from the resource attributes. [#4210](https://github.com/grafana/tempo/pull/4210)
* [ENHANCEMENT] Changed log level from INFO to DEBUG for the TempoDB Find operation using traceId to reduce excessive/unwanted logs in log search. [#4179](https://github.com/grafana/tempo/pull/4179) (@Aki0x137)
* [ENHANCEMENT] Pushdown collection of results from generators in the querier [#4119](https://github.com/grafana/tempo/pull/4119) (@electron0zero)

* [CHANGE] Add throughput and SLO metrics in the tags and tag values endpoints [#4148](https://github.com/grafana/tempo/pull/4148) (@electron0zero)
* [ENHANCEMENT] Send semver version in api/status/buildinfo for cloud deployments [#4110](https://github.com/grafana/tempo/pull/4110) (@Aki0x137)
* [ENHANCEMENT] Speedup DistinctString and ScopedDistinctString collectors [#4109](https://github.com/grafana/tempo/pull/4109) (@electron0zero)
* [CHANGE] tempo-cli: add support for /api/v2/traces endpoint [#4127](https://github.com/grafana/tempo/pull/4127) (@electron0zero)
**BREAKING CHANGE** The `tempo-cli` now uses the `/api/v2/traces` endpoint by default;
use the `--v1` flag to fall back to the `/api/traces` endpoint, which was the default in previous versions.
* [ENHANCEMENT] Speedup collection of results from ingesters in the querier [#4100](https://github.com/grafana/tempo/pull/4100) (@electron0zero)
* [ENHANCEMENT] Speedup DistinctValue collector and exit early for ingesters [#4104](https://github.com/grafana/tempo/pull/4104) (@electron0zero)
* [ENHANCEMENT] Add disk caching in ingester SearchTagValuesV2 for completed blocks [#4069](https://github.com/grafana/tempo/pull/4069) (@electron0zero)
* [BUGFIX] Replace hedged requests roundtrips total with a counter. [#4063](https://github.com/grafana/tempo/pull/4063) [#4078](https://github.com/grafana/tempo/pull/4078) (@galalen)
* [BUGFIX] Metrics generators: Correctly drop from the ring before stopping ingestion to reduce drops during a rollout. [#4101](https://github.com/grafana/tempo/pull/4101) (@joe-elliott)
* [BUGFIX] Correctly handle 400 Bad Request and 404 Not Found in gRPC streaming [#4144](https://github.com/grafana/tempo/pull/4144) (@mapno)
* [BUGFIX] Pushes a 0 to the classic histogram's counter when the series is new, allowing Prometheus to start from a non-null value. [#4140](https://github.com/grafana/tempo/pull/4140) (@mapno)
* [CHANGE] TraceByID: don't allow concurrent_shards greater than query_shards. [#4074](https://github.com/grafana/tempo/pull/4074) (@electron0zero)
* [CHANGE] **BREAKING CHANGE** The dynamic injection of X-Scope-OrgID header for metrics generator remote-writes is changed. If the header is already set in per-tenant overrides or global tempo configuration, then it is honored and not overwritten. [#4021](https://github.com/grafana/tempo/pull/4021) (@mdisibio)
* [CHANGE] **BREAKING CHANGE** Migrate from OpenTracing to OpenTelemetry instrumentation. Removed the `use_otel_tracer` configuration option. Use the OpenTelemetry environment variables to configure the span exporter [#3646](https://github.com/grafana/tempo/pull/3646) (@andreasgerstmayr)
@@ -41,6 +30,19 @@
* [ENHANCEMENT] TraceQL metrics queries: add max_over_time [#4065](https://github.com/grafana/tempo/pull/4065) (@javiermolinar)
* [ENHANCEMENT] Write tenantindex as proto and json with a preference for proto [#4072](https://github.com/grafana/tempo/pull/4072) (@zalegrala)
* [ENHANCEMENT] Pool zstd encoding/decoding for tempodb/backend [#4208](https://github.com/grafana/tempo/pull/4208) (@zalegrala)
* [ENHANCEMENT] The span multiplier now also sources its value from the resource attributes. [#4210](https://github.com/grafana/tempo/pull/4210)
* [ENHANCEMENT] Changed log level from INFO to DEBUG for the TempoDB Find operation using traceId to reduce excessive/unwanted logs in log search. [#4179](https://github.com/grafana/tempo/pull/4179) (@Aki0x137)
* [ENHANCEMENT] Pushdown collection of results from generators in the querier [#4119](https://github.com/grafana/tempo/pull/4119) (@electron0zero)
* [ENHANCEMENT] Send semver version in api/status/buildinfo for cloud deployments [#4110](https://github.com/grafana/tempo/pull/4110) (@Aki0x137)
* [ENHANCEMENT] Speedup DistinctString and ScopedDistinctString collectors [#4109](https://github.com/grafana/tempo/pull/4109) (@electron0zero)
* [ENHANCEMENT] Speedup collection of results from ingesters in the querier [#4100](https://github.com/grafana/tempo/pull/4100) (@electron0zero)
* [ENHANCEMENT] Speedup DistinctValue collector and exit early for ingesters [#4104](https://github.com/grafana/tempo/pull/4104) (@electron0zero)
* [ENHANCEMENT] Add disk caching in ingester SearchTagValuesV2 for completed blocks [#4069](https://github.com/grafana/tempo/pull/4069) (@electron0zero)
* [BUGFIX] Replace hedged requests roundtrips total with a counter. [#4063](https://github.com/grafana/tempo/pull/4063) [#4078](https://github.com/grafana/tempo/pull/4078) (@galalen)
* [BUGFIX] Metrics generators: Correctly drop from the ring before stopping ingestion to reduce drops during a rollout. [#4101](https://github.com/grafana/tempo/pull/4101) (@joe-elliott)
* [BUGFIX] Correctly handle 400 Bad Request and 404 Not Found in gRPC streaming [#4144](https://github.com/grafana/tempo/pull/4144) (@mapno)
* [BUGFIX] Pushes a 0 to the classic histogram's counter when the series is new, allowing Prometheus to start from a non-null value. [#4140](https://github.com/grafana/tempo/pull/4140) (@mapno)
* [BUGFIX] Fix counter samples being downsampled by backdating the initial sample to the previous minute when the series is new [#4236](https://github.com/grafana/tempo/pull/4236) (@javiermolinar)

# v2.6.1

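Why backdating the seed sample works: a downsampler that keeps one point per aggregation interval can drop either the seed 0 or the first real value when both land in the same interval. Below is a minimal, runnable sketch of that hazard; the one-minute bucketing function and the timestamps are illustrative assumptions, not Tempo code.

```go
package main

import (
	"fmt"
	"time"
)

// bucket maps a millisecond timestamp to a one-minute aggregation interval,
// standing in for whatever interval a downstream downsampler uses.
func bucket(tsMs int64) int64 { return tsMs / time.Minute.Milliseconds() }

func main() {
	collectMs := time.Now().UnixMilli()

	// Old approach (roughly): seed the new series slightly before the real
	// sample. Both timestamps usually share the same minute, so a
	// one-point-per-interval downsampler can discard the real value.
	offsetSeedMs := collectMs - time.Second.Milliseconds()

	// New approach: backdate the seed to the last millisecond of the
	// previous minute, so the two samples can never share an interval.
	backdatedSeedMs := time.UnixMilli(collectMs).Truncate(time.Minute).UnixMilli() - 1

	fmt.Println("seed shares interval (old):", bucket(offsetSeedMs) == bucket(collectMs))    // usually true
	fmt.Println("seed shares interval (new):", bucket(backdatedSeedMs) == bucket(collectMs)) // always false
}
```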
4 changes: 2 additions & 2 deletions modules/generator/registry/config.go
@@ -10,8 +10,8 @@ type Config struct {
// Defaults to 15s.
CollectionInterval time.Duration `yaml:"collection_interval"`

// StaleDuration controls how quickly series become stale and are deleted from the registry. An active
// series is deleted if it hasn't been updated more stale duration.
// StaleDuration controls how quickly series become stale and are deleted from the registry.
// An active series is deleted if it hasn't been updated for more than the stale duration.
// Defaults to 15m.
StaleDuration time.Duration `yaml:"stale_duration"`

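For context, the comment above describes the registry's eviction rule. A sketch of that check, assuming the registry tracks a last-update timestamp per series (the function and parameter names are illustrative, not from this diff):

```go
package registry

import "time"

// isStale reports whether a series should be evicted from the registry:
// it hasn't been updated for more than the configured stale duration.
func isStale(lastUpdatedMs, nowMs int64, staleDuration time.Duration) bool {
	return nowMs-lastUpdatedMs > staleDuration.Milliseconds()
}
```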
16 changes: 5 additions & 11 deletions modules/generator/registry/counter.go
@@ -9,8 +9,6 @@ import (
"go.uber.org/atomic"
)

var _ metric = (*counter)(nil)

type counter struct {
//nolint unused
metric
@@ -40,8 +38,6 @@
_ metric = (*counter)(nil)
)

const insertOffsetDuration = 1 * time.Second

func (co *counterSeries) isNew() bool {
return co.firstSeries.Load()
}
@@ -144,9 +140,6 @@ func (c *counter) collectMetrics(appender storage.Appender, timeMs int64, extern
lb := labels.NewBuilder(baseLabels)

for _, s := range c.series {
t := time.UnixMilli(timeMs)

// reset labels for every series
lb.Reset(baseLabels)

// set series-specific labels
@@ -158,16 +151,17 @@
// to first insert a 0 value to allow Prometheus to start from a non-null
// value.
if s.isNew() {
_, err = appender.Append(0, lb.Labels(), timeMs, 0)
// We set the timestamp of the initial sample to the end of the previous minute, so that it lands in a
// different aggregation interval and is not downsampled.
endOfLastMinuteMs := getEndOfLastMinuteMs(timeMs)
_, err = appender.Append(0, lb.Labels(), endOfLastMinuteMs, 0)
if err != nil {
return
}
// Increment timeMs to ensure that the next value is not at the same time.
t = t.Add(insertOffsetDuration)
s.registerSeenSeries()
}

_, err = appender.Append(0, lb.Labels(), t.UnixMilli(), s.value.Load())
_, err = appender.Append(0, lb.Labels(), timeMs, s.value.Load())
if err != nil {
return
}
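The new code calls getEndOfLastMinuteMs, whose definition sits outside this diff's context. A plausible implementation, consistent with how the tests below use it (a sketch under that assumption, not necessarily the PR's exact code):

```go
package registry

import "time"

// getEndOfLastMinuteMs returns the last millisecond of the minute preceding
// the one containing timeMs. A seed sample stamped with this value always
// falls in an earlier one-minute aggregation interval than the real sample
// appended at timeMs.
func getEndOfLastMinuteMs(timeMs int64) int64 {
	startOfThisMinute := time.UnixMilli(timeMs).Truncate(time.Minute)
	return startOfThisMinute.Add(-time.Millisecond).UnixMilli()
}
```

The histogram changes further down reuse the same helper for their seed sample.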
66 changes: 33 additions & 33 deletions modules/generator/registry/counter_test.go
@@ -13,7 +13,7 @@ import (

func Test_counter(t *testing.T) {
var seriesAdded int
onAdd := func(count uint32) bool {
onAdd := func(_ uint32) bool {
seriesAdded++
return true
}
@@ -26,12 +26,12 @@ func Test_counter(t *testing.T) {
assert.Equal(t, 2, seriesAdded)

collectionTimeMs := time.Now().UnixMilli()
offsetCollectionTimeMs := time.UnixMilli(collectionTimeMs).Add(insertOffsetDuration).UnixMilli()
endOfLastMinuteMs := getEndOfLastMinuteMs(collectionTimeMs)
expectedSamples := []sample{
newSample(map[string]string{"__name__": "my_counter", "label": "value-1"}, collectionTimeMs, 0),
newSample(map[string]string{"__name__": "my_counter", "label": "value-1"}, offsetCollectionTimeMs, 1),
newSample(map[string]string{"__name__": "my_counter", "label": "value-2"}, collectionTimeMs, 0),
newSample(map[string]string{"__name__": "my_counter", "label": "value-2"}, offsetCollectionTimeMs, 2),
newSample(map[string]string{"__name__": "my_counter", "label": "value-1"}, endOfLastMinuteMs, 0),
newSample(map[string]string{"__name__": "my_counter", "label": "value-1"}, collectionTimeMs, 1),
newSample(map[string]string{"__name__": "my_counter", "label": "value-2"}, endOfLastMinuteMs, 0),
newSample(map[string]string{"__name__": "my_counter", "label": "value-2"}, collectionTimeMs, 2),
}
collectMetricAndAssert(t, c, collectionTimeMs, nil, 2, expectedSamples, nil)

@@ -41,20 +41,20 @@
assert.Equal(t, 3, seriesAdded)

collectionTimeMs = time.Now().UnixMilli()
offsetCollectionTimeMs = time.UnixMilli(collectionTimeMs).Add(insertOffsetDuration).UnixMilli()
endOfLastMinuteMs = getEndOfLastMinuteMs(collectionTimeMs)
expectedSamples = []sample{
newSample(map[string]string{"__name__": "my_counter", "label": "value-1"}, collectionTimeMs, 1),
newSample(map[string]string{"__name__": "my_counter", "label": "value-2"}, collectionTimeMs, 4),
newSample(map[string]string{"__name__": "my_counter", "label": "value-3"}, collectionTimeMs, 0),
newSample(map[string]string{"__name__": "my_counter", "label": "value-3"}, offsetCollectionTimeMs, 3),
newSample(map[string]string{"__name__": "my_counter", "label": "value-3"}, endOfLastMinuteMs, 0),
newSample(map[string]string{"__name__": "my_counter", "label": "value-3"}, collectionTimeMs, 3),
}

collectMetricAndAssert(t, c, collectionTimeMs, nil, 3, expectedSamples, nil)
}

func TestCounterDifferentLabels(t *testing.T) {
var seriesAdded int
onAdd := func(count uint32) bool {
onAdd := func(_ uint32) bool {
seriesAdded++
return true
}
@@ -67,12 +67,12 @@ func TestCounterDifferentLabels(t *testing.T) {
assert.Equal(t, 2, seriesAdded)

collectionTimeMs := time.Now().UnixMilli()
offsetCollectionTimeMs := time.UnixMilli(collectionTimeMs).Add(insertOffsetDuration).UnixMilli()
endOfLastMinuteMs := getEndOfLastMinuteMs(collectionTimeMs)
expectedSamples := []sample{
newSample(map[string]string{"__name__": "my_counter", "label": "value-1"}, collectionTimeMs, 0),
newSample(map[string]string{"__name__": "my_counter", "label": "value-1"}, offsetCollectionTimeMs, 1),
newSample(map[string]string{"__name__": "my_counter", "another_label": "another_value"}, collectionTimeMs, 0),
newSample(map[string]string{"__name__": "my_counter", "another_label": "another_value"}, offsetCollectionTimeMs, 2),
newSample(map[string]string{"__name__": "my_counter", "label": "value-1"}, endOfLastMinuteMs, 0),
newSample(map[string]string{"__name__": "my_counter", "label": "value-1"}, collectionTimeMs, 1),
newSample(map[string]string{"__name__": "my_counter", "another_label": "another_value"}, endOfLastMinuteMs, 0),
newSample(map[string]string{"__name__": "my_counter", "another_label": "another_value"}, collectionTimeMs, 2),
}
collectMetricAndAssert(t, c, collectionTimeMs, nil, 2, expectedSamples, nil)
}
@@ -93,12 +93,12 @@ func Test_counter_cantAdd(t *testing.T) {
c.Inc(newLabelValueCombo([]string{"label"}, []string{"value-2"}), 2.0)

collectionTimeMs := time.Now().UnixMilli()
offsetCollectionTimeMs := time.UnixMilli(collectionTimeMs).Add(insertOffsetDuration).UnixMilli()
endOfLastMinuteMs := getEndOfLastMinuteMs(collectionTimeMs)
expectedSamples := []sample{
newSample(map[string]string{"__name__": "my_counter", "label": "value-1"}, collectionTimeMs, 0),
newSample(map[string]string{"__name__": "my_counter", "label": "value-1"}, offsetCollectionTimeMs, 1),
newSample(map[string]string{"__name__": "my_counter", "label": "value-2"}, collectionTimeMs, 0),
newSample(map[string]string{"__name__": "my_counter", "label": "value-2"}, offsetCollectionTimeMs, 2),
newSample(map[string]string{"__name__": "my_counter", "label": "value-1"}, endOfLastMinuteMs, 0),
newSample(map[string]string{"__name__": "my_counter", "label": "value-1"}, collectionTimeMs, 1),
newSample(map[string]string{"__name__": "my_counter", "label": "value-2"}, endOfLastMinuteMs, 0),
newSample(map[string]string{"__name__": "my_counter", "label": "value-2"}, collectionTimeMs, 2),
}
collectMetricAndAssert(t, c, collectionTimeMs, nil, 2, expectedSamples, nil)

@@ -134,12 +134,12 @@ func Test_counter_removeStaleSeries(t *testing.T) {
assert.Equal(t, 0, removedSeries)

collectionTimeMs := time.Now().UnixMilli()
offsetCollectionTimeMs := time.UnixMilli(collectionTimeMs).Add(insertOffsetDuration).UnixMilli()
endOfLastMinuteMs := getEndOfLastMinuteMs(collectionTimeMs)
expectedSamples := []sample{
newSample(map[string]string{"__name__": "my_counter", "label": "value-1"}, collectionTimeMs, 0),
newSample(map[string]string{"__name__": "my_counter", "label": "value-1"}, offsetCollectionTimeMs, 1),
newSample(map[string]string{"__name__": "my_counter", "label": "value-2"}, collectionTimeMs, 0),
newSample(map[string]string{"__name__": "my_counter", "label": "value-2"}, offsetCollectionTimeMs, 2),
newSample(map[string]string{"__name__": "my_counter", "label": "value-1"}, endOfLastMinuteMs, 0),
newSample(map[string]string{"__name__": "my_counter", "label": "value-1"}, collectionTimeMs, 1),
newSample(map[string]string{"__name__": "my_counter", "label": "value-2"}, endOfLastMinuteMs, 0),
newSample(map[string]string{"__name__": "my_counter", "label": "value-2"}, collectionTimeMs, 2),
}
collectMetricAndAssert(t, c, collectionTimeMs, nil, 2, expectedSamples, nil)

@@ -167,12 +167,12 @@ func Test_counter_externalLabels(t *testing.T) {
c.Inc(newLabelValueCombo([]string{"label"}, []string{"value-2"}), 2.0)

collectionTimeMs := time.Now().UnixMilli()
offsetCollectionTimeMs := time.UnixMilli(collectionTimeMs).Add(insertOffsetDuration).UnixMilli()
endOfLastMinuteMs := getEndOfLastMinuteMs(collectionTimeMs)
expectedSamples := []sample{
newSample(map[string]string{"__name__": "my_counter", "label": "value-1", "external_label": "external_value"}, collectionTimeMs, 0),
newSample(map[string]string{"__name__": "my_counter", "label": "value-1", "external_label": "external_value"}, offsetCollectionTimeMs, 1),
newSample(map[string]string{"__name__": "my_counter", "label": "value-2", "external_label": "external_value"}, collectionTimeMs, 0),
newSample(map[string]string{"__name__": "my_counter", "label": "value-2", "external_label": "external_value"}, offsetCollectionTimeMs, 2),
newSample(map[string]string{"__name__": "my_counter", "label": "value-1", "external_label": "external_value"}, endOfLastMinuteMs, 0),
newSample(map[string]string{"__name__": "my_counter", "label": "value-1", "external_label": "external_value"}, collectionTimeMs, 1),
newSample(map[string]string{"__name__": "my_counter", "label": "value-2", "external_label": "external_value"}, endOfLastMinuteMs, 0),
newSample(map[string]string{"__name__": "my_counter", "label": "value-2", "external_label": "external_value"}, collectionTimeMs, 2),
}
collectMetricAndAssert(t, c, collectionTimeMs, map[string]string{"external_label": "external_value"}, 2, expectedSamples, nil)
}
@@ -253,10 +253,10 @@ func Test_counter_concurrencyCorrectness(t *testing.T) {
wg.Wait()

collectionTimeMs := time.Now().UnixMilli()
offsetCollectionTimeMs := time.UnixMilli(collectionTimeMs).Add(insertOffsetDuration).UnixMilli()
endOfLastMinuteMs := getEndOfLastMinuteMs(collectionTimeMs)
expectedSamples := []sample{
newSample(map[string]string{"__name__": "my_counter", "label": "value-1"}, collectionTimeMs, 0),
newSample(map[string]string{"__name__": "my_counter", "label": "value-1"}, offsetCollectionTimeMs, totalCount.Load()),
newSample(map[string]string{"__name__": "my_counter", "label": "value-1"}, endOfLastMinuteMs, 0),
newSample(map[string]string{"__name__": "my_counter", "label": "value-1"}, collectionTimeMs, totalCount.Load()),
}
collectMetricAndAssert(t, c, collectionTimeMs, nil, 1, expectedSamples, nil)
}
6 changes: 3 additions & 3 deletions modules/generator/registry/gauge_test.go
@@ -12,7 +12,7 @@ import (

func Test_gaugeInc(t *testing.T) {
var seriesAdded int
onAdd := func(count uint32) bool {
onAdd := func(_ uint32) bool {
seriesAdded++
return true
}
@@ -47,7 +47,7 @@

func TestGaugeDifferentLabels(t *testing.T) {
var seriesAdded int
onAdd := func(count uint32) bool {
onAdd := func(_ uint32) bool {
seriesAdded++
return true
}
@@ -69,7 +69,7 @@ func TestGaugeDifferentLabels(t *testing.T) {

func Test_gaugeSet(t *testing.T) {
var seriesAdded int
onAdd := func(count uint32) bool {
onAdd := func(_ uint32) bool {
seriesAdded++
return true
}
15 changes: 8 additions & 7 deletions modules/generator/registry/histogram.go
@@ -165,8 +165,6 @@ func (h *histogram) collectMetrics(appender storage.Appender, timeMs int64, exte
h.seriesMtx.Lock()
defer h.seriesMtx.Unlock()

t := timeMs

activeSeries = len(h.series) * int(h.activeSeriesPerHistogramSerie())

labelsCount := 0
@@ -191,7 +189,10 @@
// we need to first insert a 0 value to allow Prometheus to start from a non-null value.
if s.isNew() {
lb.Set(labels.MetricName, h.nameCount)
_, err = appender.Append(0, lb.Labels(), t-1, 0) // t-1 to ensure that the next value is not at the same time
// We set the timestamp of the initial sample to the end of the previous minute, so that it lands in a
// different aggregation interval and is not downsampled.
endOfLastMinuteMs := getEndOfLastMinuteMs(timeMs)
_, err = appender.Append(0, lb.Labels(), endOfLastMinuteMs, 0)
if err != nil {
return
}
@@ -200,14 +201,14 @@

// sum
lb.Set(labels.MetricName, h.nameSum)
_, err = appender.Append(0, lb.Labels(), t, s.sum.Load())
_, err = appender.Append(0, lb.Labels(), timeMs, s.sum.Load())
if err != nil {
return
}

// count
lb.Set(labels.MetricName, h.nameCount)
_, err = appender.Append(0, lb.Labels(), t, s.count.Load())
_, err = appender.Append(0, lb.Labels(), timeMs, s.count.Load())
if err != nil {
return
}
@@ -217,7 +218,7 @@

for i, bucketLabel := range h.bucketLabels {
lb.Set(labels.BucketLabel, bucketLabel)
ref, err := appender.Append(0, lb.Labels(), t, s.buckets[i].Load())
ref, err := appender.Append(0, lb.Labels(), timeMs, s.buckets[i].Load())
if err != nil {
return activeSeries, err
}
@@ -230,7 +231,7 @@
Value: ex,
}},
Value: s.exemplarValues[i].Load(),
Ts: t,
Ts: timeMs,
})
if err != nil {
return activeSeries, err
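Note that in the histogram path only the count series receives the backdated seed; the sum, count, and per-bucket samples are all appended at the collection timestamp. A runnable sketch of the resulting sample stream for one brand-new series, where the metric name, values, and bucket boundaries are made up for illustration:

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	collectMs := time.Now().UnixMilli()
	// Same backdating as the counter: last millisecond of the previous minute.
	seedMs := time.UnixMilli(collectMs).Truncate(time.Minute).UnixMilli() - 1

	// Append order for a new histogram series, per the diff above.
	fmt.Printf("my_histogram_count ts=%d v=0 (backdated seed)\n", seedMs)
	fmt.Printf("my_histogram_sum   ts=%d v=12.5\n", collectMs)
	fmt.Printf("my_histogram_count ts=%d v=3\n", collectMs)
	les := []string{"1", "10", "+Inf"}
	counts := []int{1, 2, 3}
	for i, le := range les {
		fmt.Printf("my_histogram_bucket{le=%q} ts=%d v=%d\n", le, collectMs, counts[i])
	}
}
```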