Add alert to notify about duplicate sample/metric ingestion
This commit does the following:
1. Merge the duplicate reporter into the throughput reporter
2. Add an alert about duplicate sample/metric ingestion
3. Add an e2e test to verify that the metrics related to duplicates are populated

Signed-off-by: Arunprasad Rajkumar <[email protected]>
arajkumar committed Oct 11, 2022
1 parent db1463f commit b851545
Showing 6 changed files with 87 additions and 51 deletions.
12 changes: 12 additions & 0 deletions docs/mixin/alerts/alerts.yaml
@@ -89,6 +89,18 @@ groups:
summary: Slow Promscale ingestion.
description: "Slowest 10% of ingestion batch took more than {{ $value }} seconds to ingest."
runbook_url: https://github.com/timescale/promscale/blob/master/docs/runbooks/PromscaleIngestHighLatency.md
- alert: PromscaleIngestHighDataDuplication
expr: |
sum by(job, instance, type) (
rate(promscale_ingest_duplicates_total{kind="sample"}[5m])
) > 0
for: 5m
labels:
severity: critical
annotations:
summary: Duplicate data being inserted.
description: "More than {{ $value }} samples/sec are rejected as duplicates by promscale."
runbook_url: https://github.com/timescale/promscale/blob/master/docs/runbooks/PromscaleIngestHighDataDuplication.md
- name: promscale-query
rules:
- alert: PromscaleQueryHighErrorRate
19 changes: 19 additions & 0 deletions docs/runbooks/PromscaleIngestHighDataDuplication.md
@@ -0,0 +1,19 @@
# PromscaleIngestHighDataDuplication

## Meaning

The client payload either contains duplicate samples or the client is repeatedly retrying data that has already been ingested.


## Impact

Ingestion performance will degrade.


## Diagnosis
1. If Prometheus is running in HA mode, go to [Prometheus high availability](#prometheus-high-availability)

## Mitigation

### Prometheus high availability
1. Follow the guidelines for [running Prometheus in HA mode](https://docs.timescale.com/promscale/latest/scale-ha/high-availability/#promscale-and-prometheus-high-availability).
9 changes: 6 additions & 3 deletions pkg/pgmodel/ingestor/copier.go
@@ -26,6 +26,7 @@ import (
pgmodel "github.com/timescale/promscale/pkg/pgmodel/model"
"github.com/timescale/promscale/pkg/pgxconn"
"github.com/timescale/promscale/pkg/tracer"
tput "github.com/timescale/promscale/pkg/util/throughput"
)

const (
@@ -487,17 +488,19 @@ func insertSeries(ctx context.Context, conn pgxconn.PgxConn, onConflict bool, re
metrics.IngestorRowsPerBatch.With(labelsCopier).Observe(float64(numRowsTotal))
metrics.IngestorInsertsPerBatch.With(labelsCopier).Observe(float64(len(reqs)))

var affectedMetrics uint64
var affectedMetrics, affectedSamples int64
for idx, numRows := range numRowsPerInsert {
if numRows != insertedRows[idx] {
affectedMetrics++
registerDuplicates(int64(numRows - insertedRows[idx]))
affectedSamples += int64(numRows - insertedRows[idx])
}
}
metrics.IngestorDuplicates.With(prometheus.Labels{"type": "metric", "kind": "sample"}).Add(float64(affectedSamples))
metrics.IngestorDuplicates.With(prometheus.Labels{"type": "metric", "kind": "metric"}).Add(float64(affectedMetrics))
metrics.IngestorItems.With(prometheus.Labels{"type": "metric", "subsystem": "copier", "kind": "sample"}).Add(float64(totalSamples))
metrics.IngestorItems.With(prometheus.Labels{"type": "metric", "subsystem": "copier", "kind": "exemplar"}).Add(float64(totalExemplars))

reportDuplicates(affectedMetrics)
tput.ReportDuplicateMetrics(affectedSamples, affectedMetrics)
metrics.IngestorInsertDuration.With(prometheus.Labels{"type": "metric", "subsystem": "copier", "kind": "sample"}).Observe(time.Since(insertStart).Seconds())
return nil, lowestMinTime
}
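
For readers skimming the hunk above, here is a minimal, self-contained sketch of the duplicate-counting logic it introduces. The slice names mirror the diff; the wrapper function and the sample data are hypothetical, not the actual Promscale code.

```go
package main

import "fmt"

// countDuplicates mirrors the loop added to insertSeries: for each batched
// insert, rows that the database did not insert are counted as duplicate
// samples, and every metric with at least one such row counts as an
// affected metric.
func countDuplicates(numRowsPerInsert, insertedRows []int) (affectedMetrics, affectedSamples int64) {
	for idx, numRows := range numRowsPerInsert {
		if numRows != insertedRows[idx] {
			affectedMetrics++
			affectedSamples += int64(numRows - insertedRows[idx])
		}
	}
	return affectedMetrics, affectedSamples
}

func main() {
	// First metric: 10 rows sent, 7 inserted (3 duplicates); second metric: fully inserted.
	m, s := countDuplicates([]int{10, 5}, []int{7, 5})
	fmt.Println(m, s) // 1 3
}
```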
48 changes: 0 additions & 48 deletions pkg/pgmodel/ingestor/duplicates.go

This file was deleted.

30 changes: 30 additions & 0 deletions pkg/tests/end_to_end_tests/metrics_duplicate_insert_test.go
@@ -0,0 +1,30 @@
package end_to_end_tests

import (
"context"
"testing"

"github.com/jackc/pgx/v4/pgxpool"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/testutil"
"github.com/stretchr/testify/require"
ingstr "github.com/timescale/promscale/pkg/pgmodel/ingestor"
"github.com/timescale/promscale/pkg/pgmodel/metrics"
"github.com/timescale/promscale/pkg/pgxconn"
)

func TestMetricsDuplicateInsert(t *testing.T) {
ctx := context.Background()
ts := generateSmallTimeseries()
withDB(t, *testDatabase, func(db *pgxpool.Pool, t testing.TB) {
ingestor, err := ingstr.NewPgxIngestorForTests(pgxconn.NewPgxConn(db), nil)
require.NoError(t, err)
defer ingestor.Close()
_, _, err = ingestor.IngestMetrics(ctx, newWriteRequestWithTs(copyMetrics(ts)))
require.NoError(t, err)
require.Zero(t, testutil.ToFloat64(metrics.IngestorDuplicates.With(prometheus.Labels{"type": "metric", "kind": "sample"})), "must be zero when no duplicates are ingested")
_, _, err = ingestor.IngestMetrics(ctx, newWriteRequestWithTs(copyMetrics(ts)))
require.NoError(t, err)
require.Greater(t, testutil.ToFloat64(metrics.IngestorDuplicates.With(prometheus.Labels{"type": "metric", "kind": "sample"})), 0.0, "duplicate ingestion must have been recorded")
})
}
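
A side note on the assertion style used above: `testutil.ToFloat64` collects a single metric and returns its current value, so passing the labelled child of the `IngestorDuplicates` counter vector reads exactly the series the new alert is built on. A minimal, hedged illustration of that pattern with a stand-in counter (names here are hypothetical, not Promscale's):

```go
package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/testutil"
)

func main() {
	// Stand-in for a counter vector like metrics.IngestorDuplicates.
	dupes := prometheus.NewCounterVec(
		prometheus.CounterOpts{Name: "ingest_duplicates_total", Help: "Duplicates rejected on ingest."},
		[]string{"type", "kind"},
	)

	child := dupes.With(prometheus.Labels{"type": "metric", "kind": "sample"})
	child.Add(3)

	// ToFloat64 collects the single labelled child and returns its value.
	fmt.Println(testutil.ToFloat64(child)) // 3
}
```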
20 changes: 20 additions & 0 deletions pkg/util/throughput/throughput.go
@@ -37,6 +37,8 @@ type throughputCalc struct {
// Metrics telemetry.
samples *ewma.Rate
metadata *ewma.Rate
duplicateMetrics *ewma.Rate
duplicateSamples *ewma.Rate
metricsMaxSentTs int64

// Traces telemetry.
@@ -50,6 +52,8 @@ func newThroughputCal(every time.Duration) *throughputCalc {
metricsMaxSentTs: 0,
samples: ewma.NewEWMARate(1, every),
metadata: ewma.NewEWMARate(1, every),
duplicateMetrics: ewma.NewEWMARate(1, every),
duplicateSamples: ewma.NewEWMARate(1, every),
spans: ewma.NewEWMARate(1, every),
spansLastWriteOn: 0,
}
@@ -61,6 +65,8 @@ func (tc *throughputCalc) run() {
tc.samples.Tick()
tc.metadata.Tick()
tc.spans.Tick()
tc.duplicateSamples.Tick()
tc.duplicateMetrics.Tick()

samplesRate := tc.samples.Rate()
metadataRate := tc.metadata.Rate()
@@ -85,6 +91,12 @@
throughput = append(throughput, []interface{}{"metric-metadata/sec", int(metadataRate)}...)
}

duplicateSamplesRate := tc.duplicateSamples.Rate()
if duplicateSamplesRate != 0 {
duplicateMetricsRate := tc.duplicateMetrics.Rate()
throughput = append(throughput, []interface{}{"duplicate-metrics/sec", int(duplicateMetricsRate), "duplicate-samples/sec", int(duplicateSamplesRate)}...)
}

if len(throughput) > 2 {
// Log only if we had any activity.
log.Info(throughput...)
@@ -113,3 +125,11 @@ func ReportSpansProcessed(lastWriteTs int64, numSpans int) {
atomic.StoreInt64(&throughputWatcher.spansLastWriteOn, lastWriteTs)
}
}

func ReportDuplicateMetrics(numSamples, numMetrics int64) {
if throughputWatcher == nil {
return
}
throughputWatcher.duplicateMetrics.Incr(numMetrics)
throughputWatcher.duplicateSamples.Incr(numSamples)
}
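
As a usage note for the two new fields: callers invoke `Incr` as duplicates are observed, and the reporting loop calls `Tick` on every interval before reading `Rate`, as `run()` does above. A minimal sketch of that pattern, under the assumption that the `ewma` package used by this file lives at the import path below:

```go
package main

import (
	"fmt"
	"time"

	// Assumed import path for the ewma package used by throughput.go.
	"github.com/timescale/promscale/pkg/ewma"
)

func main() {
	interval := time.Second
	duplicateSamples := ewma.NewEWMARate(1, interval)

	// Ingest path: report duplicates as they are detected.
	duplicateSamples.Incr(42)

	// Reporting loop: tick on every interval, then read the smoothed per-second rate.
	duplicateSamples.Tick()
	fmt.Printf("duplicate-samples/sec: %d\n", int(duplicateSamples.Rate()))
}
```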
