Skip to content

Commit

Permalink
store: Added test for consistency delay filter, defaulted to 0. (#2159)
Browse files Browse the repository at this point in the history
Signed-off-by: Bartlomiej Plotka <[email protected]>
  • Loading branch information
bwplotka authored Feb 20, 2020
1 parent 4061192 commit 021f623
Show file tree
Hide file tree
Showing 4 changed files with 151 additions and 6 deletions.
4 changes: 2 additions & 2 deletions cmd/thanos/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,8 @@ func registerStore(m map[string]setupFunc, app *kingpin.Application) {
enableIndexHeader := cmd.Flag("experimental.enable-index-header", "If true, Store Gateway will recreate index-header instead of index-cache.json for each block. This will replace index-cache.json permanently once it will be out of experimental stage.").
Hidden().Default("false").Bool()

consistencyDelay := modelDuration(cmd.Flag("consistency-delay", "Minimum age of all blocks before they are being read.").
Default("30m"))
consistencyDelay := modelDuration(cmd.Flag("consistency-delay", "Minimum age of all blocks before they are being read. Set it to safe value (e.g 30m) if your object storage is eventually consistent. GCS and S3 are (roughly) strongly consistent.").
Default("0s"))

m[component.Store.String()] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, _ <-chan struct{}, debugLogging bool) error {
if minTime.PrometheusTimestamp() > maxTime.PrometheusTimestamp() {
Expand Down
9 changes: 6 additions & 3 deletions pkg/block/fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ const (
// Filter's label values.
labelExcludedMeta = "label-excluded"
timeExcludedMeta = "time-excluded"
TooFreshMeta = "too-fresh"
tooFreshMeta = "too-fresh"
duplicateMeta = "duplicate"
)

Expand Down Expand Up @@ -83,7 +83,7 @@ func newSyncMetrics(r prometheus.Registerer) *syncMetrics {
[]string{corruptedMeta},
[]string{noMeta},
[]string{loadedMeta},
[]string{TooFreshMeta},
[]string{tooFreshMeta},
[]string{failedMeta},
[]string{labelExcludedMeta},
[]string{timeExcludedMeta},
Expand Down Expand Up @@ -535,6 +535,9 @@ type ConsistencyDelayMetaFilter struct {

// NewConsistencyDelayMetaFilter creates ConsistencyDelayMetaFilter.
func NewConsistencyDelayMetaFilter(logger log.Logger, consistencyDelay time.Duration, reg prometheus.Registerer) *ConsistencyDelayMetaFilter {
if logger == nil {
logger = log.NewNopLogger()
}
consistencyDelayMetric := prometheus.NewGaugeFunc(prometheus.GaugeOpts{
Name: "consistency_delay_seconds",
Help: "Configured consistency delay in seconds.",
Expand All @@ -561,7 +564,7 @@ func (f *ConsistencyDelayMetaFilter) Filter(metas map[ulid.ULID]*metadata.Meta,
meta.Thanos.Source != metadata.CompactorRepairSource {

level.Debug(f.logger).Log("msg", "block is too fresh for now", "block", id)
synced.WithLabelValues(TooFreshMeta).Inc()
synced.WithLabelValues(tooFreshMeta).Inc()
delete(metas, id)
}
}
Expand Down
109 changes: 108 additions & 1 deletion pkg/block/fetcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"encoding/json"
"fmt"
"io/ioutil"
"math/rand"
"os"
"path"
"path/filepath"
Expand All @@ -25,6 +26,7 @@ import (
"github.com/prometheus/prometheus/pkg/relabel"
"github.com/prometheus/prometheus/tsdb"
"github.com/thanos-io/thanos/pkg/block/metadata"
"github.com/thanos-io/thanos/pkg/extprom"
"github.com/thanos-io/thanos/pkg/model"
"github.com/thanos-io/thanos/pkg/objstore"
"github.com/thanos-io/thanos/pkg/objstore/objtesting"
Expand Down Expand Up @@ -268,7 +270,7 @@ func TestMetaFetcher_Fetch(t *testing.T) {
testutil.Equals(t, 0.0, promtest.ToFloat64(f.metrics.synced.WithLabelValues(labelExcludedMeta)))
testutil.Equals(t, 0.0, promtest.ToFloat64(f.metrics.synced.WithLabelValues(timeExcludedMeta)))
testutil.Equals(t, float64(expectedFailures), promtest.ToFloat64(f.metrics.synced.WithLabelValues(failedMeta)))
testutil.Equals(t, 0.0, promtest.ToFloat64(f.metrics.synced.WithLabelValues(TooFreshMeta)))
testutil.Equals(t, 0.0, promtest.ToFloat64(f.metrics.synced.WithLabelValues(tooFreshMeta)))
}); !ok {
return
}
Expand Down Expand Up @@ -770,3 +772,108 @@ func compareSliceWithMapKeys(tb testing.TB, m map[ulid.ULID]*metadata.Meta, s []
tb.FailNow()
}
}

type ulidBuilder struct {
entropy *rand.Rand

created []ulid.ULID
}

func (u *ulidBuilder) ULID(t time.Time) ulid.ULID {
if u.entropy == nil {
source := rand.NewSource(1234)
u.entropy = rand.New(source)
}

id := ulid.MustNew(ulid.Timestamp(t), u.entropy)
u.created = append(u.created, id)
return id
}

func TestConsistencyDelayMetaFilter_Filter_0(t *testing.T) {
u := &ulidBuilder{}
now := time.Now()

input := map[ulid.ULID]*metadata.Meta{
// Fresh blocks.
u.ULID(now): {Thanos: metadata.Thanos{Source: metadata.SidecarSource}},
u.ULID(now.Add(-1 * time.Minute)): {Thanos: metadata.Thanos{Source: metadata.SidecarSource}},
u.ULID(now.Add(-1 * time.Minute)): {Thanos: metadata.Thanos{Source: metadata.ReceiveSource}},
u.ULID(now.Add(-1 * time.Minute)): {Thanos: metadata.Thanos{Source: metadata.RulerSource}},

// For now non-delay delete sources, should be ignored by consistency delay.
u.ULID(now.Add(-1 * time.Minute)): {Thanos: metadata.Thanos{Source: metadata.BucketRepairSource}},
u.ULID(now.Add(-1 * time.Minute)): {Thanos: metadata.Thanos{Source: metadata.CompactorSource}},
u.ULID(now.Add(-1 * time.Minute)): {Thanos: metadata.Thanos{Source: metadata.CompactorRepairSource}},

// 29m.
u.ULID(now.Add(-29 * time.Minute)): {Thanos: metadata.Thanos{Source: metadata.SidecarSource}},
u.ULID(now.Add(-29 * time.Minute)): {Thanos: metadata.Thanos{Source: metadata.ReceiveSource}},
u.ULID(now.Add(-29 * time.Minute)): {Thanos: metadata.Thanos{Source: metadata.RulerSource}},

// For now non-delay delete sources, should be ignored by consistency delay.
u.ULID(now.Add(-29 * time.Minute)): {Thanos: metadata.Thanos{Source: metadata.BucketRepairSource}},
u.ULID(now.Add(-29 * time.Minute)): {Thanos: metadata.Thanos{Source: metadata.CompactorSource}},
u.ULID(now.Add(-29 * time.Minute)): {Thanos: metadata.Thanos{Source: metadata.CompactorRepairSource}},

// 30m.
u.ULID(now.Add(-30 * time.Minute)): {Thanos: metadata.Thanos{Source: metadata.SidecarSource}},
u.ULID(now.Add(-30 * time.Minute)): {Thanos: metadata.Thanos{Source: metadata.ReceiveSource}},
u.ULID(now.Add(-30 * time.Minute)): {Thanos: metadata.Thanos{Source: metadata.RulerSource}},
u.ULID(now.Add(-30 * time.Minute)): {Thanos: metadata.Thanos{Source: metadata.BucketRepairSource}},
u.ULID(now.Add(-30 * time.Minute)): {Thanos: metadata.Thanos{Source: metadata.CompactorSource}},
u.ULID(now.Add(-30 * time.Minute)): {Thanos: metadata.Thanos{Source: metadata.CompactorRepairSource}},

// 30m+.
u.ULID(now.Add(-20 * time.Hour)): {Thanos: metadata.Thanos{Source: metadata.SidecarSource}},
u.ULID(now.Add(-20 * time.Hour)): {Thanos: metadata.Thanos{Source: metadata.ReceiveSource}},
u.ULID(now.Add(-20 * time.Hour)): {Thanos: metadata.Thanos{Source: metadata.RulerSource}},
u.ULID(now.Add(-20 * time.Hour)): {Thanos: metadata.Thanos{Source: metadata.BucketRepairSource}},
u.ULID(now.Add(-20 * time.Hour)): {Thanos: metadata.Thanos{Source: metadata.CompactorSource}},
u.ULID(now.Add(-20 * time.Hour)): {Thanos: metadata.Thanos{Source: metadata.CompactorRepairSource}},
}

t.Run("consistency 0 (turned off)", func(t *testing.T) {
synced := prometheus.NewGaugeVec(prometheus.GaugeOpts{}, []string{"state"})
expected := map[ulid.ULID]*metadata.Meta{}
// Copy all.
for _, id := range u.created {
expected[id] = input[id]
}

reg := prometheus.NewRegistry()
f := NewConsistencyDelayMetaFilter(nil, 0*time.Second, reg)
testutil.Equals(t, map[string]float64{"consistency_delay_seconds": 0.0}, extprom.CurrentGaugeValuesFor(t, reg, "consistency_delay_seconds"))

f.Filter(input, synced, false)

testutil.Equals(t, 0.0, promtest.ToFloat64(synced.WithLabelValues(tooFreshMeta)))
testutil.Equals(t, expected, input)
})

t.Run("consistency 30m.", func(t *testing.T) {
synced := prometheus.NewGaugeVec(prometheus.GaugeOpts{}, []string{"state"})
expected := map[ulid.ULID]*metadata.Meta{}
// Only certain sources and those with 30m or more age go through.
for i, id := range u.created {
// Younger than 30m.
if i < 13 {
if input[id].Thanos.Source != metadata.BucketRepairSource &&
input[id].Thanos.Source != metadata.CompactorSource &&
input[id].Thanos.Source != metadata.CompactorRepairSource {
continue
}
}
expected[id] = input[id]
}

reg := prometheus.NewRegistry()
f := NewConsistencyDelayMetaFilter(nil, 30*time.Minute, reg)
testutil.Equals(t, map[string]float64{"consistency_delay_seconds": (30 * time.Minute).Seconds()}, extprom.CurrentGaugeValuesFor(t, reg, "consistency_delay_seconds"))

f.Filter(input, synced, false)

testutil.Equals(t, float64(len(u.created)-len(expected)), promtest.ToFloat64(synced.WithLabelValues(tooFreshMeta)))
testutil.Equals(t, expected, input)
})
}
35 changes: 35 additions & 0 deletions pkg/extprom/testing.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
// Copyright (c) The Thanos Authors.
// Licensed under the Apache License 2.0.

package extprom

import (
"testing"

"github.com/prometheus/client_golang/prometheus"
"github.com/thanos-io/thanos/pkg/testutil"
)

// CurrentGaugeValuesFor returns gauge values for given metric names. Useful for testing based on registry,
// when you don't have access to metric variable.
func CurrentGaugeValuesFor(t *testing.T, reg prometheus.Gatherer, metricNames ...string) map[string]float64 {
f, err := reg.Gather()
testutil.Ok(t, err)

res := make(map[string]float64, len(metricNames))
for _, g := range f {
for _, m := range metricNames {
if g.GetName() != m {
continue
}

testutil.Equals(t, 1, len(g.GetMetric()))
if _, ok := res[m]; ok {
t.Error("expected only one metric family for", m)
t.FailNow()
}
res[m] = *g.GetMetric()[0].GetGauge().Value
}
}
return res
}

0 comments on commit 021f623

Please sign in to comment.