From e7aa8ee7a183606b133e36f9833c1c41834ee4ea Mon Sep 17 00:00:00 2001 From: Jack Neely Date: Tue, 19 Mar 2019 17:07:36 -0400 Subject: [PATCH 1/8] Allow malformed index patch Handle cases where we detect that postings have labels listed incorrectly due to Prometheus Issue #5372. With a command line option set these specific errors can be ignored as they happen with Prometheus 2.8.0 and lesser versions. --- cmd/thanos/compact.go | 7 ++++++- pkg/block/index.go | 26 +++++++++++++++++++++++++- pkg/compact/compact.go | 14 +++++++++----- 3 files changed, 40 insertions(+), 7 deletions(-) diff --git a/cmd/thanos/compact.go b/cmd/thanos/compact.go index dd462d6b51..ccc7923231 100644 --- a/cmd/thanos/compact.go +++ b/cmd/thanos/compact.go @@ -66,6 +66,9 @@ func registerCompact(m map[string]setupFunc, app *kingpin.Application, name stri haltOnError := cmd.Flag("debug.halt-on-error", "Halt the process if a critical compaction error is detected."). Hidden().Default("true").Bool() + acceptMalformed := cmd.Flag("debug.accept-malformed-index", + "Compaction index verification will ignore out of order label names."). 
+ Hidden().Default("false").Bool() httpAddr := regHTTPAddrFlag(cmd) @@ -102,6 +105,7 @@ func registerCompact(m map[string]setupFunc, app *kingpin.Application, name stri objStoreConfig, time.Duration(*syncDelay), *haltOnError, + *acceptMalformed, *wait, map[compact.ResolutionLevel]time.Duration{ compact.ResolutionLevelRaw: time.Duration(*retentionRaw), @@ -125,6 +129,7 @@ func runCompact( objStoreConfig *pathOrContent, syncDelay time.Duration, haltOnError bool, + acceptMalformed bool, wait bool, retentionByResolution map[compact.ResolutionLevel]time.Duration, component string, @@ -206,7 +211,7 @@ func runCompact( ctx, cancel := context.WithCancel(context.Background()) f := func() error { - if err := compactor.Compact(ctx); err != nil { + if err := compactor.Compact(ctx, acceptMalformed); err != nil { return errors.Wrap(err, "compaction failed") } level.Info(logger).Log("msg", "compaction iterations done") diff --git a/pkg/block/index.go b/pkg/block/index.go index 232bf40d37..8776856307 100644 --- a/pkg/block/index.go +++ b/pkg/block/index.go @@ -16,6 +16,7 @@ import ( "github.com/prometheus/tsdb/fileutil" "github.com/go-kit/kit/log" + "github.com/go-kit/kit/log/level" "github.com/improbable-eng/thanos/pkg/runutil" "github.com/oklog/ulid" "github.com/pkg/errors" @@ -248,6 +249,20 @@ type Stats struct { // Specifically we mean here chunks with minTime == block.maxTime and maxTime > block.MaxTime. These are // are segregated into separate counters. These chunks are safe to be deleted, since they are duplicated across 2 blocks. Issue347OutsideChunks int + // OutOfOrderLabels represents the number of postings that contained out + // of order labels, a bug present in Prometheus 2.8.0 and below. + OutOfOrderLabels int +} + +// PrometheusIssue5372Err returns an error if statsd indicates postings with out +// of order labels. This is introduced by Prometheus Issue #5372 in 2.8.0 and +// below. 
+func (i Stats) PrometheusIssue5372Err() error { + if i.OutOfOrderLabels > 0 { + return errors.Errorf("index contains %d postings with out of order labels", + i.OutOfOrderLabels) + } + return nil } // Issue347OutsideChunksErr returns error if stats indicates issue347 block issue, that is repaired explicitly before compaction (on plan block). @@ -301,6 +316,10 @@ func (i Stats) AnyErr() error { errMsg = append(errMsg, err.Error()) } + if err := i.PrometheusIssue5372Err(); err != nil { + errMsg = append(errMsg, err.Error()) + } + if len(errMsg) > 0 { return errors.New(strings.Join(errMsg, ", ")) } @@ -348,7 +367,12 @@ func GatherIndexIssueStats(logger log.Logger, fn string, minTime int64, maxTime l0 := lset[0] for _, l := range lset[1:] { if l.Name < l0.Name { - return stats, errors.Errorf("out-of-order label set %s for series %d", lset, id) + stats.OutOfOrderLabels++ + level.Warn(logger).Log("msg", + "out-of-order label set: known bug in Prometheus 2.8.0 and below", + "label set", fmt.Sprintf("%s", lset), + "series", fmt.Sprintf("%d", id), + ) } l0 = l } diff --git a/pkg/compact/compact.go b/pkg/compact/compact.go index ca05adfb4a..da5a0470bf 100644 --- a/pkg/compact/compact.go +++ b/pkg/compact/compact.go @@ -513,7 +513,7 @@ func (cg *Group) Resolution() int64 { // Compact plans and runs a single compaction against the group. The compacted result // is uploaded into the bucket the blocks were retrieved from. 
-func (cg *Group) Compact(ctx context.Context, dir string, comp tsdb.Compactor) (bool, ulid.ULID, error) { +func (cg *Group) Compact(ctx context.Context, dir string, comp tsdb.Compactor, ignoreMalformedIndex bool) (bool, ulid.ULID, error) { subDir := filepath.Join(dir, cg.Key()) if err := os.RemoveAll(subDir); err != nil { @@ -523,7 +523,7 @@ func (cg *Group) Compact(ctx context.Context, dir string, comp tsdb.Compactor) ( return false, ulid.ULID{}, errors.Wrap(err, "create compaction group dir") } - shouldRerun, compID, err := cg.compact(ctx, subDir, comp) + shouldRerun, compID, err := cg.compact(ctx, subDir, comp, ignoreMalformedIndex) if err != nil { cg.compactionFailures.Inc() } @@ -688,7 +688,7 @@ func RepairIssue347(ctx context.Context, logger log.Logger, bkt objstore.Bucket, return nil } -func (cg *Group) compact(ctx context.Context, dir string, comp tsdb.Compactor) (shouldRerun bool, compID ulid.ULID, err error) { +func (cg *Group) compact(ctx context.Context, dir string, comp tsdb.Compactor, ignoreMalformed bool) (shouldRerun bool, compID ulid.ULID, err error) { cg.mtx.Lock() defer cg.mtx.Unlock() @@ -769,6 +769,10 @@ func (cg *Group) compact(ctx context.Context, dir string, comp tsdb.Compactor) ( if err := stats.Issue347OutsideChunksErr(); err != nil { return false, ulid.ULID{}, issue347Error(errors.Wrapf(err, "invalid, but reparable block %s", pdir), meta.ULID) } + + if err := stats.PrometheusIssue5372Err(); !ignoreMalformed && err != nil { + return false, ulid.ULID{}, errors.Wrapf(err, "block id %s", id) + } } level.Debug(cg.logger).Log("msg", "downloaded and verified blocks", "blocks", fmt.Sprintf("%v", plan), "duration", time.Since(begin)) @@ -886,7 +890,7 @@ func NewBucketCompactor(logger log.Logger, sy *Syncer, comp tsdb.Compactor, comp } // Compact runs compaction over bucket. 
-func (c *BucketCompactor) Compact(ctx context.Context) error { +func (c *BucketCompactor) Compact(ctx context.Context, IgnoreMalformedIndex bool) error { // Loop over bucket and compact until there's no work left. for { // Clean up the compaction temporary directory at the beginning of every compaction loop. @@ -914,7 +918,7 @@ func (c *BucketCompactor) Compact(ctx context.Context) error { } finishedAllGroups := true for _, g := range groups { - shouldRerunGroup, _, err := g.Compact(ctx, c.compactDir, c.comp) + shouldRerunGroup, _, err := g.Compact(ctx, c.compactDir, c.comp, IgnoreMalformedIndex) if err == nil { if shouldRerunGroup { finishedAllGroups = false From f4b07c806755605e231f30a02af645578092ba4a Mon Sep 17 00:00:00 2001 From: Jack Neely Date: Wed, 20 Mar 2019 16:22:43 -0400 Subject: [PATCH 2/8] Fix space in structured logging key --- pkg/block/index.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/block/index.go b/pkg/block/index.go index 8776856307..8bccf1f800 100644 --- a/pkg/block/index.go +++ b/pkg/block/index.go @@ -370,7 +370,7 @@ func GatherIndexIssueStats(logger log.Logger, fn string, minTime int64, maxTime stats.OutOfOrderLabels++ level.Warn(logger).Log("msg", "out-of-order label set: known bug in Prometheus 2.8.0 and below", - "label set", fmt.Sprintf("%s", lset), + "labelset", fmt.Sprintf("%s", lset), "series", fmt.Sprintf("%d", id), ) } From 6011c72740a97790e659784aff26a650517a3e46 Mon Sep 17 00:00:00 2001 From: Jack Neely Date: Wed, 20 Mar 2019 17:24:56 -0400 Subject: [PATCH 3/8] Fix tests to not ignore out of order label errors in postings --- pkg/compact/compact_e2e_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/compact/compact_e2e_test.go b/pkg/compact/compact_e2e_test.go index 6f2194c41f..75132cd5b2 100644 --- a/pkg/compact/compact_e2e_test.go +++ b/pkg/compact/compact_e2e_test.go @@ -253,7 +253,7 @@ func TestGroup_Compact_e2e(t *testing.T) { comp, err := 
tsdb.NewLeveledCompactor(nil, log.NewLogfmtLogger(os.Stderr), []int64{1000, 3000}, nil) testutil.Ok(t, err) - shouldRerun, id, err := g.Compact(ctx, dir, comp) + shouldRerun, id, err := g.Compact(ctx, dir, comp, false) testutil.Ok(t, err) testutil.Assert(t, !shouldRerun, "group should be empty, but compactor did a compaction and told us to rerun") @@ -262,7 +262,7 @@ func TestGroup_Compact_e2e(t *testing.T) { testutil.Ok(t, g.Add(m)) } - shouldRerun, id, err = g.Compact(ctx, dir, comp) + shouldRerun, id, err = g.Compact(ctx, dir, comp, false) testutil.Ok(t, err) testutil.Assert(t, shouldRerun, "there should be compactible data, but the compactor reported there was not") From 42e89c4dc1614c30745d501e7cae4c321cf801a4 Mon Sep 17 00:00:00 2001 From: Jack Neely Date: Thu, 21 Mar 2019 13:29:32 -0400 Subject: [PATCH 4/8] Skip verification of newly compacted block if allow malformed index The VerifyIndex() function explicitly states testing of the invariant ordering, so rather than adding additional parameters to change its behavior when --debug.accept-malformed-index is set, we skip the verification step on the newly compacted TSDB block. This allows compaction to happen as normal when out of order labels are present in the index. --- pkg/compact/compact.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/compact/compact.go b/pkg/compact/compact.go index da5a0470bf..793b6b2107 100644 --- a/pkg/compact/compact.go +++ b/pkg/compact/compact.go @@ -820,7 +820,7 @@ func (cg *Group) compact(ctx context.Context, dir string, comp tsdb.Compactor, i } // Ensure the output block is valid. 
- if err := block.VerifyIndex(cg.logger, filepath.Join(bdir, block.IndexFilename), newMeta.MinTime, newMeta.MaxTime); err != nil { + if err := block.VerifyIndex(cg.logger, filepath.Join(bdir, block.IndexFilename), newMeta.MinTime, newMeta.MaxTime); !ignoreMalformed && err != nil { return false, ulid.ULID{}, halt(errors.Wrapf(err, "invalid result block %s", bdir)) } From 419c3ef336274884b3f7afbfb48509b4b92d527a Mon Sep 17 00:00:00 2001 From: Jack Neely Date: Fri, 22 Mar 2019 10:51:02 -0400 Subject: [PATCH 5/8] PR feedback for acceptMalformedIndex * Use fields instead of function parameters * use the same variable name everywhere --- cmd/thanos/compact.go | 11 +++++----- pkg/compact/compact.go | 46 ++++++++++++++++++++++++------------------ 2 files changed, 32 insertions(+), 25 deletions(-) diff --git a/cmd/thanos/compact.go b/cmd/thanos/compact.go index ccc7923231..d3e600f2b4 100644 --- a/cmd/thanos/compact.go +++ b/cmd/thanos/compact.go @@ -66,7 +66,7 @@ func registerCompact(m map[string]setupFunc, app *kingpin.Application, name stri haltOnError := cmd.Flag("debug.halt-on-error", "Halt the process if a critical compaction error is detected."). Hidden().Default("true").Bool() - acceptMalformed := cmd.Flag("debug.accept-malformed-index", + acceptMalformedIndex := cmd.Flag("debug.accept-malformed-index", "Compaction index verification will ignore out of order label names."). 
Hidden().Default("false").Bool() @@ -105,7 +105,7 @@ func registerCompact(m map[string]setupFunc, app *kingpin.Application, name stri objStoreConfig, time.Duration(*syncDelay), *haltOnError, - *acceptMalformed, + *acceptMalformedIndex, *wait, map[compact.ResolutionLevel]time.Duration{ compact.ResolutionLevelRaw: time.Duration(*retentionRaw), @@ -129,7 +129,7 @@ func runCompact( objStoreConfig *pathOrContent, syncDelay time.Duration, haltOnError bool, - acceptMalformed bool, + acceptMalformedIndex bool, wait bool, retentionByResolution map[compact.ResolutionLevel]time.Duration, component string, @@ -197,7 +197,8 @@ func runCompact( return errors.Wrap(err, "clean working downsample directory") } - compactor := compact.NewBucketCompactor(logger, sy, comp, compactDir, bkt) + compactor := compact.NewBucketCompactor(logger, sy, comp, compactDir, bkt, + acceptMalformedIndex) if retentionByResolution[compact.ResolutionLevelRaw].Seconds() != 0 { level.Info(logger).Log("msg", "retention policy of raw samples is enabled", "duration", retentionByResolution[compact.ResolutionLevelRaw]) @@ -211,7 +212,7 @@ func runCompact( ctx, cancel := context.WithCancel(context.Background()) f := func() error { - if err := compactor.Compact(ctx, acceptMalformed); err != nil { + if err := compactor.Compact(ctx); err != nil { return errors.Wrap(err, "compaction failed") } level.Info(logger).Log("msg", "compaction iterations done") diff --git a/pkg/compact/compact.go b/pkg/compact/compact.go index 793b6b2107..59806e5483 100644 --- a/pkg/compact/compact.go +++ b/pkg/compact/compact.go @@ -278,7 +278,7 @@ func groupKey(res int64, lbls labels.Labels) string { // Groups returns the compaction groups for all blocks currently known to the syncer. // It creates all groups from the scratch on every call. 
-func (c *Syncer) Groups() (res []*Group, err error) { +func (c *Syncer) Groups(acceptMalformedIndex bool) (res []*Group, err error) { c.mtx.Lock() defer c.mtx.Unlock() @@ -291,6 +291,7 @@ func (c *Syncer) Groups() (res []*Group, err error) { c.bkt, labels.FromMap(m.Thanos.Labels), m.Thanos.Downsample.Resolution, + acceptMalformedIndex, c.metrics.compactions.WithLabelValues(GroupKey(*m)), c.metrics.compactionFailures.WithLabelValues(GroupKey(*m)), c.metrics.garbageCollectedBlocks, @@ -436,6 +437,7 @@ type Group struct { resolution int64 mtx sync.Mutex blocks map[ulid.ULID]*metadata.Meta + acceptMalformedIndex bool compactions prometheus.Counter compactionFailures prometheus.Counter groupGarbageCollectedBlocks prometheus.Counter @@ -447,6 +449,7 @@ func newGroup( bkt objstore.Bucket, lset labels.Labels, resolution int64, + acceptMalformedIndex bool, compactions prometheus.Counter, compactionFailures prometheus.Counter, groupGarbageCollectedBlocks prometheus.Counter, @@ -460,6 +463,7 @@ func newGroup( labels: lset, resolution: resolution, blocks: map[ulid.ULID]*metadata.Meta{}, + acceptMalformedIndex: acceptMalformedIndex, compactions: compactions, compactionFailures: compactionFailures, groupGarbageCollectedBlocks: groupGarbageCollectedBlocks, @@ -513,7 +517,7 @@ func (cg *Group) Resolution() int64 { // Compact plans and runs a single compaction against the group. The compacted result // is uploaded into the bucket the blocks were retrieved from. 
-func (cg *Group) Compact(ctx context.Context, dir string, comp tsdb.Compactor, ignoreMalformedIndex bool) (bool, ulid.ULID, error) { +func (cg *Group) Compact(ctx context.Context, dir string, comp tsdb.Compactor) (bool, ulid.ULID, error) { subDir := filepath.Join(dir, cg.Key()) if err := os.RemoveAll(subDir); err != nil { @@ -523,7 +527,7 @@ func (cg *Group) Compact(ctx context.Context, dir string, comp tsdb.Compactor, i return false, ulid.ULID{}, errors.Wrap(err, "create compaction group dir") } - shouldRerun, compID, err := cg.compact(ctx, subDir, comp, ignoreMalformedIndex) + shouldRerun, compID, err := cg.compact(ctx, subDir, comp) if err != nil { cg.compactionFailures.Inc() } @@ -688,7 +692,7 @@ func RepairIssue347(ctx context.Context, logger log.Logger, bkt objstore.Bucket, return nil } -func (cg *Group) compact(ctx context.Context, dir string, comp tsdb.Compactor, ignoreMalformed bool) (shouldRerun bool, compID ulid.ULID, err error) { +func (cg *Group) compact(ctx context.Context, dir string, comp tsdb.Compactor) (shouldRerun bool, compID ulid.ULID, err error) { cg.mtx.Lock() defer cg.mtx.Unlock() @@ -770,7 +774,7 @@ func (cg *Group) compact(ctx context.Context, dir string, comp tsdb.Compactor, i return false, ulid.ULID{}, issue347Error(errors.Wrapf(err, "invalid, but reparable block %s", pdir), meta.ULID) } - if err := stats.PrometheusIssue5372Err(); !ignoreMalformed && err != nil { + if err := stats.PrometheusIssue5372Err(); !cg.acceptMalformedIndex && err != nil { return false, ulid.ULID{}, errors.Wrapf(err, "block id %s", id) } } @@ -820,7 +824,7 @@ func (cg *Group) compact(ctx context.Context, dir string, comp tsdb.Compactor, i } // Ensure the output block is valid. 
- if err := block.VerifyIndex(cg.logger, filepath.Join(bdir, block.IndexFilename), newMeta.MinTime, newMeta.MaxTime); !ignoreMalformed && err != nil { + if err := block.VerifyIndex(cg.logger, filepath.Join(bdir, block.IndexFilename), newMeta.MinTime, newMeta.MaxTime); !cg.acceptMalformedIndex && err != nil { return false, ulid.ULID{}, halt(errors.Wrapf(err, "invalid result block %s", bdir)) } @@ -871,26 +875,28 @@ func (cg *Group) deleteBlock(b string) error { // BucketCompactor compacts blocks in a bucket. type BucketCompactor struct { - logger log.Logger - sy *Syncer - comp tsdb.Compactor - compactDir string - bkt objstore.Bucket + logger log.Logger + sy *Syncer + comp tsdb.Compactor + compactDir string + bkt objstore.Bucket + acceptMalformedIndex bool } // NewBucketCompactor creates a new bucket compactor. -func NewBucketCompactor(logger log.Logger, sy *Syncer, comp tsdb.Compactor, compactDir string, bkt objstore.Bucket) *BucketCompactor { +func NewBucketCompactor(logger log.Logger, sy *Syncer, comp tsdb.Compactor, compactDir string, bkt objstore.Bucket, acceptMalformedIndex bool) *BucketCompactor { return &BucketCompactor{ - logger: logger, - sy: sy, - comp: comp, - compactDir: compactDir, - bkt: bkt, + logger: logger, + sy: sy, + comp: comp, + compactDir: compactDir, + bkt: bkt, + acceptMalformedIndex: acceptMalformedIndex, } } // Compact runs compaction over bucket. -func (c *BucketCompactor) Compact(ctx context.Context, IgnoreMalformedIndex bool) error { +func (c *BucketCompactor) Compact(ctx context.Context) error { // Loop over bucket and compact until there's no work left. for { // Clean up the compaction temporary directory at the beginning of every compaction loop. 
@@ -912,13 +918,13 @@ func (c *BucketCompactor) Compact(ctx context.Context, IgnoreMalformedIndex bool level.Info(c.logger).Log("msg", "start of compaction") - groups, err := c.sy.Groups() + groups, err := c.sy.Groups(c.acceptMalformedIndex) if err != nil { return errors.Wrap(err, "build compaction groups") } finishedAllGroups := true for _, g := range groups { - shouldRerunGroup, _, err := g.Compact(ctx, c.compactDir, c.comp, IgnoreMalformedIndex) + shouldRerunGroup, _, err := g.Compact(ctx, c.compactDir, c.comp) if err == nil { if shouldRerunGroup { finishedAllGroups = false From 022ff2bec8ea53f3b5c22b993102c976ad7bc569 Mon Sep 17 00:00:00 2001 From: Jack Neely Date: Fri, 22 Mar 2019 11:08:02 -0400 Subject: [PATCH 6/8] acceptMalformedIndex: Fix tests reflecting field vs parameters --- pkg/compact/compact_e2e_test.go | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/pkg/compact/compact_e2e_test.go b/pkg/compact/compact_e2e_test.go index 75132cd5b2..debec4b490 100644 --- a/pkg/compact/compact_e2e_test.go +++ b/pkg/compact/compact_e2e_test.go @@ -62,13 +62,13 @@ func TestSyncer_SyncMetas_e2e(t *testing.T) { testutil.Ok(t, bkt.Upload(ctx, path.Join(m.ULID.String(), metadata.MetaFilename), &buf)) } - groups, err := sy.Groups() + groups, err := sy.Groups(false) testutil.Ok(t, err) testutil.Equals(t, ids[:10], groups[0].IDs()) testutil.Ok(t, sy.SyncMetas(ctx)) - groups, err = sy.Groups() + groups, err = sy.Groups(false) testutil.Ok(t, err) testutil.Equals(t, ids[5:], groups[0].IDs()) }) @@ -157,7 +157,7 @@ func TestSyncer_GarbageCollect_e2e(t *testing.T) { testutil.Ok(t, sy.SyncMetas(ctx)) // Only the level 3 block, the last source block in both resolutions should be left. 
- groups, err := sy.Groups() + groups, err := sy.Groups(false) testutil.Ok(t, err) testutil.Equals(t, "0@{}", groups[0].Key()) @@ -244,6 +244,7 @@ func TestGroup_Compact_e2e(t *testing.T) { bkt, extLset, 124, + false, metrics.compactions.WithLabelValues(""), metrics.compactionFailures.WithLabelValues(""), metrics.garbageCollectedBlocks, @@ -253,7 +254,7 @@ func TestGroup_Compact_e2e(t *testing.T) { comp, err := tsdb.NewLeveledCompactor(nil, log.NewLogfmtLogger(os.Stderr), []int64{1000, 3000}, nil) testutil.Ok(t, err) - shouldRerun, id, err := g.Compact(ctx, dir, comp, false) + shouldRerun, id, err := g.Compact(ctx, dir, comp) testutil.Ok(t, err) testutil.Assert(t, !shouldRerun, "group should be empty, but compactor did a compaction and told us to rerun") @@ -262,7 +263,7 @@ func TestGroup_Compact_e2e(t *testing.T) { testutil.Ok(t, g.Add(m)) } - shouldRerun, id, err = g.Compact(ctx, dir, comp, false) + shouldRerun, id, err = g.Compact(ctx, dir, comp) testutil.Ok(t, err) testutil.Assert(t, shouldRerun, "there should be compactible data, but the compactor reported there was not") From 8a2eae53cb984b94e4ac28e8b969fd02ea8b2b8f Mon Sep 17 00:00:00 2001 From: Jack Neely Date: Fri, 22 Mar 2019 13:30:06 -0400 Subject: [PATCH 7/8] Route acceptMalformedIndex option via the Syncer --- cmd/thanos/compact.go | 6 +++--- pkg/compact/compact.go | 34 ++++++++++++++++----------------- pkg/compact/compact_e2e_test.go | 10 +++++----- 3 files changed, 25 insertions(+), 25 deletions(-) diff --git a/cmd/thanos/compact.go b/cmd/thanos/compact.go index d3e600f2b4..c95cf113d6 100644 --- a/cmd/thanos/compact.go +++ b/cmd/thanos/compact.go @@ -167,7 +167,8 @@ func runCompact( } }() - sy, err := compact.NewSyncer(logger, reg, bkt, syncDelay, blockSyncConcurrency) + sy, err := compact.NewSyncer(logger, reg, bkt, syncDelay, + blockSyncConcurrency, acceptMalformedIndex) if err != nil { return errors.Wrap(err, "create syncer") } @@ -197,8 +198,7 @@ func runCompact( return errors.Wrap(err, "clean 
working downsample directory") } - compactor := compact.NewBucketCompactor(logger, sy, comp, compactDir, bkt, - acceptMalformedIndex) + compactor := compact.NewBucketCompactor(logger, sy, comp, compactDir, bkt) if retentionByResolution[compact.ResolutionLevelRaw].Seconds() != 0 { level.Info(logger).Log("msg", "retention policy of raw samples is enabled", "duration", retentionByResolution[compact.ResolutionLevelRaw]) diff --git a/pkg/compact/compact.go b/pkg/compact/compact.go index 59806e5483..aa9f5e8e46 100644 --- a/pkg/compact/compact.go +++ b/pkg/compact/compact.go @@ -47,6 +47,7 @@ type Syncer struct { blocksMtx sync.Mutex blockSyncConcurrency int metrics *syncerMetrics + acceptMalformedIndex bool } type syncerMetrics struct { @@ -128,7 +129,7 @@ func newSyncerMetrics(reg prometheus.Registerer) *syncerMetrics { // NewSyncer returns a new Syncer for the given Bucket and directory. // Blocks must be at least as old as the sync delay for being considered. -func NewSyncer(logger log.Logger, reg prometheus.Registerer, bkt objstore.Bucket, syncDelay time.Duration, blockSyncConcurrency int) (*Syncer, error) { +func NewSyncer(logger log.Logger, reg prometheus.Registerer, bkt objstore.Bucket, syncDelay time.Duration, blockSyncConcurrency int, acceptMalformedIndex bool) (*Syncer, error) { if logger == nil { logger = log.NewNopLogger() } @@ -140,6 +141,7 @@ func NewSyncer(logger log.Logger, reg prometheus.Registerer, bkt objstore.Bucket bkt: bkt, metrics: newSyncerMetrics(reg), blockSyncConcurrency: blockSyncConcurrency, + acceptMalformedIndex: acceptMalformedIndex, }, nil } @@ -278,7 +280,7 @@ func groupKey(res int64, lbls labels.Labels) string { // Groups returns the compaction groups for all blocks currently known to the syncer. // It creates all groups from the scratch on every call. 
-func (c *Syncer) Groups(acceptMalformedIndex bool) (res []*Group, err error) { +func (c *Syncer) Groups() (res []*Group, err error) { c.mtx.Lock() defer c.mtx.Unlock() @@ -291,7 +293,7 @@ func (c *Syncer) Groups(acceptMalformedIndex bool) (res []*Group, err error) { c.bkt, labels.FromMap(m.Thanos.Labels), m.Thanos.Downsample.Resolution, - acceptMalformedIndex, + c.acceptMalformedIndex, c.metrics.compactions.WithLabelValues(GroupKey(*m)), c.metrics.compactionFailures.WithLabelValues(GroupKey(*m)), c.metrics.garbageCollectedBlocks, @@ -875,23 +877,21 @@ func (cg *Group) deleteBlock(b string) error { // BucketCompactor compacts blocks in a bucket. type BucketCompactor struct { - logger log.Logger - sy *Syncer - comp tsdb.Compactor - compactDir string - bkt objstore.Bucket - acceptMalformedIndex bool + logger log.Logger + sy *Syncer + comp tsdb.Compactor + compactDir string + bkt objstore.Bucket } // NewBucketCompactor creates a new bucket compactor. -func NewBucketCompactor(logger log.Logger, sy *Syncer, comp tsdb.Compactor, compactDir string, bkt objstore.Bucket, acceptMalformedIndex bool) *BucketCompactor { +func NewBucketCompactor(logger log.Logger, sy *Syncer, comp tsdb.Compactor, compactDir string, bkt objstore.Bucket) *BucketCompactor { return &BucketCompactor{ - logger: logger, - sy: sy, - comp: comp, - compactDir: compactDir, - bkt: bkt, - acceptMalformedIndex: acceptMalformedIndex, + logger: logger, + sy: sy, + comp: comp, + compactDir: compactDir, + bkt: bkt, } } @@ -918,7 +918,7 @@ func (c *BucketCompactor) Compact(ctx context.Context) error { level.Info(c.logger).Log("msg", "start of compaction") - groups, err := c.sy.Groups(c.acceptMalformedIndex) + groups, err := c.sy.Groups() if err != nil { return errors.Wrap(err, "build compaction groups") } diff --git a/pkg/compact/compact_e2e_test.go b/pkg/compact/compact_e2e_test.go index debec4b490..738048c5c2 100644 --- a/pkg/compact/compact_e2e_test.go +++ b/pkg/compact/compact_e2e_test.go @@ -32,7 +32,7 @@ 
func TestSyncer_SyncMetas_e2e(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second) defer cancel() - sy, err := NewSyncer(nil, nil, bkt, 0, 1) + sy, err := NewSyncer(nil, nil, bkt, 0, 1, false) testutil.Ok(t, err) // Generate 15 blocks. Initially the first 10 are synced into memory and only the last @@ -62,13 +62,13 @@ func TestSyncer_SyncMetas_e2e(t *testing.T) { testutil.Ok(t, bkt.Upload(ctx, path.Join(m.ULID.String(), metadata.MetaFilename), &buf)) } - groups, err := sy.Groups(false) + groups, err := sy.Groups() testutil.Ok(t, err) testutil.Equals(t, ids[:10], groups[0].IDs()) testutil.Ok(t, sy.SyncMetas(ctx)) - groups, err = sy.Groups(false) + groups, err = sy.Groups() testutil.Ok(t, err) testutil.Equals(t, ids[5:], groups[0].IDs()) }) @@ -134,7 +134,7 @@ func TestSyncer_GarbageCollect_e2e(t *testing.T) { } // Do one initial synchronization with the bucket. - sy, err := NewSyncer(nil, nil, bkt, 0, 1) + sy, err := NewSyncer(nil, nil, bkt, 0, 1, false) testutil.Ok(t, err) testutil.Ok(t, sy.SyncMetas(ctx)) @@ -157,7 +157,7 @@ func TestSyncer_GarbageCollect_e2e(t *testing.T) { testutil.Ok(t, sy.SyncMetas(ctx)) // Only the level 3 block, the last source block in both resolutions should be left. 
- groups, err := sy.Groups(false) + groups, err := sy.Groups() testutil.Ok(t, err) testutil.Equals(t, "0@{}", groups[0].Key()) From 662dbc398e47194117aa19623048cc7188a3a29e Mon Sep 17 00:00:00 2001 From: Jack Neely Date: Mon, 25 Mar 2019 10:59:33 -0400 Subject: [PATCH 8/8] accept-malformed-index: PR feedback for comments and error msgs --- pkg/block/index.go | 6 +++--- pkg/compact/compact.go | 3 ++- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/pkg/block/index.go b/pkg/block/index.go index 8bccf1f800..777654f2e6 100644 --- a/pkg/block/index.go +++ b/pkg/block/index.go @@ -254,9 +254,9 @@ type Stats struct { OutOfOrderLabels int } -// PrometheusIssue5372Err returns an error if statsd indicates postings with out -// of order labels. This is introduced by Prometheus Issue #5372 in 2.8.0 and -// below. +// PrometheusIssue5372Err returns an error if the Stats object indicates +// postings with out of order labels. This is corrected by Prometheus Issue +// #5372 and affects Prometheus versions 2.8.0 and below. func (i Stats) PrometheusIssue5372Err() error { if i.OutOfOrderLabels > 0 { return errors.Errorf("index contains %d postings with out of order labels", diff --git a/pkg/compact/compact.go b/pkg/compact/compact.go index aa9f5e8e46..6fd89b76e9 100644 --- a/pkg/compact/compact.go +++ b/pkg/compact/compact.go @@ -777,7 +777,8 @@ func (cg *Group) compact(ctx context.Context, dir string, comp tsdb.Compactor) ( } if err := stats.PrometheusIssue5372Err(); !cg.acceptMalformedIndex && err != nil { - return false, ulid.ULID{}, errors.Wrapf(err, "block id %s", id) + return false, ulid.ULID{}, errors.Wrapf(err, + "block id %s, try running with --debug.accept-malformed-index", id) } } level.Debug(cg.logger).Log("msg", "downloaded and verified blocks",