Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

compact: accept malformed index #953

Merged
merged 8 commits into from
Mar 25, 2019
8 changes: 7 additions & 1 deletion cmd/thanos/compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,9 @@ func registerCompact(m map[string]setupFunc, app *kingpin.Application, name stri

haltOnError := cmd.Flag("debug.halt-on-error", "Halt the process if a critical compaction error is detected.").
Hidden().Default("true").Bool()
acceptMalformedIndex := cmd.Flag("debug.accept-malformed-index",
"Compaction index verification will ignore out of order label names.").
Hidden().Default("false").Bool()

httpAddr := regHTTPAddrFlag(cmd)

Expand Down Expand Up @@ -102,6 +105,7 @@ func registerCompact(m map[string]setupFunc, app *kingpin.Application, name stri
objStoreConfig,
time.Duration(*syncDelay),
*haltOnError,
*acceptMalformedIndex,
*wait,
map[compact.ResolutionLevel]time.Duration{
compact.ResolutionLevelRaw: time.Duration(*retentionRaw),
Expand All @@ -125,6 +129,7 @@ func runCompact(
objStoreConfig *pathOrContent,
syncDelay time.Duration,
haltOnError bool,
acceptMalformedIndex bool,
wait bool,
retentionByResolution map[compact.ResolutionLevel]time.Duration,
component string,
Expand Down Expand Up @@ -192,7 +197,8 @@ func runCompact(
return errors.Wrap(err, "clean working downsample directory")
}

compactor := compact.NewBucketCompactor(logger, sy, comp, compactDir, bkt)
compactor := compact.NewBucketCompactor(logger, sy, comp, compactDir, bkt,
acceptMalformedIndex)

if retentionByResolution[compact.ResolutionLevelRaw].Seconds() != 0 {
level.Info(logger).Log("msg", "retention policy of raw samples is enabled", "duration", retentionByResolution[compact.ResolutionLevelRaw])
Expand Down
26 changes: 25 additions & 1 deletion pkg/block/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/prometheus/tsdb/fileutil"

"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/improbable-eng/thanos/pkg/runutil"
"github.com/oklog/ulid"
"github.com/pkg/errors"
Expand Down Expand Up @@ -248,6 +249,20 @@ type Stats struct {
// Specifically we mean here chunks with minTime == block.maxTime and maxTime > block.MaxTime. These are
// segregated into separate counters. These chunks are safe to be deleted, since they are duplicated across 2 blocks.
Issue347OutsideChunks int
// OutOfOrderLabels represents the number of postings that contained out
// of order labels, a bug present in Prometheus 2.8.0 and below.
OutOfOrderLabels int
}

// PrometheusIssue5372Err returns an error if the stats indicate postings with out
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The wording seems a bit off here. Perhaps: statsd indicates -> stats indicate ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Addressed in the most recent patch

// of order labels. This is introduced by Prometheus Issue #5372 in 2.8.0 and
// below.
func (i Stats) PrometheusIssue5372Err() error {
	// No posting was counted with out of order labels, so there is nothing to report.
	if i.OutOfOrderLabels <= 0 {
		return nil
	}

	// Report how many postings were affected; callers decide whether this is fatal.
	const msg = "index contains %d postings with out of order labels"
	return errors.Errorf(msg, i.OutOfOrderLabels)
}

// Issue347OutsideChunksErr returns error if stats indicates issue347 block issue, that is repaired explicitly before compaction (on plan block).
Expand Down Expand Up @@ -301,6 +316,10 @@ func (i Stats) AnyErr() error {
errMsg = append(errMsg, err.Error())
}

if err := i.PrometheusIssue5372Err(); err != nil {
errMsg = append(errMsg, err.Error())
}

if len(errMsg) > 0 {
return errors.New(strings.Join(errMsg, ", "))
}
Expand Down Expand Up @@ -348,7 +367,12 @@ func GatherIndexIssueStats(logger log.Logger, fn string, minTime int64, maxTime
l0 := lset[0]
for _, l := range lset[1:] {
if l.Name < l0.Name {
return stats, errors.Errorf("out-of-order label set %s for series %d", lset, id)
stats.OutOfOrderLabels++
level.Warn(logger).Log("msg",
"out-of-order label set: known bug in Prometheus 2.8.0 and below",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps we should suggest here that users enable --debug.accept-malformed-index? I mean, we would now only tell users about a known bug but wouldn't tell them how to fix it, so that's weird 😄 We could be more user-friendly.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I addressed this in pkg/compact/compact.go, where the error that would otherwise halt execution of the compact component is generated, rather than here, where it just becomes noise when the CLI option is enabled.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point, that's even better 👍

"labelset", fmt.Sprintf("%s", lset),
"series", fmt.Sprintf("%d", id),
)
}
l0 = l
}
Expand Down
38 changes: 24 additions & 14 deletions pkg/compact/compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,7 @@ func groupKey(res int64, lbls labels.Labels) string {

// Groups returns the compaction groups for all blocks currently known to the syncer.
// It creates all groups from the scratch on every call.
func (c *Syncer) Groups() (res []*Group, err error) {
func (c *Syncer) Groups(acceptMalformedIndex bool) (res []*Group, err error) {
c.mtx.Lock()
defer c.mtx.Unlock()

Expand All @@ -291,6 +291,7 @@ func (c *Syncer) Groups() (res []*Group, err error) {
c.bkt,
labels.FromMap(m.Thanos.Labels),
m.Thanos.Downsample.Resolution,
acceptMalformedIndex,
c.metrics.compactions.WithLabelValues(GroupKey(*m)),
c.metrics.compactionFailures.WithLabelValues(GroupKey(*m)),
c.metrics.garbageCollectedBlocks,
Expand Down Expand Up @@ -436,6 +437,7 @@ type Group struct {
resolution int64
mtx sync.Mutex
blocks map[ulid.ULID]*metadata.Meta
acceptMalformedIndex bool
compactions prometheus.Counter
compactionFailures prometheus.Counter
groupGarbageCollectedBlocks prometheus.Counter
Expand All @@ -447,6 +449,7 @@ func newGroup(
bkt objstore.Bucket,
lset labels.Labels,
resolution int64,
acceptMalformedIndex bool,
compactions prometheus.Counter,
compactionFailures prometheus.Counter,
groupGarbageCollectedBlocks prometheus.Counter,
Expand All @@ -460,6 +463,7 @@ func newGroup(
labels: lset,
resolution: resolution,
blocks: map[ulid.ULID]*metadata.Meta{},
acceptMalformedIndex: acceptMalformedIndex,
compactions: compactions,
compactionFailures: compactionFailures,
groupGarbageCollectedBlocks: groupGarbageCollectedBlocks,
Expand Down Expand Up @@ -769,6 +773,10 @@ func (cg *Group) compact(ctx context.Context, dir string, comp tsdb.Compactor) (
if err := stats.Issue347OutsideChunksErr(); err != nil {
return false, ulid.ULID{}, issue347Error(errors.Wrapf(err, "invalid, but reparable block %s", pdir), meta.ULID)
}

if err := stats.PrometheusIssue5372Err(); !cg.acceptMalformedIndex && err != nil {
return false, ulid.ULID{}, errors.Wrapf(err, "block id %s", id)
}
}
level.Debug(cg.logger).Log("msg", "downloaded and verified blocks",
"blocks", fmt.Sprintf("%v", plan), "duration", time.Since(begin))
Expand Down Expand Up @@ -816,7 +824,7 @@ func (cg *Group) compact(ctx context.Context, dir string, comp tsdb.Compactor) (
}

// Ensure the output block is valid.
if err := block.VerifyIndex(cg.logger, filepath.Join(bdir, block.IndexFilename), newMeta.MinTime, newMeta.MaxTime); err != nil {
if err := block.VerifyIndex(cg.logger, filepath.Join(bdir, block.IndexFilename), newMeta.MinTime, newMeta.MaxTime); !cg.acceptMalformedIndex && err != nil {
return false, ulid.ULID{}, halt(errors.Wrapf(err, "invalid result block %s", bdir))
}

Expand Down Expand Up @@ -867,21 +875,23 @@ func (cg *Group) deleteBlock(b string) error {

// BucketCompactor compacts blocks in a bucket.
type BucketCompactor struct {
logger log.Logger
sy *Syncer
comp tsdb.Compactor
compactDir string
bkt objstore.Bucket
logger log.Logger
sy *Syncer
comp tsdb.Compactor
compactDir string
bkt objstore.Bucket
acceptMalformedIndex bool
}

// NewBucketCompactor creates a new bucket compactor.
func NewBucketCompactor(logger log.Logger, sy *Syncer, comp tsdb.Compactor, compactDir string, bkt objstore.Bucket) *BucketCompactor {
func NewBucketCompactor(logger log.Logger, sy *Syncer, comp tsdb.Compactor, compactDir string, bkt objstore.Bucket, acceptMalformedIndex bool) *BucketCompactor {
return &BucketCompactor{
logger: logger,
sy: sy,
comp: comp,
compactDir: compactDir,
bkt: bkt,
logger: logger,
sy: sy,
comp: comp,
compactDir: compactDir,
bkt: bkt,
acceptMalformedIndex: acceptMalformedIndex,
}
}

Expand All @@ -908,7 +918,7 @@ func (c *BucketCompactor) Compact(ctx context.Context) error {

level.Info(c.logger).Log("msg", "start of compaction")

groups, err := c.sy.Groups()
groups, err := c.sy.Groups(c.acceptMalformedIndex)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we put this into a Group field as well? Essentially, if it is an option like this, it suggests that we change it dynamically, which we don't. Also, a bool argument in a function is discouraged ):

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I looked at that this morning, which would mean I need to modify the Syncer to support this flag rather than BucketCompactor. We'd pass in the command line option here:

https://github.com/jjneely/thanos/blob/jjneely/accept-malformed-index/cmd/thanos/compact.go#L170

rather than here:

https://github.com/jjneely/thanos/blob/jjneely/accept-malformed-index/cmd/thanos/compact.go#L200

That did not seem like a straightforward path. Then I found that compaction Groups were created by the Syncer rather than the BucketCompactor.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, changed.

if err != nil {
return errors.Wrap(err, "build compaction groups")
}
Expand Down
7 changes: 4 additions & 3 deletions pkg/compact/compact_e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,13 +62,13 @@ func TestSyncer_SyncMetas_e2e(t *testing.T) {
testutil.Ok(t, bkt.Upload(ctx, path.Join(m.ULID.String(), metadata.MetaFilename), &buf))
}

groups, err := sy.Groups()
groups, err := sy.Groups(false)
testutil.Ok(t, err)
testutil.Equals(t, ids[:10], groups[0].IDs())

testutil.Ok(t, sy.SyncMetas(ctx))

groups, err = sy.Groups()
groups, err = sy.Groups(false)
testutil.Ok(t, err)
testutil.Equals(t, ids[5:], groups[0].IDs())
})
Expand Down Expand Up @@ -157,7 +157,7 @@ func TestSyncer_GarbageCollect_e2e(t *testing.T) {
testutil.Ok(t, sy.SyncMetas(ctx))

// Only the level 3 block, the last source block in both resolutions should be left.
groups, err := sy.Groups()
groups, err := sy.Groups(false)
testutil.Ok(t, err)

testutil.Equals(t, "0@{}", groups[0].Key())
Expand Down Expand Up @@ -244,6 +244,7 @@ func TestGroup_Compact_e2e(t *testing.T) {
bkt,
extLset,
124,
false,
metrics.compactions.WithLabelValues(""),
metrics.compactionFailures.WithLabelValues(""),
metrics.garbageCollectedBlocks,
Expand Down