From c75388a3ac729498d330408a095c722f4b6b160a Mon Sep 17 00:00:00 2001 From: Jack Neely Date: Mon, 25 Mar 2019 11:28:32 -0400 Subject: [PATCH] compact: accept malformed index (#953) * Allow malformed index patch Handle cases where we detect that postings have labels listed incorrectly due to Prometheus Issue #5372. With a command line option set these specific errors can be ignored as they happen with Prometheus 2.8.0 and lesser versions. * Fix space in structured logging key * Fix tests to not ignore out of order label errors in postings * Skip verification of newly compacted block if allow malformed index The VerifyIndex() function explicitly states testing of the invariant ordering, so rather than adding additional parameters to change its behavior when --debug.accept-malformed-index is set, we skip the verification step on the newly compacted TSDB block. This allows compaction to happen as normal when out of order labels are present in the index. * PR feedback for acceptMalformedIndex * Use fields instead of function parameters * use the same variable name everywhere * acceptMalformedIndex: Fix tests reflecting field vs parameters * Route acceptMalformedIndex option via the Syncer * accept-malformed-index: PR feedback for comments and error msgs --- cmd/thanos/compact.go | 8 +++++++- pkg/block/index.go | 26 +++++++++++++++++++++++++- pkg/compact/compact.go | 15 +++++++++++++-- pkg/compact/compact_e2e_test.go | 5 +++-- 4 files changed, 48 insertions(+), 6 deletions(-) diff --git a/cmd/thanos/compact.go b/cmd/thanos/compact.go index dd462d6b51..c95cf113d6 100644 --- a/cmd/thanos/compact.go +++ b/cmd/thanos/compact.go @@ -66,6 +66,9 @@ func registerCompact(m map[string]setupFunc, app *kingpin.Application, name stri haltOnError := cmd.Flag("debug.halt-on-error", "Halt the process if a critical compaction error is detected."). 
Hidden().Default("true").Bool() + acceptMalformedIndex := cmd.Flag("debug.accept-malformed-index", + "Compaction index verification will ignore out of order label names."). + Hidden().Default("false").Bool() httpAddr := regHTTPAddrFlag(cmd) @@ -102,6 +105,7 @@ func registerCompact(m map[string]setupFunc, app *kingpin.Application, name stri objStoreConfig, time.Duration(*syncDelay), *haltOnError, + *acceptMalformedIndex, *wait, map[compact.ResolutionLevel]time.Duration{ compact.ResolutionLevelRaw: time.Duration(*retentionRaw), @@ -125,6 +129,7 @@ func runCompact( objStoreConfig *pathOrContent, syncDelay time.Duration, haltOnError bool, + acceptMalformedIndex bool, wait bool, retentionByResolution map[compact.ResolutionLevel]time.Duration, component string, @@ -162,7 +167,8 @@ func runCompact( } }() - sy, err := compact.NewSyncer(logger, reg, bkt, syncDelay, blockSyncConcurrency) + sy, err := compact.NewSyncer(logger, reg, bkt, syncDelay, + blockSyncConcurrency, acceptMalformedIndex) if err != nil { return errors.Wrap(err, "create syncer") } diff --git a/pkg/block/index.go b/pkg/block/index.go index 232bf40d37..777654f2e6 100644 --- a/pkg/block/index.go +++ b/pkg/block/index.go @@ -16,6 +16,7 @@ import ( "github.com/prometheus/tsdb/fileutil" "github.com/go-kit/kit/log" + "github.com/go-kit/kit/log/level" "github.com/improbable-eng/thanos/pkg/runutil" "github.com/oklog/ulid" "github.com/pkg/errors" @@ -248,6 +249,20 @@ type Stats struct { // Specifically we mean here chunks with minTime == block.maxTime and maxTime > block.MaxTime. These are // are segregated into separate counters. These chunks are safe to be deleted, since they are duplicated across 2 blocks. Issue347OutsideChunks int + // OutOfOrderLabels represents the number of postings that contained out + // of order labels, a bug present in Prometheus 2.8.0 and below. + OutOfOrderLabels int +} + +// PrometheusIssue5372Err returns an error if the Stats object indicates +// postings with out of order labels. 
This is corrected by Prometheus Issue +// #5372 and affects Prometheus versions 2.8.0 and below. +func (i Stats) PrometheusIssue5372Err() error { + if i.OutOfOrderLabels > 0 { + return errors.Errorf("index contains %d postings with out of order labels", + i.OutOfOrderLabels) + } + return nil } // Issue347OutsideChunksErr returns error if stats indicates issue347 block issue, that is repaired explicitly before compaction (on plan block). @@ -301,6 +316,10 @@ func (i Stats) AnyErr() error { errMsg = append(errMsg, err.Error()) } + if err := i.PrometheusIssue5372Err(); err != nil { + errMsg = append(errMsg, err.Error()) + } + if len(errMsg) > 0 { return errors.New(strings.Join(errMsg, ", ")) } @@ -348,7 +367,12 @@ func GatherIndexIssueStats(logger log.Logger, fn string, minTime int64, maxTime l0 := lset[0] for _, l := range lset[1:] { if l.Name < l0.Name { - return stats, errors.Errorf("out-of-order label set %s for series %d", lset, id) + stats.OutOfOrderLabels++ + level.Warn(logger).Log("msg", + "out-of-order label set: known bug in Prometheus 2.8.0 and below", + "labelset", fmt.Sprintf("%s", lset), + "series", fmt.Sprintf("%d", id), + ) } l0 = l } diff --git a/pkg/compact/compact.go b/pkg/compact/compact.go index ca05adfb4a..6fd89b76e9 100644 --- a/pkg/compact/compact.go +++ b/pkg/compact/compact.go @@ -47,6 +47,7 @@ type Syncer struct { blocksMtx sync.Mutex blockSyncConcurrency int metrics *syncerMetrics + acceptMalformedIndex bool } type syncerMetrics struct { @@ -128,7 +129,7 @@ func newSyncerMetrics(reg prometheus.Registerer) *syncerMetrics { // NewSyncer returns a new Syncer for the given Bucket and directory. // Blocks must be at least as old as the sync delay for being considered. 
-func NewSyncer(logger log.Logger, reg prometheus.Registerer, bkt objstore.Bucket, syncDelay time.Duration, blockSyncConcurrency int) (*Syncer, error) { +func NewSyncer(logger log.Logger, reg prometheus.Registerer, bkt objstore.Bucket, syncDelay time.Duration, blockSyncConcurrency int, acceptMalformedIndex bool) (*Syncer, error) { if logger == nil { logger = log.NewNopLogger() } @@ -140,6 +141,7 @@ func NewSyncer(logger log.Logger, reg prometheus.Registerer, bkt objstore.Bucket bkt: bkt, metrics: newSyncerMetrics(reg), blockSyncConcurrency: blockSyncConcurrency, + acceptMalformedIndex: acceptMalformedIndex, }, nil } @@ -291,6 +293,7 @@ func (c *Syncer) Groups() (res []*Group, err error) { c.bkt, labels.FromMap(m.Thanos.Labels), m.Thanos.Downsample.Resolution, + c.acceptMalformedIndex, c.metrics.compactions.WithLabelValues(GroupKey(*m)), c.metrics.compactionFailures.WithLabelValues(GroupKey(*m)), c.metrics.garbageCollectedBlocks, @@ -436,6 +439,7 @@ type Group struct { resolution int64 mtx sync.Mutex blocks map[ulid.ULID]*metadata.Meta + acceptMalformedIndex bool compactions prometheus.Counter compactionFailures prometheus.Counter groupGarbageCollectedBlocks prometheus.Counter @@ -447,6 +451,7 @@ func newGroup( bkt objstore.Bucket, lset labels.Labels, resolution int64, + acceptMalformedIndex bool, compactions prometheus.Counter, compactionFailures prometheus.Counter, groupGarbageCollectedBlocks prometheus.Counter, @@ -460,6 +465,7 @@ func newGroup( labels: lset, resolution: resolution, blocks: map[ulid.ULID]*metadata.Meta{}, + acceptMalformedIndex: acceptMalformedIndex, compactions: compactions, compactionFailures: compactionFailures, groupGarbageCollectedBlocks: groupGarbageCollectedBlocks, @@ -769,6 +775,11 @@ func (cg *Group) compact(ctx context.Context, dir string, comp tsdb.Compactor) ( if err := stats.Issue347OutsideChunksErr(); err != nil { return false, ulid.ULID{}, issue347Error(errors.Wrapf(err, "invalid, but reparable block %s", pdir), meta.ULID) } + + if 
err := stats.PrometheusIssue5372Err(); !cg.acceptMalformedIndex && err != nil { + return false, ulid.ULID{}, errors.Wrapf(err, + "block id %s, try running with --debug.accept-malformed-index", id) + } } level.Debug(cg.logger).Log("msg", "downloaded and verified blocks", "blocks", fmt.Sprintf("%v", plan), "duration", time.Since(begin)) @@ -816,7 +827,7 @@ func (cg *Group) compact(ctx context.Context, dir string, comp tsdb.Compactor) ( } // Ensure the output block is valid. - if err := block.VerifyIndex(cg.logger, filepath.Join(bdir, block.IndexFilename), newMeta.MinTime, newMeta.MaxTime); err != nil { + if err := block.VerifyIndex(cg.logger, filepath.Join(bdir, block.IndexFilename), newMeta.MinTime, newMeta.MaxTime); !cg.acceptMalformedIndex && err != nil { return false, ulid.ULID{}, halt(errors.Wrapf(err, "invalid result block %s", bdir)) } diff --git a/pkg/compact/compact_e2e_test.go b/pkg/compact/compact_e2e_test.go index 6f2194c41f..738048c5c2 100644 --- a/pkg/compact/compact_e2e_test.go +++ b/pkg/compact/compact_e2e_test.go @@ -32,7 +32,7 @@ func TestSyncer_SyncMetas_e2e(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second) defer cancel() - sy, err := NewSyncer(nil, nil, bkt, 0, 1) + sy, err := NewSyncer(nil, nil, bkt, 0, 1, false) testutil.Ok(t, err) // Generate 15 blocks. Initially the first 10 are synced into memory and only the last @@ -134,7 +134,7 @@ func TestSyncer_GarbageCollect_e2e(t *testing.T) { } // Do one initial synchronization with the bucket. - sy, err := NewSyncer(nil, nil, bkt, 0, 1) + sy, err := NewSyncer(nil, nil, bkt, 0, 1, false) testutil.Ok(t, err) testutil.Ok(t, sy.SyncMetas(ctx)) @@ -244,6 +244,7 @@ func TestGroup_Compact_e2e(t *testing.T) { bkt, extLset, 124, + false, metrics.compactions.WithLabelValues(""), metrics.compactionFailures.WithLabelValues(""), metrics.garbageCollectedBlocks,