Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

compact: accept malformed index #953

Merged
merged 8 commits into from
Mar 25, 2019
7 changes: 6 additions & 1 deletion cmd/thanos/compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,9 @@ func registerCompact(m map[string]setupFunc, app *kingpin.Application, name stri

haltOnError := cmd.Flag("debug.halt-on-error", "Halt the process if a critical compaction error is detected.").
Hidden().Default("true").Bool()
acceptMalformed := cmd.Flag("debug.accept-malformed-index",
"Compaction index verification will ignore out of order label names.").
Hidden().Default("false").Bool()

httpAddr := regHTTPAddrFlag(cmd)

Expand Down Expand Up @@ -102,6 +105,7 @@ func registerCompact(m map[string]setupFunc, app *kingpin.Application, name stri
objStoreConfig,
time.Duration(*syncDelay),
*haltOnError,
*acceptMalformed,
*wait,
map[compact.ResolutionLevel]time.Duration{
compact.ResolutionLevelRaw: time.Duration(*retentionRaw),
Expand All @@ -125,6 +129,7 @@ func runCompact(
objStoreConfig *pathOrContent,
syncDelay time.Duration,
haltOnError bool,
acceptMalformed bool,
wait bool,
retentionByResolution map[compact.ResolutionLevel]time.Duration,
component string,
Expand Down Expand Up @@ -206,7 +211,7 @@ func runCompact(

ctx, cancel := context.WithCancel(context.Background())
f := func() error {
if err := compactor.Compact(ctx); err != nil {
if err := compactor.Compact(ctx, acceptMalformed); err != nil {
return errors.Wrap(err, "compaction failed")
}
level.Info(logger).Log("msg", "compaction iterations done")
Expand Down
26 changes: 25 additions & 1 deletion pkg/block/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/prometheus/tsdb/fileutil"

"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/improbable-eng/thanos/pkg/runutil"
"github.com/oklog/ulid"
"github.com/pkg/errors"
Expand Down Expand Up @@ -248,6 +249,20 @@ type Stats struct {
// Specifically we mean here chunks with minTime == block.maxTime and maxTime > block.MaxTime. These are
// segregated into separate counters. These chunks are safe to be deleted, since they are duplicated across 2 blocks.
Issue347OutsideChunks int
// OutOfOrderLabels represents the number of postings that contained out
// of order labels, a bug present in Prometheus 2.8.0 and below.
OutOfOrderLabels int
}

// PrometheusIssue5372Err returns an error if the stats indicate postings with out
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The wording seems a bit off here. Perhaps: statsd indicates -> stats indicate ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Addressed in the most recent patch

// of order labels. This is caused by Prometheus issue #5372, which is present
// in 2.8.0 and below.
func (i Stats) PrometheusIssue5372Err() error {
	// Nothing to report when no out-of-order label postings were counted.
	if i.OutOfOrderLabels <= 0 {
		return nil
	}

	// Surface the number of affected postings so the caller can decide
	// whether to halt or to tolerate the malformed index.
	return errors.Errorf(
		"index contains %d postings with out of order labels",
		i.OutOfOrderLabels,
	)
}

// Issue347OutsideChunksErr returns error if stats indicates issue347 block issue, that is repaired explicitly before compaction (on plan block).
Expand Down Expand Up @@ -301,6 +316,10 @@ func (i Stats) AnyErr() error {
errMsg = append(errMsg, err.Error())
}

if err := i.PrometheusIssue5372Err(); err != nil {
errMsg = append(errMsg, err.Error())
}

if len(errMsg) > 0 {
return errors.New(strings.Join(errMsg, ", "))
}
Expand Down Expand Up @@ -348,7 +367,12 @@ func GatherIndexIssueStats(logger log.Logger, fn string, minTime int64, maxTime
l0 := lset[0]
for _, l := range lset[1:] {
if l.Name < l0.Name {
return stats, errors.Errorf("out-of-order label set %s for series %d", lset, id)
stats.OutOfOrderLabels++
level.Warn(logger).Log("msg",
"out-of-order label set: known bug in Prometheus 2.8.0 and below",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps we should suggest here that users enable --debug.accept-malformed-index? I mean we would only tell users about a known bug now but wouldn't tell them how to fix it so that's weird 😄 we could be more user-friendly.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I addressed this in pkg/compact/compact.go, where the error is generated that would otherwise halt execution of the compact component, rather than here, where it just becomes noise when the CLI option is enabled.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point, that's even better 👍

"labelset", fmt.Sprintf("%s", lset),
"series", fmt.Sprintf("%d", id),
)
}
l0 = l
}
Expand Down
16 changes: 10 additions & 6 deletions pkg/compact/compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -513,7 +513,7 @@ func (cg *Group) Resolution() int64 {

// Compact plans and runs a single compaction against the group. The compacted result
// is uploaded into the bucket the blocks were retrieved from.
func (cg *Group) Compact(ctx context.Context, dir string, comp tsdb.Compactor) (bool, ulid.ULID, error) {
func (cg *Group) Compact(ctx context.Context, dir string, comp tsdb.Compactor, ignoreMalformedIndex bool) (bool, ulid.ULID, error) {
jjneely marked this conversation as resolved.
Show resolved Hide resolved
subDir := filepath.Join(dir, cg.Key())

if err := os.RemoveAll(subDir); err != nil {
Expand All @@ -523,7 +523,7 @@ func (cg *Group) Compact(ctx context.Context, dir string, comp tsdb.Compactor) (
return false, ulid.ULID{}, errors.Wrap(err, "create compaction group dir")
}

shouldRerun, compID, err := cg.compact(ctx, subDir, comp)
shouldRerun, compID, err := cg.compact(ctx, subDir, comp, ignoreMalformedIndex)
if err != nil {
cg.compactionFailures.Inc()
}
Expand Down Expand Up @@ -688,7 +688,7 @@ func RepairIssue347(ctx context.Context, logger log.Logger, bkt objstore.Bucket,
return nil
}

func (cg *Group) compact(ctx context.Context, dir string, comp tsdb.Compactor) (shouldRerun bool, compID ulid.ULID, err error) {
func (cg *Group) compact(ctx context.Context, dir string, comp tsdb.Compactor, ignoreMalformed bool) (shouldRerun bool, compID ulid.ULID, err error) {
jjneely marked this conversation as resolved.
Show resolved Hide resolved
cg.mtx.Lock()
defer cg.mtx.Unlock()

Expand Down Expand Up @@ -769,6 +769,10 @@ func (cg *Group) compact(ctx context.Context, dir string, comp tsdb.Compactor) (
if err := stats.Issue347OutsideChunksErr(); err != nil {
return false, ulid.ULID{}, issue347Error(errors.Wrapf(err, "invalid, but reparable block %s", pdir), meta.ULID)
}

if err := stats.PrometheusIssue5372Err(); !ignoreMalformed && err != nil {
return false, ulid.ULID{}, errors.Wrapf(err, "block id %s", id)
}
}
level.Debug(cg.logger).Log("msg", "downloaded and verified blocks",
"blocks", fmt.Sprintf("%v", plan), "duration", time.Since(begin))
Expand Down Expand Up @@ -816,7 +820,7 @@ func (cg *Group) compact(ctx context.Context, dir string, comp tsdb.Compactor) (
}

// Ensure the output block is valid.
if err := block.VerifyIndex(cg.logger, filepath.Join(bdir, block.IndexFilename), newMeta.MinTime, newMeta.MaxTime); err != nil {
if err := block.VerifyIndex(cg.logger, filepath.Join(bdir, block.IndexFilename), newMeta.MinTime, newMeta.MaxTime); !ignoreMalformed && err != nil {
return false, ulid.ULID{}, halt(errors.Wrapf(err, "invalid result block %s", bdir))
}

Expand Down Expand Up @@ -886,7 +890,7 @@ func NewBucketCompactor(logger log.Logger, sy *Syncer, comp tsdb.Compactor, comp
}

// Compact runs compaction over bucket.
func (c *BucketCompactor) Compact(ctx context.Context) error {
func (c *BucketCompactor) Compact(ctx context.Context, IgnoreMalformedIndex bool) error {
jjneely marked this conversation as resolved.
Show resolved Hide resolved
// Loop over bucket and compact until there's no work left.
for {
// Clean up the compaction temporary directory at the beginning of every compaction loop.
Expand Down Expand Up @@ -914,7 +918,7 @@ func (c *BucketCompactor) Compact(ctx context.Context) error {
}
finishedAllGroups := true
for _, g := range groups {
shouldRerunGroup, _, err := g.Compact(ctx, c.compactDir, c.comp)
shouldRerunGroup, _, err := g.Compact(ctx, c.compactDir, c.comp, IgnoreMalformedIndex)
if err == nil {
if shouldRerunGroup {
finishedAllGroups = false
Expand Down
4 changes: 2 additions & 2 deletions pkg/compact/compact_e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,7 @@ func TestGroup_Compact_e2e(t *testing.T) {
comp, err := tsdb.NewLeveledCompactor(nil, log.NewLogfmtLogger(os.Stderr), []int64{1000, 3000}, nil)
testutil.Ok(t, err)

shouldRerun, id, err := g.Compact(ctx, dir, comp)
shouldRerun, id, err := g.Compact(ctx, dir, comp, false)
testutil.Ok(t, err)
testutil.Assert(t, !shouldRerun, "group should be empty, but compactor did a compaction and told us to rerun")

Expand All @@ -262,7 +262,7 @@ func TestGroup_Compact_e2e(t *testing.T) {
testutil.Ok(t, g.Add(m))
}

shouldRerun, id, err = g.Compact(ctx, dir, comp)
shouldRerun, id, err = g.Compact(ctx, dir, comp, false)
testutil.Ok(t, err)
testutil.Assert(t, shouldRerun, "there should be compactible data, but the compactor reported there was not")

Expand Down