Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

store: Added test for consistency delay filter, defaulted to 0. #2159

Merged
merged 1 commit into from
Feb 20, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions cmd/thanos/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,8 @@ func registerStore(m map[string]setupFunc, app *kingpin.Application) {
enableIndexHeader := cmd.Flag("experimental.enable-index-header", "If true, Store Gateway will recreate index-header instead of index-cache.json for each block. This will replace index-cache.json permanently once it will be out of experimental stage.").
Hidden().Default("false").Bool()

consistencyDelay := modelDuration(cmd.Flag("consistency-delay", "Minimum age of all blocks before they are being read.").
Default("30m"))
consistencyDelay := modelDuration(cmd.Flag("consistency-delay", "Minimum age of all blocks before they are being read. Set it to safe value (e.g 30m) if your object storage is eventually consistent. GCS and S3 are (roughly) strongly consistent.").
Default("0s"))

m[component.Store.String()] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, _ <-chan struct{}, debugLogging bool) error {
if minTime.PrometheusTimestamp() > maxTime.PrometheusTimestamp() {
Expand Down
9 changes: 6 additions & 3 deletions pkg/block/fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ const (
// Filter's label values.
labelExcludedMeta = "label-excluded"
timeExcludedMeta = "time-excluded"
TooFreshMeta = "too-fresh"
tooFreshMeta = "too-fresh"
duplicateMeta = "duplicate"
)

Expand Down Expand Up @@ -83,7 +83,7 @@ func newSyncMetrics(r prometheus.Registerer) *syncMetrics {
[]string{corruptedMeta},
[]string{noMeta},
[]string{loadedMeta},
[]string{TooFreshMeta},
[]string{tooFreshMeta},
[]string{failedMeta},
[]string{labelExcludedMeta},
[]string{timeExcludedMeta},
Expand Down Expand Up @@ -535,6 +535,9 @@ type ConsistencyDelayMetaFilter struct {

// NewConsistencyDelayMetaFilter creates ConsistencyDelayMetaFilter.
func NewConsistencyDelayMetaFilter(logger log.Logger, consistencyDelay time.Duration, reg prometheus.Registerer) *ConsistencyDelayMetaFilter {
if logger == nil {
logger = log.NewNopLogger()
}
consistencyDelayMetric := prometheus.NewGaugeFunc(prometheus.GaugeOpts{
Name: "consistency_delay_seconds",
Help: "Configured consistency delay in seconds.",
Expand All @@ -561,7 +564,7 @@ func (f *ConsistencyDelayMetaFilter) Filter(metas map[ulid.ULID]*metadata.Meta,
meta.Thanos.Source != metadata.CompactorRepairSource {

level.Debug(f.logger).Log("msg", "block is too fresh for now", "block", id)
synced.WithLabelValues(TooFreshMeta).Inc()
synced.WithLabelValues(tooFreshMeta).Inc()
delete(metas, id)
}
}
Expand Down
109 changes: 108 additions & 1 deletion pkg/block/fetcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"encoding/json"
"fmt"
"io/ioutil"
"math/rand"
"os"
"path"
"path/filepath"
Expand All @@ -25,6 +26,7 @@ import (
"github.com/prometheus/prometheus/pkg/relabel"
"github.com/prometheus/prometheus/tsdb"
"github.com/thanos-io/thanos/pkg/block/metadata"
"github.com/thanos-io/thanos/pkg/extprom"
"github.com/thanos-io/thanos/pkg/model"
"github.com/thanos-io/thanos/pkg/objstore"
"github.com/thanos-io/thanos/pkg/objstore/objtesting"
Expand Down Expand Up @@ -268,7 +270,7 @@ func TestMetaFetcher_Fetch(t *testing.T) {
testutil.Equals(t, 0.0, promtest.ToFloat64(f.metrics.synced.WithLabelValues(labelExcludedMeta)))
testutil.Equals(t, 0.0, promtest.ToFloat64(f.metrics.synced.WithLabelValues(timeExcludedMeta)))
testutil.Equals(t, float64(expectedFailures), promtest.ToFloat64(f.metrics.synced.WithLabelValues(failedMeta)))
testutil.Equals(t, 0.0, promtest.ToFloat64(f.metrics.synced.WithLabelValues(TooFreshMeta)))
testutil.Equals(t, 0.0, promtest.ToFloat64(f.metrics.synced.WithLabelValues(tooFreshMeta)))
}); !ok {
return
}
Expand Down Expand Up @@ -770,3 +772,108 @@ func compareSliceWithMapKeys(tb testing.TB, m map[ulid.ULID]*metadata.Meta, s []
tb.FailNow()
}
}

type ulidBuilder struct {
entropy *rand.Rand

created []ulid.ULID
}

func (u *ulidBuilder) ULID(t time.Time) ulid.ULID {
if u.entropy == nil {
source := rand.NewSource(1234)
u.entropy = rand.New(source)
}

id := ulid.MustNew(ulid.Timestamp(t), u.entropy)
u.created = append(u.created, id)
return id
}

func TestConsistencyDelayMetaFilter_Filter_0(t *testing.T) {
u := &ulidBuilder{}
now := time.Now()

input := map[ulid.ULID]*metadata.Meta{
// Fresh blocks.
u.ULID(now): {Thanos: metadata.Thanos{Source: metadata.SidecarSource}},
u.ULID(now.Add(-1 * time.Minute)): {Thanos: metadata.Thanos{Source: metadata.SidecarSource}},
u.ULID(now.Add(-1 * time.Minute)): {Thanos: metadata.Thanos{Source: metadata.ReceiveSource}},
u.ULID(now.Add(-1 * time.Minute)): {Thanos: metadata.Thanos{Source: metadata.RulerSource}},

// For now non-delay delete sources, should be ignored by consistency delay.
u.ULID(now.Add(-1 * time.Minute)): {Thanos: metadata.Thanos{Source: metadata.BucketRepairSource}},
u.ULID(now.Add(-1 * time.Minute)): {Thanos: metadata.Thanos{Source: metadata.CompactorSource}},
u.ULID(now.Add(-1 * time.Minute)): {Thanos: metadata.Thanos{Source: metadata.CompactorRepairSource}},

// 29m.
u.ULID(now.Add(-29 * time.Minute)): {Thanos: metadata.Thanos{Source: metadata.SidecarSource}},
u.ULID(now.Add(-29 * time.Minute)): {Thanos: metadata.Thanos{Source: metadata.ReceiveSource}},
u.ULID(now.Add(-29 * time.Minute)): {Thanos: metadata.Thanos{Source: metadata.RulerSource}},

// For now non-delay delete sources, should be ignored by consistency delay.
u.ULID(now.Add(-29 * time.Minute)): {Thanos: metadata.Thanos{Source: metadata.BucketRepairSource}},
u.ULID(now.Add(-29 * time.Minute)): {Thanos: metadata.Thanos{Source: metadata.CompactorSource}},
u.ULID(now.Add(-29 * time.Minute)): {Thanos: metadata.Thanos{Source: metadata.CompactorRepairSource}},

// 30m.
u.ULID(now.Add(-30 * time.Minute)): {Thanos: metadata.Thanos{Source: metadata.SidecarSource}},
u.ULID(now.Add(-30 * time.Minute)): {Thanos: metadata.Thanos{Source: metadata.ReceiveSource}},
u.ULID(now.Add(-30 * time.Minute)): {Thanos: metadata.Thanos{Source: metadata.RulerSource}},
u.ULID(now.Add(-30 * time.Minute)): {Thanos: metadata.Thanos{Source: metadata.BucketRepairSource}},
u.ULID(now.Add(-30 * time.Minute)): {Thanos: metadata.Thanos{Source: metadata.CompactorSource}},
u.ULID(now.Add(-30 * time.Minute)): {Thanos: metadata.Thanos{Source: metadata.CompactorRepairSource}},

// 30m+.
u.ULID(now.Add(-20 * time.Hour)): {Thanos: metadata.Thanos{Source: metadata.SidecarSource}},
u.ULID(now.Add(-20 * time.Hour)): {Thanos: metadata.Thanos{Source: metadata.ReceiveSource}},
u.ULID(now.Add(-20 * time.Hour)): {Thanos: metadata.Thanos{Source: metadata.RulerSource}},
u.ULID(now.Add(-20 * time.Hour)): {Thanos: metadata.Thanos{Source: metadata.BucketRepairSource}},
u.ULID(now.Add(-20 * time.Hour)): {Thanos: metadata.Thanos{Source: metadata.CompactorSource}},
u.ULID(now.Add(-20 * time.Hour)): {Thanos: metadata.Thanos{Source: metadata.CompactorRepairSource}},
}

t.Run("consistency 0 (turned off)", func(t *testing.T) {
synced := prometheus.NewGaugeVec(prometheus.GaugeOpts{}, []string{"state"})
expected := map[ulid.ULID]*metadata.Meta{}
// Copy all.
for _, id := range u.created {
expected[id] = input[id]
}

reg := prometheus.NewRegistry()
f := NewConsistencyDelayMetaFilter(nil, 0*time.Second, reg)
testutil.Equals(t, map[string]float64{"consistency_delay_seconds": 0.0}, extprom.CurrentGaugeValuesFor(t, reg, "consistency_delay_seconds"))

f.Filter(input, synced, false)

testutil.Equals(t, 0.0, promtest.ToFloat64(synced.WithLabelValues(tooFreshMeta)))
testutil.Equals(t, expected, input)
})

t.Run("consistency 30m.", func(t *testing.T) {
synced := prometheus.NewGaugeVec(prometheus.GaugeOpts{}, []string{"state"})
expected := map[ulid.ULID]*metadata.Meta{}
// Only certain sources and those with 30m or more age go through.
for i, id := range u.created {
// Younger than 30m.
if i < 13 {
if input[id].Thanos.Source != metadata.BucketRepairSource &&
input[id].Thanos.Source != metadata.CompactorSource &&
input[id].Thanos.Source != metadata.CompactorRepairSource {
continue
}
}
expected[id] = input[id]
}

reg := prometheus.NewRegistry()
f := NewConsistencyDelayMetaFilter(nil, 30*time.Minute, reg)
testutil.Equals(t, map[string]float64{"consistency_delay_seconds": (30 * time.Minute).Seconds()}, extprom.CurrentGaugeValuesFor(t, reg, "consistency_delay_seconds"))

f.Filter(input, synced, false)

testutil.Equals(t, float64(len(u.created)-len(expected)), promtest.ToFloat64(synced.WithLabelValues(tooFreshMeta)))
testutil.Equals(t, expected, input)
})
}
35 changes: 35 additions & 0 deletions pkg/extprom/testing.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
// Copyright (c) The Thanos Authors.
// Licensed under the Apache License 2.0.

package extprom

import (
"testing"

"github.com/prometheus/client_golang/prometheus"
"github.com/thanos-io/thanos/pkg/testutil"
)

// CurrentGaugeValuesFor returns gauge values for given metric names. Useful for testing based on registry,
// when you don't have access to metric variable.
func CurrentGaugeValuesFor(t *testing.T, reg prometheus.Gatherer, metricNames ...string) map[string]float64 {
f, err := reg.Gather()
testutil.Ok(t, err)

res := make(map[string]float64, len(metricNames))
for _, g := range f {
for _, m := range metricNames {
if g.GetName() != m {
continue
}

testutil.Equals(t, 1, len(g.GetMetric()))
if _, ok := res[m]; ok {
t.Error("expected only one metric family for", m)
t.FailNow()
}
res[m] = *g.GetMetric()[0].GetGauge().Value
}
}
return res
}