diff --git a/cmd/thanos/compact.go b/cmd/thanos/compact.go index dd462d6b51..c95cf113d6 100644 --- a/cmd/thanos/compact.go +++ b/cmd/thanos/compact.go @@ -66,6 +66,9 @@ func registerCompact(m map[string]setupFunc, app *kingpin.Application, name stri haltOnError := cmd.Flag("debug.halt-on-error", "Halt the process if a critical compaction error is detected."). Hidden().Default("true").Bool() + acceptMalformedIndex := cmd.Flag("debug.accept-malformed-index", + "Compaction index verification will ignore out of order label names."). + Hidden().Default("false").Bool() httpAddr := regHTTPAddrFlag(cmd) @@ -102,6 +105,7 @@ func registerCompact(m map[string]setupFunc, app *kingpin.Application, name stri objStoreConfig, time.Duration(*syncDelay), *haltOnError, + *acceptMalformedIndex, *wait, map[compact.ResolutionLevel]time.Duration{ compact.ResolutionLevelRaw: time.Duration(*retentionRaw), @@ -125,6 +129,7 @@ func runCompact( objStoreConfig *pathOrContent, syncDelay time.Duration, haltOnError bool, + acceptMalformedIndex bool, wait bool, retentionByResolution map[compact.ResolutionLevel]time.Duration, component string, @@ -162,7 +167,8 @@ func runCompact( } }() - sy, err := compact.NewSyncer(logger, reg, bkt, syncDelay, blockSyncConcurrency) + sy, err := compact.NewSyncer(logger, reg, bkt, syncDelay, + blockSyncConcurrency, acceptMalformedIndex) if err != nil { return errors.Wrap(err, "create syncer") } diff --git a/pkg/block/index.go b/pkg/block/index.go index 232bf40d37..777654f2e6 100644 --- a/pkg/block/index.go +++ b/pkg/block/index.go @@ -16,6 +16,7 @@ import ( "github.com/prometheus/tsdb/fileutil" "github.com/go-kit/kit/log" + "github.com/go-kit/kit/log/level" "github.com/improbable-eng/thanos/pkg/runutil" "github.com/oklog/ulid" "github.com/pkg/errors" @@ -248,6 +249,20 @@ type Stats struct { // Specifically we mean here chunks with minTime == block.maxTime and maxTime > block.MaxTime. These are // are segregated into separate counters. These chunks are safe to be deleted, since they are duplicated across 2 blocks. Issue347OutsideChunks int + // OutOfOrderLabels represents the number of postings that contained out + // of order labels, a bug present in Prometheus 2.8.0 and below. + OutOfOrderLabels int +} + +// PrometheusIssue5372Err returns an error if the Stats object indicates +// postings with out of order labels. This is corrected by Prometheus Issue +// #5372 and affects Prometheus versions 2.8.0 and below. +func (i Stats) PrometheusIssue5372Err() error { + if i.OutOfOrderLabels > 0 { + return errors.Errorf("index contains %d postings with out of order labels", + i.OutOfOrderLabels) + } + return nil } // Issue347OutsideChunksErr returns error if stats indicates issue347 block issue, that is repaired explicitly before compaction (on plan block). @@ -301,6 +316,10 @@ func (i Stats) AnyErr() error { errMsg = append(errMsg, err.Error()) } + if err := i.PrometheusIssue5372Err(); err != nil { + errMsg = append(errMsg, err.Error()) + } + if len(errMsg) > 0 { return errors.New(strings.Join(errMsg, ", ")) } @@ -348,7 +367,12 @@ func GatherIndexIssueStats(logger log.Logger, fn string, minTime int64, maxTime l0 := lset[0] for _, l := range lset[1:] { if l.Name < l0.Name { - return stats, errors.Errorf("out-of-order label set %s for series %d", lset, id) + stats.OutOfOrderLabels++ + level.Warn(logger).Log("msg", + "out-of-order label set: known bug in Prometheus 2.8.0 and below", + "labelset", fmt.Sprintf("%s", lset), + "series", fmt.Sprintf("%d", id), + ) } l0 = l } diff --git a/pkg/compact/compact.go b/pkg/compact/compact.go index ca05adfb4a..6fd89b76e9 100644 --- a/pkg/compact/compact.go +++ b/pkg/compact/compact.go @@ -47,6 +47,7 @@ type Syncer struct { blocksMtx sync.Mutex blockSyncConcurrency int metrics *syncerMetrics + acceptMalformedIndex bool } type syncerMetrics struct { @@ -128,7 +129,7 @@ func newSyncerMetrics(reg prometheus.Registerer) *syncerMetrics { // NewSyncer returns a new Syncer for the given Bucket and directory. // Blocks must be at least as old as the sync delay for being considered. -func NewSyncer(logger log.Logger, reg prometheus.Registerer, bkt objstore.Bucket, syncDelay time.Duration, blockSyncConcurrency int) (*Syncer, error) { +func NewSyncer(logger log.Logger, reg prometheus.Registerer, bkt objstore.Bucket, syncDelay time.Duration, blockSyncConcurrency int, acceptMalformedIndex bool) (*Syncer, error) { if logger == nil { logger = log.NewNopLogger() } @@ -140,6 +141,7 @@ func NewSyncer(logger log.Logger, reg prometheus.Registerer, bkt objstore.Bucket bkt: bkt, metrics: newSyncerMetrics(reg), blockSyncConcurrency: blockSyncConcurrency, + acceptMalformedIndex: acceptMalformedIndex, }, nil } @@ -291,6 +293,7 @@ func (c *Syncer) Groups() (res []*Group, err error) { c.bkt, labels.FromMap(m.Thanos.Labels), m.Thanos.Downsample.Resolution, + c.acceptMalformedIndex, c.metrics.compactions.WithLabelValues(GroupKey(*m)), c.metrics.compactionFailures.WithLabelValues(GroupKey(*m)), c.metrics.garbageCollectedBlocks, @@ -436,6 +439,7 @@ type Group struct { resolution int64 mtx sync.Mutex blocks map[ulid.ULID]*metadata.Meta + acceptMalformedIndex bool compactions prometheus.Counter compactionFailures prometheus.Counter groupGarbageCollectedBlocks prometheus.Counter @@ -447,6 +451,7 @@ func newGroup( bkt objstore.Bucket, lset labels.Labels, resolution int64, + acceptMalformedIndex bool, compactions prometheus.Counter, compactionFailures prometheus.Counter, groupGarbageCollectedBlocks prometheus.Counter, @@ -460,6 +465,7 @@ func newGroup( labels: lset, resolution: resolution, blocks: map[ulid.ULID]*metadata.Meta{}, + acceptMalformedIndex: acceptMalformedIndex, compactions: compactions, compactionFailures: compactionFailures, groupGarbageCollectedBlocks: groupGarbageCollectedBlocks, @@ -769,6 +775,11 @@ func (cg *Group) compact(ctx context.Context, dir string, comp tsdb.Compactor) ( if err := stats.Issue347OutsideChunksErr(); err != nil { return false, ulid.ULID{}, issue347Error(errors.Wrapf(err, "invalid, but reparable block %s", pdir), meta.ULID) } + + if err := stats.PrometheusIssue5372Err(); !cg.acceptMalformedIndex && err != nil { + return false, ulid.ULID{}, errors.Wrapf(err, + "block id %s, try running with --debug.accept-malformed-index", id) + } } level.Debug(cg.logger).Log("msg", "downloaded and verified blocks", "blocks", fmt.Sprintf("%v", plan), "duration", time.Since(begin)) @@ -816,7 +827,7 @@ func (cg *Group) compact(ctx context.Context, dir string, comp tsdb.Compactor) ( } // Ensure the output block is valid. - if err := block.VerifyIndex(cg.logger, filepath.Join(bdir, block.IndexFilename), newMeta.MinTime, newMeta.MaxTime); err != nil { + if err := block.VerifyIndex(cg.logger, filepath.Join(bdir, block.IndexFilename), newMeta.MinTime, newMeta.MaxTime); !cg.acceptMalformedIndex && err != nil { return false, ulid.ULID{}, halt(errors.Wrapf(err, "invalid result block %s", bdir)) } diff --git a/pkg/compact/compact_e2e_test.go b/pkg/compact/compact_e2e_test.go index 6f2194c41f..738048c5c2 100644 --- a/pkg/compact/compact_e2e_test.go +++ b/pkg/compact/compact_e2e_test.go @@ -32,7 +32,7 @@ func TestSyncer_SyncMetas_e2e(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second) defer cancel() - sy, err := NewSyncer(nil, nil, bkt, 0, 1) + sy, err := NewSyncer(nil, nil, bkt, 0, 1, false) testutil.Ok(t, err) // Generate 15 blocks. Initially the first 10 are synced into memory and only the last @@ -134,7 +134,7 @@ func TestSyncer_GarbageCollect_e2e(t *testing.T) { } // Do one initial synchronization with the bucket. - sy, err := NewSyncer(nil, nil, bkt, 0, 1) + sy, err := NewSyncer(nil, nil, bkt, 0, 1, false) testutil.Ok(t, err) testutil.Ok(t, sy.SyncMetas(ctx)) @@ -244,6 +244,7 @@ func TestGroup_Compact_e2e(t *testing.T) { bkt, extLset, 124, + false, metrics.compactions.WithLabelValues(""), metrics.compactionFailures.WithLabelValues(""), metrics.garbageCollectedBlocks,