compact: accept malformed index #953
Changes from 6 commits
@@ -16,6 +16,7 @@ import (
 	"github.com/prometheus/tsdb/fileutil"

 	"github.com/go-kit/kit/log"
+	"github.com/go-kit/kit/log/level"
 	"github.com/improbable-eng/thanos/pkg/runutil"
 	"github.com/oklog/ulid"
 	"github.com/pkg/errors"

@@ -248,6 +249,20 @@ type Stats struct {
 	// Specifically we mean here chunks with minTime == block.maxTime and maxTime > block.MaxTime. These are
 	// are segregated into separate counters. These chunks are safe to be deleted, since they are duplicated across 2 blocks.
 	Issue347OutsideChunks int
+	// OutOfOrderLabels represents the number of postings that contained out
+	// of order labels, a bug present in Prometheus 2.8.0 and below.
+	OutOfOrderLabels int
 }

+// PrometheusIssue5372Err returns an error if statsd indicates postings with out
+// of order labels. This is introduced by Prometheus Issue #5372 in 2.8.0 and
+// below.
+func (i Stats) PrometheusIssue5372Err() error {
+	if i.OutOfOrderLabels > 0 {
+		return errors.Errorf("index contains %d postings with out of order labels",
+			i.OutOfOrderLabels)
+	}
+	return nil
+}
+
 // Issue347OutsideChunksErr returns error if stats indicates issue347 block issue, that is repaired explicitly before compaction (on plan block).

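Taken together, these additions let a caller gather index statistics and then decide whether out-of-order labels should be fatal. Below is a minimal sketch of such a caller, assuming GatherIndexIssueStats (shown further down) lives in Thanos's block package, as the block.VerifyIndex calls later in this diff suggest, and returns (Stats, error); the checkIndex helper, its parameters, and the placeholder path are illustrative only and are not part of this PR:

package main

import (
	"github.com/go-kit/kit/log"
	"github.com/improbable-eng/thanos/pkg/block"
	"github.com/pkg/errors"
)

// checkIndex gathers index statistics for one block and treats Prometheus
// issue 5372 (out-of-order labels) as fatal unless malformed indexes are accepted.
func checkIndex(logger log.Logger, indexPath string, minTime, maxTime int64, acceptMalformedIndex bool) error {
	stats, err := block.GatherIndexIssueStats(logger, indexPath, minTime, maxTime)
	if err != nil {
		return errors.Wrap(err, "gather index issue stats")
	}
	if err := stats.PrometheusIssue5372Err(); err != nil && !acceptMalformedIndex {
		return err
	}
	return nil
}

func main() {
	// Illustrative invocation; the path and time range are placeholders.
	if err := checkIndex(log.NewNopLogger(), "path/to/block/index", 0, 1000, false); err != nil {
		panic(err)
	}
}

The acceptMalformedIndex parameter here mirrors the option that the compactor changes further down thread through the code.
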
@@ -301,6 +316,10 @@ func (i Stats) AnyErr() error {
 		errMsg = append(errMsg, err.Error())
 	}

+	if err := i.PrometheusIssue5372Err(); err != nil {
+		errMsg = append(errMsg, err.Error())
+	}
+
 	if len(errMsg) > 0 {
 		return errors.New(strings.Join(errMsg, ", "))
 	}

@@ -348,7 +367,12 @@ func GatherIndexIssueStats(logger log.Logger, fn string, minTime int64, maxTime
 		l0 := lset[0]
 		for _, l := range lset[1:] {
 			if l.Name < l0.Name {
-				return stats, errors.Errorf("out-of-order label set %s for series %d", lset, id)
+				stats.OutOfOrderLabels++
+				level.Warn(logger).Log("msg",
+					"out-of-order label set: known bug in Prometheus 2.8.0 and below",
+					"labelset", fmt.Sprintf("%s", lset),
+					"series", fmt.Sprintf("%d", id),
+				)
 			}
 			l0 = l
 		}

Review discussion on the new warning:
Reviewer: Perhaps we should suggest users here to enable […]
Author: I addressed this in […]
Reviewer: Good point, that's even better 👍

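As background, an out-of-order label set is simply one whose labels are not sorted by name, which Prometheus 2.8.0 and below could write under issue #5372. Here is a tiny, self-contained illustration of the same comparison the loop above performs, using toy types rather than the tsdb labels package:

package main

import "fmt"

// label is a toy stand-in for a real label type (not the tsdb package).
type label struct{ Name, Value string }

// outOfOrder reports whether a label set is not sorted by label name,
// mirroring the l.Name < l0.Name comparison in GatherIndexIssueStats.
func outOfOrder(lset []label) bool {
	for i := 1; i < len(lset); i++ {
		if lset[i].Name < lset[i-1].Name {
			return true
		}
	}
	return false
}

func main() {
	fmt.Println(outOfOrder([]label{{"b", "1"}, {"a", "2"}})) // true: "a" should sort before "b"
	fmt.Println(outOfOrder([]label{{"a", "2"}, {"b", "1"}})) // false: properly sorted
}
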
The remaining hunks come from a second changed file:
@@ -278,7 +278,7 @@ func groupKey(res int64, lbls labels.Labels) string {

 // Groups returns the compaction groups for all blocks currently known to the syncer.
 // It creates all groups from the scratch on every call.
-func (c *Syncer) Groups() (res []*Group, err error) {
+func (c *Syncer) Groups(acceptMalformedIndex bool) (res []*Group, err error) {
 	c.mtx.Lock()
 	defer c.mtx.Unlock()

@@ -291,6 +291,7 @@ func (c *Syncer) Groups() (res []*Group, err error) {
 			c.bkt,
 			labels.FromMap(m.Thanos.Labels),
 			m.Thanos.Downsample.Resolution,
+			acceptMalformedIndex,
 			c.metrics.compactions.WithLabelValues(GroupKey(*m)),
 			c.metrics.compactionFailures.WithLabelValues(GroupKey(*m)),
 			c.metrics.garbageCollectedBlocks,

@@ -436,6 +437,7 @@ type Group struct {
 	resolution                  int64
 	mtx                         sync.Mutex
 	blocks                      map[ulid.ULID]*metadata.Meta
+	acceptMalformedIndex        bool
 	compactions                 prometheus.Counter
 	compactionFailures          prometheus.Counter
 	groupGarbageCollectedBlocks prometheus.Counter

@@ -447,6 +449,7 @@ func newGroup(
 	bkt objstore.Bucket,
 	lset labels.Labels,
 	resolution int64,
+	acceptMalformedIndex bool,
 	compactions prometheus.Counter,
 	compactionFailures prometheus.Counter,
 	groupGarbageCollectedBlocks prometheus.Counter,

@@ -460,6 +463,7 @@ func newGroup(
 		labels:                      lset,
 		resolution:                  resolution,
 		blocks:                      map[ulid.ULID]*metadata.Meta{},
+		acceptMalformedIndex:        acceptMalformedIndex,
 		compactions:                 compactions,
 		compactionFailures:          compactionFailures,
 		groupGarbageCollectedBlocks: groupGarbageCollectedBlocks,

@@ -769,6 +773,10 @@ func (cg *Group) compact(ctx context.Context, dir string, comp tsdb.Compactor) (
 		if err := stats.Issue347OutsideChunksErr(); err != nil {
 			return false, ulid.ULID{}, issue347Error(errors.Wrapf(err, "invalid, but reparable block %s", pdir), meta.ULID)
 		}
+
+		if err := stats.PrometheusIssue5372Err(); !cg.acceptMalformedIndex && err != nil {
+			return false, ulid.ULID{}, errors.Wrapf(err, "block id %s", id)
+		}
 	}
 	level.Debug(cg.logger).Log("msg", "downloaded and verified blocks",
 		"blocks", fmt.Sprintf("%v", plan), "duration", time.Since(begin))

@@ -816,7 +824,7 @@ func (cg *Group) compact(ctx context.Context, dir string, comp tsdb.Compactor) (
 	}

 	// Ensure the output block is valid.
-	if err := block.VerifyIndex(cg.logger, filepath.Join(bdir, block.IndexFilename), newMeta.MinTime, newMeta.MaxTime); err != nil {
+	if err := block.VerifyIndex(cg.logger, filepath.Join(bdir, block.IndexFilename), newMeta.MinTime, newMeta.MaxTime); !cg.acceptMalformedIndex && err != nil {
 		return false, ulid.ULID{}, halt(errors.Wrapf(err, "invalid result block %s", bdir))
 	}

@@ -867,21 +875,23 @@ func (cg *Group) deleteBlock(b string) error {

 // BucketCompactor compacts blocks in a bucket.
 type BucketCompactor struct {
-	logger     log.Logger
-	sy         *Syncer
-	comp       tsdb.Compactor
-	compactDir string
-	bkt        objstore.Bucket
+	logger               log.Logger
+	sy                   *Syncer
+	comp                 tsdb.Compactor
+	compactDir           string
+	bkt                  objstore.Bucket
+	acceptMalformedIndex bool
 }

 // NewBucketCompactor creates a new bucket compactor.
-func NewBucketCompactor(logger log.Logger, sy *Syncer, comp tsdb.Compactor, compactDir string, bkt objstore.Bucket) *BucketCompactor {
+func NewBucketCompactor(logger log.Logger, sy *Syncer, comp tsdb.Compactor, compactDir string, bkt objstore.Bucket, acceptMalformedIndex bool) *BucketCompactor {
 	return &BucketCompactor{
-		logger:     logger,
-		sy:         sy,
-		comp:       comp,
-		compactDir: compactDir,
-		bkt:        bkt,
+		logger:               logger,
+		sy:                   sy,
+		comp:                 comp,
+		compactDir:           compactDir,
+		bkt:                  bkt,
+		acceptMalformedIndex: acceptMalformedIndex,
 	}
 }

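For orientation, here is a sketch of a call site for the widened constructor, in the spirit of the cmd/thanos/compact.go links quoted in the review below; the runCompaction helper, the compact and objstore import paths, and the variable names are assumptions for illustration and do not appear in this diff:

package example

import (
	"context"

	"github.com/go-kit/kit/log"
	"github.com/improbable-eng/thanos/pkg/compact"
	"github.com/improbable-eng/thanos/pkg/objstore"
	"github.com/prometheus/tsdb"
)

// runCompaction shows where the new acceptMalformedIndex argument is supplied:
// once, when the BucketCompactor is constructed, rather than on every call.
func runCompaction(ctx context.Context, logger log.Logger, syncer *compact.Syncer, comp tsdb.Compactor, compactDir string, bkt objstore.Bucket, acceptMalformedIndex bool) error {
	bucketCompactor := compact.NewBucketCompactor(logger, syncer, comp, compactDir, bkt, acceptMalformedIndex)
	return bucketCompactor.Compact(ctx)
}

Passing the bool through the constructor keeps the setting fixed for the compactor's lifetime, which is also the concern raised in the review thread below.
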
@@ -908,7 +918,7 @@ func (c *BucketCompactor) Compact(ctx context.Context) error {

 	level.Info(c.logger).Log("msg", "start of compaction")

-	groups, err := c.sy.Groups()
+	groups, err := c.sy.Groups(c.acceptMalformedIndex)
 	if err != nil {
 		return errors.Wrap(err, "build compaction groups")
 	}

Review discussion on passing the flag into Groups():
Reviewer: Can we put this into a group field as well? Essentially, having it as an option like this suggests that we change it dynamically, which we don't. Also, a bool argument in a function is discouraged ):
Author: I looked at that this morning; it would mean modifying the Syncer to support this flag rather than the BucketCompactor. We'd pass in the command line option here: https://github.com/jjneely/thanos/blob/jjneely/accept-malformed-index/cmd/thanos/compact.go#L170 rather than here: https://github.com/jjneely/thanos/blob/jjneely/accept-malformed-index/cmd/thanos/compact.go#L200 That didn't seem like a straightforward path. Then I found that compaction Groups are created by the Syncer rather than the BucketCompactor.
Author: Ok, changed.

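To illustrate the shape the reviewer is asking for, here is a toy, self-contained sketch (not Thanos code, and not part of this diff) of carrying the option as a field set at construction time instead of threading a bool argument through Groups():

package main

import "fmt"

// syncer is a toy stand-in showing the suggested design: the setting is fixed
// at construction and read as a field, never passed per call.
type syncer struct {
	acceptMalformedIndex bool
}

func newSyncer(acceptMalformedIndex bool) *syncer {
	return &syncer{acceptMalformedIndex: acceptMalformedIndex}
}

// groups no longer takes a bool parameter; it reads the field instead.
func (s *syncer) groups() []string {
	if s.acceptMalformedIndex {
		return []string{"group-with-lenient-verification"}
	}
	return []string{"group-with-strict-verification"}
}

func main() {
	fmt.Println(newSyncer(true).groups())
	fmt.Println(newSyncer(false).groups())
}
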
Review discussion on the PrometheusIssue5372Err doc comment:
Reviewer: The wording seems a bit off here. Perhaps "statsd indicates" -> "stats indicate"?
Author: Addressed in the most recent patch.