Skip to content

Commit

Permalink
compact: accept malformed index (#953)
Browse files Browse the repository at this point in the history
* Allow malformed index patch

Handle cases where we detect that postings have labels listed
incorrectly due to Prometheus Isuue #5372.  With a command line option
set these specific errors can be ignored as they happen with Prometheus
2.8.0 and lesser versions.

* Fix space in structured logging key

* Fix tests to not ignore out of order label errors in postings

* Skip verification of newly compacted block if allow malformed index

The VerifyIndex() function explicitly states testing of the invariant
ordering, so rather than adding additional parameters to change its
behavior when --debug.accept-malformed-index is set, we skip the
verification step on the newly compacted TSDB block.  This allows
compaction to happen as normal when out of order labels are present in
the index.

* PR feedback for acceptMalformedIndex

* Use fields instead of function parameters
* use the same variable name everywhere

* acceptMalformedIndex: Fix tests reflecting field vs parameters

* Route acceptMalformedIndex option via the Syncer

* accept-malformed-index: PR feedback for comments and error msgs
  • Loading branch information
jjneely authored and GiedriusS committed Mar 25, 2019
1 parent f251948 commit c75388a
Show file tree
Hide file tree
Showing 4 changed files with 48 additions and 6 deletions.
8 changes: 7 additions & 1 deletion cmd/thanos/compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,9 @@ func registerCompact(m map[string]setupFunc, app *kingpin.Application, name stri

haltOnError := cmd.Flag("debug.halt-on-error", "Halt the process if a critical compaction error is detected.").
Hidden().Default("true").Bool()
acceptMalformedIndex := cmd.Flag("debug.accept-malformed-index",
"Compaction index verification will ignore out of order label names.").
Hidden().Default("false").Bool()

httpAddr := regHTTPAddrFlag(cmd)

Expand Down Expand Up @@ -102,6 +105,7 @@ func registerCompact(m map[string]setupFunc, app *kingpin.Application, name stri
objStoreConfig,
time.Duration(*syncDelay),
*haltOnError,
*acceptMalformedIndex,
*wait,
map[compact.ResolutionLevel]time.Duration{
compact.ResolutionLevelRaw: time.Duration(*retentionRaw),
Expand All @@ -125,6 +129,7 @@ func runCompact(
objStoreConfig *pathOrContent,
syncDelay time.Duration,
haltOnError bool,
acceptMalformedIndex bool,
wait bool,
retentionByResolution map[compact.ResolutionLevel]time.Duration,
component string,
Expand Down Expand Up @@ -162,7 +167,8 @@ func runCompact(
}
}()

sy, err := compact.NewSyncer(logger, reg, bkt, syncDelay, blockSyncConcurrency)
sy, err := compact.NewSyncer(logger, reg, bkt, syncDelay,
blockSyncConcurrency, acceptMalformedIndex)
if err != nil {
return errors.Wrap(err, "create syncer")
}
Expand Down
26 changes: 25 additions & 1 deletion pkg/block/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/prometheus/tsdb/fileutil"

"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/improbable-eng/thanos/pkg/runutil"
"github.com/oklog/ulid"
"github.com/pkg/errors"
Expand Down Expand Up @@ -248,6 +249,20 @@ type Stats struct {
// Specifically we mean here chunks with minTime == block.maxTime and maxTime > block.MaxTime. These are
// are segregated into separate counters. These chunks are safe to be deleted, since they are duplicated across 2 blocks.
Issue347OutsideChunks int
// OutOfOrderLabels represents the number of postings that contained out
// of order labels, a bug present in Prometheus 2.8.0 and below.
OutOfOrderLabels int
}

// PrometheusIssue5372Err returns an error if the Stats object indicates
// postings with out of order labels. This is corrected by Prometheus Issue
// #5372 and affects Prometheus versions 2.8.0 and below.
func (i Stats) PrometheusIssue5372Err() error {
if i.OutOfOrderLabels > 0 {
return errors.Errorf("index contains %d postings with out of order labels",
i.OutOfOrderLabels)
}
return nil
}

// Issue347OutsideChunksErr returns error if stats indicates issue347 block issue, that is repaired explicitly before compaction (on plan block).
Expand Down Expand Up @@ -301,6 +316,10 @@ func (i Stats) AnyErr() error {
errMsg = append(errMsg, err.Error())
}

if err := i.PrometheusIssue5372Err(); err != nil {
errMsg = append(errMsg, err.Error())
}

if len(errMsg) > 0 {
return errors.New(strings.Join(errMsg, ", "))
}
Expand Down Expand Up @@ -348,7 +367,12 @@ func GatherIndexIssueStats(logger log.Logger, fn string, minTime int64, maxTime
l0 := lset[0]
for _, l := range lset[1:] {
if l.Name < l0.Name {
return stats, errors.Errorf("out-of-order label set %s for series %d", lset, id)
stats.OutOfOrderLabels++
level.Warn(logger).Log("msg",
"out-of-order label set: known bug in Prometheus 2.8.0 and below",
"labelset", fmt.Sprintf("%s", lset),
"series", fmt.Sprintf("%d", id),
)
}
l0 = l
}
Expand Down
15 changes: 13 additions & 2 deletions pkg/compact/compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ type Syncer struct {
blocksMtx sync.Mutex
blockSyncConcurrency int
metrics *syncerMetrics
acceptMalformedIndex bool
}

type syncerMetrics struct {
Expand Down Expand Up @@ -128,7 +129,7 @@ func newSyncerMetrics(reg prometheus.Registerer) *syncerMetrics {

// NewSyncer returns a new Syncer for the given Bucket and directory.
// Blocks must be at least as old as the sync delay for being considered.
func NewSyncer(logger log.Logger, reg prometheus.Registerer, bkt objstore.Bucket, syncDelay time.Duration, blockSyncConcurrency int) (*Syncer, error) {
func NewSyncer(logger log.Logger, reg prometheus.Registerer, bkt objstore.Bucket, syncDelay time.Duration, blockSyncConcurrency int, acceptMalformedIndex bool) (*Syncer, error) {
if logger == nil {
logger = log.NewNopLogger()
}
Expand All @@ -140,6 +141,7 @@ func NewSyncer(logger log.Logger, reg prometheus.Registerer, bkt objstore.Bucket
bkt: bkt,
metrics: newSyncerMetrics(reg),
blockSyncConcurrency: blockSyncConcurrency,
acceptMalformedIndex: acceptMalformedIndex,
}, nil
}

Expand Down Expand Up @@ -291,6 +293,7 @@ func (c *Syncer) Groups() (res []*Group, err error) {
c.bkt,
labels.FromMap(m.Thanos.Labels),
m.Thanos.Downsample.Resolution,
c.acceptMalformedIndex,
c.metrics.compactions.WithLabelValues(GroupKey(*m)),
c.metrics.compactionFailures.WithLabelValues(GroupKey(*m)),
c.metrics.garbageCollectedBlocks,
Expand Down Expand Up @@ -436,6 +439,7 @@ type Group struct {
resolution int64
mtx sync.Mutex
blocks map[ulid.ULID]*metadata.Meta
acceptMalformedIndex bool
compactions prometheus.Counter
compactionFailures prometheus.Counter
groupGarbageCollectedBlocks prometheus.Counter
Expand All @@ -447,6 +451,7 @@ func newGroup(
bkt objstore.Bucket,
lset labels.Labels,
resolution int64,
acceptMalformedIndex bool,
compactions prometheus.Counter,
compactionFailures prometheus.Counter,
groupGarbageCollectedBlocks prometheus.Counter,
Expand All @@ -460,6 +465,7 @@ func newGroup(
labels: lset,
resolution: resolution,
blocks: map[ulid.ULID]*metadata.Meta{},
acceptMalformedIndex: acceptMalformedIndex,
compactions: compactions,
compactionFailures: compactionFailures,
groupGarbageCollectedBlocks: groupGarbageCollectedBlocks,
Expand Down Expand Up @@ -769,6 +775,11 @@ func (cg *Group) compact(ctx context.Context, dir string, comp tsdb.Compactor) (
if err := stats.Issue347OutsideChunksErr(); err != nil {
return false, ulid.ULID{}, issue347Error(errors.Wrapf(err, "invalid, but reparable block %s", pdir), meta.ULID)
}

if err := stats.PrometheusIssue5372Err(); !cg.acceptMalformedIndex && err != nil {
return false, ulid.ULID{}, errors.Wrapf(err,
"block id %s, try running with --debug.accept-malformed-index", id)
}
}
level.Debug(cg.logger).Log("msg", "downloaded and verified blocks",
"blocks", fmt.Sprintf("%v", plan), "duration", time.Since(begin))
Expand Down Expand Up @@ -816,7 +827,7 @@ func (cg *Group) compact(ctx context.Context, dir string, comp tsdb.Compactor) (
}

// Ensure the output block is valid.
if err := block.VerifyIndex(cg.logger, filepath.Join(bdir, block.IndexFilename), newMeta.MinTime, newMeta.MaxTime); err != nil {
if err := block.VerifyIndex(cg.logger, filepath.Join(bdir, block.IndexFilename), newMeta.MinTime, newMeta.MaxTime); !cg.acceptMalformedIndex && err != nil {
return false, ulid.ULID{}, halt(errors.Wrapf(err, "invalid result block %s", bdir))
}

Expand Down
5 changes: 3 additions & 2 deletions pkg/compact/compact_e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func TestSyncer_SyncMetas_e2e(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
defer cancel()

sy, err := NewSyncer(nil, nil, bkt, 0, 1)
sy, err := NewSyncer(nil, nil, bkt, 0, 1, false)
testutil.Ok(t, err)

// Generate 15 blocks. Initially the first 10 are synced into memory and only the last
Expand Down Expand Up @@ -134,7 +134,7 @@ func TestSyncer_GarbageCollect_e2e(t *testing.T) {
}

// Do one initial synchronization with the bucket.
sy, err := NewSyncer(nil, nil, bkt, 0, 1)
sy, err := NewSyncer(nil, nil, bkt, 0, 1, false)
testutil.Ok(t, err)
testutil.Ok(t, sy.SyncMetas(ctx))

Expand Down Expand Up @@ -244,6 +244,7 @@ func TestGroup_Compact_e2e(t *testing.T) {
bkt,
extLset,
124,
false,
metrics.compactions.WithLabelValues(""),
metrics.compactionFailures.WithLabelValues(""),
metrics.garbageCollectedBlocks,
Expand Down

0 comments on commit c75388a

Please sign in to comment.