From f0dfe27bf4c31cc273f10e2e4cfee83277dbf80b Mon Sep 17 00:00:00 2001 From: Jack Neely Date: Fri, 22 Mar 2019 16:19:06 -0400 Subject: [PATCH 1/9] bucket verify: repair out of order labels --- cmd/thanos/bucket.go | 5 +++-- pkg/block/index.go | 4 ++++ 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/cmd/thanos/bucket.go b/cmd/thanos/bucket.go index e7a1c22040..29764d4555 100644 --- a/cmd/thanos/bucket.go +++ b/cmd/thanos/bucket.go @@ -85,10 +85,11 @@ func registerBucketVerify(m map[string]setupFunc, root *kingpin.CmdClause, name var backupBkt objstore.Bucket if len(backupconfContentYaml) == 0 { if *repair { - return errors.Wrap(err, "repair is specified, so backup client is required") + return fmt.Errorf("repair is specified, so backup client is required") } } else { - backupBkt, err = client.NewBucket(logger, backupconfContentYaml, reg, name) + // nil Prometheus registerer: don't create conflicting metrics + backupBkt, err = client.NewBucket(logger, backupconfContentYaml, nil, name) if err != nil { return err } diff --git a/pkg/block/index.go b/pkg/block/index.go index 777654f2e6..65d92e22a7 100644 --- a/pkg/block/index.go +++ b/pkg/block/index.go @@ -616,6 +616,10 @@ func rewrite( if err := indexr.Series(id, &lset, &chks); err != nil { return err } + // Make sure labels are in sorted order + sort.Slice(lset, func(i, j int) bool { + return lset[i].Name < lset[j].Name + }) for i, c := range chks { chks[i].Chunk, err = chunkr.Chunk(c.Ref) if err != nil { From f09d3f52f734c92a2e718125c47c4bff12022029 Mon Sep 17 00:00:00 2001 From: Jack Neely Date: Mon, 25 Mar 2019 16:57:09 -0400 Subject: [PATCH 2/9] verify repair: correctly order series in the index on rewrite When we have label sets that are not in the correct order, fixing that changes the order of the series in the index. So the index must be rewritten in that new order. This makes this repair tool take up a bunch more memory, but produces blocks that verify correctly. --- pkg/block/index.go | 43 +++++++++++++++++++++++++++++++------------ 1 file changed, 31 insertions(+), 12 deletions(-) diff --git a/pkg/block/index.go b/pkg/block/index.go index 65d92e22a7..3e626fe7c2 100644 --- a/pkg/block/index.go +++ b/pkg/block/index.go @@ -578,6 +578,11 @@ OUTER: return repl, nil } +type seriesRepair struct { + lset labels.Labels + chks []chunks.Meta +} + // rewrite writes all data from the readers back into the writers while cleaning // up mis-ordered and duplicated chunks. func rewrite( @@ -605,12 +610,12 @@ func rewrite( postings = index.NewMemPostings() values = map[string]stringset{} i = uint64(0) + series = []seriesRepair{} ) - var lset labels.Labels - var chks []chunks.Meta - for all.Next() { + var lset labels.Labels + var chks []chunks.Meta id := all.At() if err := indexr.Series(id, &lset, &chks); err != nil { @@ -636,21 +641,38 @@ func rewrite( continue } - if err := chunkw.WriteChunks(chks...); err != nil { + series = append(series, seriesRepair{ + lset: lset, + chks: chks, + }) + } + + if all.Err() != nil { + return errors.Wrap(all.Err(), "iterate series") + } + + // sort the series -- if labels moved around the ordering will be different + sort.Slice(series, func(i, j int) bool { + return labels.Compare(series[i].lset, series[j].lset) < 0 + }) + + // build new TSDB block + for _, s := range series { + if err := chunkw.WriteChunks(s.chks...); err != nil { return errors.Wrap(err, "write chunks") } - if err := indexw.AddSeries(i, lset, chks...); err != nil { + if err := indexw.AddSeries(i, s.lset, s.chks...); err != nil { return errors.Wrap(err, "add series") } - meta.Stats.NumChunks += uint64(len(chks)) + meta.Stats.NumChunks += uint64(len(s.chks)) meta.Stats.NumSeries++ - for _, chk := range chks { + for _, chk := range s.chks { meta.Stats.NumSamples += uint64(chk.Chunk.NumSamples()) } - for _, l := range lset { + for _, l := range s.lset { valset, ok := values[l.Name] if !ok { valset = stringset{} @@ -658,12 +680,9 @@ func rewrite( } valset.set(l.Value) } - postings.Add(i, lset) + postings.Add(i, s.lset) i++ } - if all.Err() != nil { - return errors.Wrap(all.Err(), "iterate series") - } s := make([]string, 0, 256) for n, v := range values { From 230bc7bdb5a2f273aef24808ce96e320b784e458 Mon Sep 17 00:00:00 2001 From: Jack Neely Date: Thu, 28 Mar 2019 17:12:41 -0400 Subject: [PATCH 3/9] Fix the TSDB block safe-delete function The directory name must be the block ID name exactly to verify. A temp directory or random name will not work here. --- pkg/verifier/overlapped_blocks.go | 3 ++- pkg/verifier/safe_delete.go | 13 +++++++++---- 2 files changed, 11 insertions(+), 5 deletions(-) diff --git a/pkg/verifier/overlapped_blocks.go b/pkg/verifier/overlapped_blocks.go index 072fe54aec..af28fccb0c 100644 --- a/pkg/verifier/overlapped_blocks.go +++ b/pkg/verifier/overlapped_blocks.go @@ -2,6 +2,8 @@ package verifier import ( "context" + "sort" + "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" "github.com/improbable-eng/thanos/pkg/block" @@ -10,7 +12,6 @@ import ( "github.com/oklog/ulid" "github.com/pkg/errors" "github.com/prometheus/tsdb" - "sort" ) const OverlappedBlocksIssueID = "overlapped_blocks" diff --git a/pkg/verifier/safe_delete.go b/pkg/verifier/safe_delete.go index 94383e07e2..cfea10c0df 100644 --- a/pkg/verifier/safe_delete.go +++ b/pkg/verifier/safe_delete.go @@ -2,9 +2,9 @@ package verifier import ( "context" - "fmt" "io/ioutil" "os" + "path/filepath" "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" @@ -31,13 +31,18 @@ func SafeDelete(ctx context.Context, logger log.Logger, bkt objstore.Bucket, bac return errors.Errorf("%s dir seems to exists in backup bucket. Remove this block manually if you are sure it is safe to do", id) } - dir, err := ioutil.TempDir("", fmt.Sprintf("safe-delete-%s", id)) + tempdir, err := ioutil.TempDir("", "safe-delete") + if err != nil { + return err + } + dir := filepath.Join(tempdir, id.String()) + err = os.Mkdir(dir, 0755) if err != nil { return err } defer func() { - if err := os.RemoveAll(dir); err != nil { - level.Warn(logger).Log("msg", "failed to delete dir", "dir", dir, "err", err) + if err := os.RemoveAll(tempdir); err != nil { + level.Warn(logger).Log("msg", "failed to delete dir", "dir", tempdir, "err", err) } }() From 3f0d6cd630420fda66f1ed534d90ae54cf940541 Mon Sep 17 00:00:00 2001 From: Jack Neely Date: Mon, 1 Apr 2019 12:45:07 -0400 Subject: [PATCH 4/9] verify repair: fix duplicate chunk detection Pointer/reference logic error was eliminating all chunks for a series in a given TSDB block that wasn't the first chunk. Chunks are now referenced correctly via pointers. --- pkg/block/index.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/pkg/block/index.go b/pkg/block/index.go index 3e626fe7c2..d192781fa2 100644 --- a/pkg/block/index.go +++ b/pkg/block/index.go @@ -531,7 +531,7 @@ func IgnoreDuplicateOutsideChunk(_ int64, _ int64, last *chunks.Meta, curr *chun // the current one. if curr.MinTime != last.MinTime || curr.MaxTime != last.MaxTime { return false, errors.Errorf("non-sequential chunks not equal: [%d, %d] and [%d, %d]", - last.MaxTime, last.MaxTime, curr.MinTime, curr.MaxTime) + last.MinTime, last.MaxTime, curr.MinTime, curr.MaxTime) } ca := crc32.Checksum(last.Chunk.Bytes(), castagnoli) cb := crc32.Checksum(curr.Chunk.Bytes(), castagnoli) @@ -559,9 +559,9 @@ func sanitizeChunkSequence(chks []chunks.Meta, mint int64, maxt int64, ignoreChk var last *chunks.Meta OUTER: - for _, c := range chks { + for i := range chks { for _, ignoreChkFn := range ignoreChkFns { - ignore, err := ignoreChkFn(mint, maxt, last, &c) + ignore, err := ignoreChkFn(mint, maxt, last, &chks[i]) if err != nil { return nil, errors.Wrap(err, "ignore function") } @@ -571,8 +571,8 @@ OUTER: } } - last = &c - repl = append(repl, c) + last = &chks[i] + repl = append(repl, chks[i]) } return repl, nil From ed78bded76cd0b7c5e5d17afa48541b1fa9b99ee Mon Sep 17 00:00:00 2001 From: Jack Neely Date: Fri, 12 Apr 2019 11:40:21 -0400 Subject: [PATCH 5/9] PR feedback: use errors.Errorf() instead of fmt.Errorf() --- cmd/thanos/bucket.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmd/thanos/bucket.go b/cmd/thanos/bucket.go index 29764d4555..4fb6218751 100644 --- a/cmd/thanos/bucket.go +++ b/cmd/thanos/bucket.go @@ -85,7 +85,7 @@ func registerBucketVerify(m map[string]setupFunc, root *kingpin.CmdClause, name var backupBkt objstore.Bucket if len(backupconfContentYaml) == 0 { if *repair { - return fmt.Errorf("repair is specified, so backup client is required") + return errors.Errorf("repair is specified, so backup client is required") } } else { // nil Prometheus registerer: don't create conflicting metrics From 2f179ffc4f98b68ce06c21ae22c55434a6c71e9d Mon Sep 17 00:00:00 2001 From: Jack Neely Date: Mon, 15 Apr 2019 10:15:28 -0400 Subject: [PATCH 6/9] Use errors.New() Some linters catch errors.Errorf() as its not really part of the errors package. --- cmd/thanos/bucket.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmd/thanos/bucket.go b/cmd/thanos/bucket.go index 4fb6218751..2da91cd5e7 100644 --- a/cmd/thanos/bucket.go +++ b/cmd/thanos/bucket.go @@ -85,7 +85,7 @@ func registerBucketVerify(m map[string]setupFunc, root *kingpin.CmdClause, name var backupBkt objstore.Bucket if len(backupconfContentYaml) == 0 { if *repair { - return errors.Errorf("repair is specified, so backup client is required") + return errors.New("repair is specified, so backup client is required") } } else { // nil Prometheus registerer: don't create conflicting metrics From 24173d373f851c5cd195f6aa0b20483353c88180 Mon Sep 17 00:00:00 2001 From: Jack Neely Date: Mon, 15 Apr 2019 16:44:16 -0400 Subject: [PATCH 7/9] Liberally comment this for loop We're comparing items by pointers, using Go's range variables is misleading here and we need not fall into the same trap. --- pkg/block/index.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/pkg/block/index.go b/pkg/block/index.go index d192781fa2..be6cf60fa3 100644 --- a/pkg/block/index.go +++ b/pkg/block/index.go @@ -559,6 +559,11 @@ func sanitizeChunkSequence(chks []chunks.Meta, mint int64, maxt int64, ignoreChk var last *chunks.Meta OUTER: + // This compares the current chunk to the chunk from the last iteration + // by pointers. If we use "i, c := range cks" the variable c is a new + // variable who's address doesn't change through the entire loop. + // The current element of the chks slice is copied into it. We must take + // the address of the indexed slice instead. for i := range chks { for _, ignoreChkFn := range ignoreChkFns { ignore, err := ignoreChkFn(mint, maxt, last, &chks[i]) From 0d39253243b15ab423c01b5b96fabaf1d1770b8e Mon Sep 17 00:00:00 2001 From: Jack Neely Date: Wed, 17 Apr 2019 09:45:57 -0400 Subject: [PATCH 8/9] Take advantage of sort.Interface This prevents us from having to re-implement label sorting. --- pkg/block/index.go | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/pkg/block/index.go b/pkg/block/index.go index be6cf60fa3..53f9a90097 100644 --- a/pkg/block/index.go +++ b/pkg/block/index.go @@ -627,9 +627,8 @@ func rewrite( return err } // Make sure labels are in sorted order - sort.Slice(lset, func(i, j int) bool { - return lset[i].Name < lset[j].Name - }) + sort.Sort(lset) + for i, c := range chks { chks[i].Chunk, err = chunkr.Chunk(c.Ref) if err != nil { From bfd44f1068789a956ee0fc0bf77cbde818a5ed0c Mon Sep 17 00:00:00 2001 From: Jack Neely Date: Thu, 18 Apr 2019 14:42:19 -0400 Subject: [PATCH 9/9] PR Feedback: Comments are full sentences. --- pkg/block/index.go | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/pkg/block/index.go b/pkg/block/index.go index 53f9a90097..b7fa36d66e 100644 --- a/pkg/block/index.go +++ b/pkg/block/index.go @@ -560,9 +560,9 @@ func sanitizeChunkSequence(chks []chunks.Meta, mint int64, maxt int64, ignoreChk OUTER: // This compares the current chunk to the chunk from the last iteration - // by pointers. If we use "i, c := range cks" the variable c is a new + // by pointers. If we use "i, c := range chks" the variable c is a new // variable who's address doesn't change through the entire loop. - // The current element of the chks slice is copied into it. We must take + // The current element of the chks slice is copied into it. We must take // the address of the indexed slice instead. for i := range chks { for _, ignoreChkFn := range ignoreChkFns { @@ -626,7 +626,7 @@ func rewrite( if err := indexr.Series(id, &lset, &chks); err != nil { return err } - // Make sure labels are in sorted order + // Make sure labels are in sorted order. sort.Sort(lset) for i, c := range chks { @@ -655,12 +655,13 @@ func rewrite( return errors.Wrap(all.Err(), "iterate series") } - // sort the series -- if labels moved around the ordering will be different + // Sort the series, if labels are re-ordered then the ordering of series + // will be different. sort.Slice(series, func(i, j int) bool { return labels.Compare(series[i].lset, series[j].lset) < 0 }) - // build new TSDB block + // Build a new TSDB block. for _, s := range series { if err := chunkw.WriteChunks(s.chks...); err != nil { return errors.Wrap(err, "write chunks")