This repository has been archived by the owner on Apr 2, 2024. It is now read-only.

Add alert to notify about duplicate sample/metric ingestion
This commit does the following:
1. Merge duplicate reporter into throughput reporter
2. Add alert about duplicate sample/metric ingestion
3. Add an e2e test to verify metrics related to duplicates are populated

Signed-off-by: Arunprasad Rajkumar <[email protected]>
arajkumar committed Oct 11, 2022
1 parent db1463f commit 883b9ea
Showing 7 changed files with 105 additions and 51 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -26,6 +26,7 @@ We use the following categories for changes:
- Added a vacuum engine that detects and vacuums/freezes compressed chunks [#1648]
- Add pool of database connections for maintenance jobs e.g. telemetry [#1657]
- Metrics for long-running statements and locks originating from maintenance jobs. [#1661]
- Add alert to notify about duplicate sample/metric ingestion. [#1688]

### Changed
- Log throughput in the same line for samples, spans and metric metadata [#1643]
10 changes: 10 additions & 0 deletions docs/mixin/alerts/alerts.yaml
@@ -89,6 +89,16 @@ groups:
summary: Slow Promscale ingestion.
description: "Slowest 10% of ingestion batch took more than {{ $value }} seconds to ingest."
runbook_url: https://github.com/timescale/promscale/blob/master/docs/runbooks/PromscaleIngestHighLatency.md
- alert: PromscaleIngestHighDataDuplication
expr: |
rate(promscale_ingest_duplicates_total{kind="sample"}[5m]) > 0
for: 5m
labels:
severity: warning
annotations:
summary: Duplicate data being inserted.
description: "More than {{ $value }} samples/sec are rejected as duplicates by promscale."
runbook_url: https://github.com/timescale/promscale/blob/master/docs/runbooks/PromscaleIngestHighDataDuplication.md
- name: promscale-query
rules:
- alert: PromscaleQueryHighErrorRate
38 changes: 38 additions & 0 deletions docs/runbooks/PromscaleIngestHighDataDuplication.md
@@ -0,0 +1,38 @@
# PromscaleIngestHighDataDuplication

## Meaning

The client payload either contains duplicates, or the client is repeatedly
retrying data that has already been ingested.

## Impact

Ingestion performance degrades, since Promscale spends work ingesting samples
that are then rejected as duplicates.

## Diagnosis

A high volume of duplicate samples typically reaches Promscale when it ingests
from an HA Prometheus setup. The duplicate rate can be examined by running the
following PromQL query in Grafana/Prometheus.

```promql
rate(promscale_ingest_duplicates_total{kind="sample"}[5m])
```

If the query above returns a sustained non-zero rate, follow
[Prometheus high availability](#prometheus-high-availability) to fix the issue.

**Note**: `promscale_ingest_duplicates_total` is not yet implemented for
tracing.

## Mitigation

### Prometheus high availability

This could happen if the Prometheus HA deployment is not configured to
decorate the samples with the metadata from the replica that's pushing
the data. In this scenario, two or more Prometheus replicas from the same
cluster will be sending the exact same datapoints, and since there's no
cluster/replica metadata, Promscale doesn't have the information needed
to just accept the data from one of them and will try to persist them all.
Follow the guidance on running [Prometheus in HA mode](https://docs.timescale.com/promscale/latest/scale-ha/high-availability/#promscale-and-prometheus-high-availability) to fix the problem.
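
For reference, below is a minimal, hypothetical `prometheus.yml` fragment illustrating the kind of configuration the linked guide describes: every replica in an HA group carries the same `cluster` external label and a unique `__replica__` label, which gives Promscale the metadata it needs to deduplicate. The label names and values here are assumptions for this sketch; follow the linked documentation for the authoritative setup.

```yaml
# Hypothetical configuration for one replica ("replica-1") of an HA Prometheus pair.
global:
  external_labels:
    cluster: prod-cluster      # identical on every replica in this HA group
    __replica__: replica-1     # unique per replica, e.g. the pod or host name

remote_write:
  - url: http://promscale.example.com:9201/write   # placeholder Promscale remote-write endpoint
```

With such labels present, Promscale can treat one replica's stream as the leader for the cluster and drop the duplicate samples arriving from the other replicas, instead of attempting to persist every copy.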
9 changes: 6 additions & 3 deletions pkg/pgmodel/ingestor/copier.go
@@ -26,6 +26,7 @@ import (
pgmodel "github.com/timescale/promscale/pkg/pgmodel/model"
"github.com/timescale/promscale/pkg/pgxconn"
"github.com/timescale/promscale/pkg/tracer"
tput "github.com/timescale/promscale/pkg/util/throughput"
)

const (
@@ -487,17 +488,19 @@ func insertSeries(ctx context.Context, conn pgxconn.PgxConn, onConflict bool, re
metrics.IngestorRowsPerBatch.With(labelsCopier).Observe(float64(numRowsTotal))
metrics.IngestorInsertsPerBatch.With(labelsCopier).Observe(float64(len(reqs)))

-var affectedMetrics uint64
+var affectedMetrics, affectedSamples int64
for idx, numRows := range numRowsPerInsert {
if numRows != insertedRows[idx] {
affectedMetrics++
-registerDuplicates(int64(numRows - insertedRows[idx]))
+affectedSamples += int64(numRows - insertedRows[idx])
}
}
+metrics.IngestorDuplicates.With(prometheus.Labels{"type": "metric", "kind": "sample"}).Add(float64(affectedSamples))
+metrics.IngestorDuplicates.With(prometheus.Labels{"type": "metric", "kind": "metric"}).Add(float64(affectedMetrics))
metrics.IngestorItems.With(prometheus.Labels{"type": "metric", "subsystem": "copier", "kind": "sample"}).Add(float64(totalSamples))
metrics.IngestorItems.With(prometheus.Labels{"type": "metric", "subsystem": "copier", "kind": "exemplar"}).Add(float64(totalExemplars))

-reportDuplicates(affectedMetrics)
+tput.ReportDuplicateMetrics(affectedSamples, affectedMetrics)
metrics.IngestorInsertDuration.With(prometheus.Labels{"type": "metric", "subsystem": "copier", "kind": "sample"}).Observe(time.Since(insertStart).Seconds())
return nil, lowestMinTime
}
48 changes: 0 additions & 48 deletions pkg/pgmodel/ingestor/duplicates.go

This file was deleted.

30 changes: 30 additions & 0 deletions pkg/tests/end_to_end_tests/metrics_duplicate_insert_test.go
@@ -0,0 +1,30 @@
package end_to_end_tests

import (
"context"
"testing"

"github.com/jackc/pgx/v4/pgxpool"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/testutil"
"github.com/stretchr/testify/require"
ingstr "github.com/timescale/promscale/pkg/pgmodel/ingestor"
"github.com/timescale/promscale/pkg/pgmodel/metrics"
"github.com/timescale/promscale/pkg/pgxconn"
)

func TestMetricsDuplicateInsert(t *testing.T) {
ctx := context.Background()
ts := generateSmallTimeseries()
withDB(t, *testDatabase, func(db *pgxpool.Pool, t testing.TB) {
ingestor, err := ingstr.NewPgxIngestorForTests(pgxconn.NewPgxConn(db), nil)
require.NoError(t, err)
defer ingestor.Close()
_, _, err = ingestor.IngestMetrics(ctx, newWriteRequestWithTs(copyMetrics(ts)))
require.NoError(t, err)
require.Zero(t, testutil.ToFloat64(metrics.IngestorDuplicates.With(prometheus.Labels{"type": "metric", "kind": "sample"})), "must be zero when no duplicates are ingested")
_, _, err = ingestor.IngestMetrics(ctx, newWriteRequestWithTs(copyMetrics(ts)))
require.NoError(t, err)
require.Greater(t, testutil.ToFloat64(metrics.IngestorDuplicates.With(prometheus.Labels{"type": "metric", "kind": "sample"})), 0.0, "duplicates insert must have occurred")
})
}
20 changes: 20 additions & 0 deletions pkg/util/throughput/throughput.go
@@ -37,6 +37,8 @@ type throughputCalc struct {
// Metrics telemetry.
samples *ewma.Rate
metadata *ewma.Rate
duplicateMetrics *ewma.Rate
duplicateSamples *ewma.Rate
metricsMaxSentTs int64

// Traces telemetry.
@@ -50,6 +52,8 @@ func newThroughputCal(every time.Duration) *throughputCalc {
metricsMaxSentTs: 0,
samples: ewma.NewEWMARate(1, every),
metadata: ewma.NewEWMARate(1, every),
duplicateMetrics: ewma.NewEWMARate(1, every),
duplicateSamples: ewma.NewEWMARate(1, every),
spans: ewma.NewEWMARate(1, every),
spansLastWriteOn: 0,
}
@@ -61,6 +65,8 @@ func (tc *throughputCalc) run() {
tc.samples.Tick()
tc.metadata.Tick()
tc.spans.Tick()
tc.duplicateSamples.Tick()
tc.duplicateMetrics.Tick()

samplesRate := tc.samples.Rate()
metadataRate := tc.metadata.Rate()
@@ -85,6 +91,12 @@ func (tc *throughputCalc) run() {
throughput = append(throughput, []interface{}{"metric-metadata/sec", int(metadataRate)}...)
}

duplicateSamplesRate := tc.duplicateSamples.Rate()
if duplicateSamplesRate != 0 {
duplicateMetricsRate := tc.duplicateMetrics.Rate()
throughput = append(throughput, []interface{}{"duplicate-metrics/sec", int(duplicateMetricsRate), "duplicate-samples/sec", int(duplicateSamplesRate)}...)
}

if len(throughput) > 2 {
// Log only if we had any activity.
log.Info(throughput...)
@@ -113,3 +125,11 @@ func ReportSpansProcessed(lastWriteTs int64, numSpans int) {
atomic.StoreInt64(&throughputWatcher.spansLastWriteOn, lastWriteTs)
}
}

func ReportDuplicateMetrics(numSamples, numMetrics int64) {
if throughputWatcher == nil {
return
}
throughputWatcher.duplicateMetrics.Incr(int64(numMetrics))
throughputWatcher.duplicateSamples.Incr(int64(numSamples))
}
