From bdf1241f86199b6c25562cff8dbfd12a245bd2f0 Mon Sep 17 00:00:00 2001
From: yeya24
Date: Fri, 8 Jan 2021 00:38:52 -0500
Subject: [PATCH] support bucket rewrite relabel

Signed-off-by: yeya24
---
 cmd/thanos/tools_bucket.go        |  36 +++++-
 docs/components/tools.md          |   8 ++
 pkg/block/fetcher.go              |   9 +-
 pkg/block/metadata/meta.go        |   3 +
 pkg/compactv2/chunk_series_set.go |   1 -
 pkg/compactv2/compactor_test.go   | 195 +++++++++++++++++++++++++++++-
 pkg/compactv2/modifiers.go        | 154 ++++++++++++++++++++++-
 7 files changed, 391 insertions(+), 15 deletions(-)

diff --git a/cmd/thanos/tools_bucket.go b/cmd/thanos/tools_bucket.go
index 2184f30fa83..2559ae849de 100644
--- a/cmd/thanos/tools_bucket.go
+++ b/cmd/thanos/tools_bucket.go
@@ -8,6 +8,7 @@ import (
 	"crypto/rand"
 	"encoding/json"
 	"fmt"
+	"github.com/prometheus/prometheus/pkg/relabel"
 	"io/ioutil"
 	"net/http"
 	"os"
@@ -819,7 +820,8 @@ func registerBucketRewrite(app extkingpin.AppClause, objStoreConfig *extflag.Pat
 	hashFunc := cmd.Flag("hash-func", "Specify which hash function to use when calculating the hashes of produced files. If no function has been specified, it does not happen. This permits avoiding downloading some files twice albeit at some performance cost. Possible values are: \"\", \"SHA256\".").
 		Default("").Enum("SHA256", "")
 	dryRun := cmd.Flag("dry-run", "Prints the series changes instead of doing them. Defaults to true, for the user to double-check. (Pass --no-dry-run to skip this.)").Default("true").Bool()
-	toDelete := extflag.RegisterPathOrContent(cmd, "rewrite.to-delete-config", "YAML file that contains []metadata.DeletionRequest that will be applied to blocks", true)
+	toDelete := extflag.RegisterPathOrContent(cmd, "rewrite.to-delete-config", "YAML file that contains []metadata.DeletionRequest that will be applied to blocks", false)
+	toRelabel := extflag.RegisterPathOrContent(cmd, "rewrite.to-relabel-config", "YAML file that contains relabel configs that will be applied to blocks", false)
 	provideChangeLog := cmd.Flag("rewrite.add-change-log", "If specified, all modifications are written to a new block directory. Disable if latency is too high.").Default("true").Bool()
 	promBlocks := cmd.Flag("prom-blocks", "If specified, we assume the blocks to be uploaded are only used with Prometheus so we don't check external labels in this case.").Default("false").Bool()
 	cmd.Setup(func(g *run.Group, logger log.Logger, reg *prometheus.Registry, _ opentracing.Tracer, _ <-chan struct{}, _ bool) error {
@@ -833,15 +835,36 @@ func registerBucketRewrite(app extkingpin.AppClause, objStoreConfig *extflag.Pat
 			return err
 		}
 
-		deletionsYaml, err := toDelete.Content()
+		var modifiers []compactv2.Modifier
+
+		relabelYaml, err := toRelabel.Content()
 		if err != nil {
 			return err
 		}
+		var relabels []*relabel.Config
+		if len(relabelYaml) > 0 {
+			relabels, err = block.ParseRelabelConfig(relabelYaml, nil)
+			if err != nil {
+				return err
+			}
+			modifiers = append(modifiers, compactv2.WithRelabelModifier(relabels...))
+		}
 
-		var deletions []metadata.DeletionRequest
-		if err := yaml.Unmarshal(deletionsYaml, &deletions); err != nil {
+		deletionsYaml, err := toDelete.Content()
+		if err != nil {
 			return err
 		}
+		var deletions []metadata.DeletionRequest
+		if len(deletionsYaml) > 0 {
+			if err := yaml.Unmarshal(deletionsYaml, &deletions); err != nil {
+				return err
+			}
+			modifiers = append(modifiers, compactv2.WithDeletionModifier(deletions...))
+		}
+
+		if len(modifiers) == 0 {
+			return errors.New("at least one rewrite config must be provided: set --rewrite.to-delete-config or --rewrite.to-relabel-config")
+		}
 
 		var ids []ulid.ULID
 		for _, id := range *blockIDs {
@@ -885,6 +908,7 @@ func registerBucketRewrite(app extkingpin.AppClause, objStoreConfig *extflag.Pat
 			meta.Thanos.Rewrites = append(meta.Thanos.Rewrites, metadata.Rewrite{
 				Sources:          meta.Compaction.Sources,
 				DeletionsApplied: deletions,
+				RelabelsApplied:  relabels,
 			})
 			meta.Compaction.Sources = []ulid.ULID{newID}
 			meta.Thanos.Source = metadata.BucketRewriteSource
@@ -916,8 +940,8 @@ func registerBucketRewrite(app extkingpin.AppClause, objStoreConfig *extflag.Pat
 				comp = compactv2.New(*tmpDir, logger, changeLog, chunkPool)
 			}
 
-			level.Info(logger).Log("msg", "starting rewrite for block", "source", id, "new", newID, "toDelete", string(deletionsYaml))
-			if err := comp.WriteSeries(ctx, []block.Reader{b}, d, p, compactv2.WithDeletionModifier(deletions...)); err != nil {
+			level.Info(logger).Log("msg", "starting rewrite for block", "source", id, "new", newID, "toDelete", string(deletionsYaml), "toRelabel", string(relabelYaml))
+			if err := comp.WriteSeries(ctx, []block.Reader{b}, d, p, modifiers...); err != nil {
 				return errors.Wrapf(err, "writing series from %v to %v", id, newID)
 			}
 
diff --git a/docs/components/tools.md b/docs/components/tools.md
index 2d66d896076..b69c45d2279 100644
--- a/docs/components/tools.md
+++ b/docs/components/tools.md
@@ -769,6 +769,14 @@ Flags:
                                  Path to YAML file that contains
                                  []metadata.DeletionRequest that will be
                                  applied to blocks
+      --rewrite.to-relabel-config=<content>
+                                 Alternative to 'rewrite.to-relabel-config-file'
+                                 flag (mutually exclusive). Content of YAML file
+                                 that contains relabel configs that will be
+                                 applied to blocks
+      --rewrite.to-relabel-config-file=<file-path>
+                                 Path to YAML file that contains relabel configs
+                                 that will be applied to blocks
       --tmp.dir="/tmp/thanos-rewrite"
                                  Working directory for temporary files
       --tracing.config=<content>
diff --git a/pkg/block/fetcher.go b/pkg/block/fetcher.go
index 0fd2458f1d0..7c9ca71e3c7 100644
--- a/pkg/block/fetcher.go
+++ b/pkg/block/fetcher.go
@@ -879,15 +879,18 @@ var (
 )
 
 // ParseRelabelConfig parses relabel configuration.
+// If supportedActions is nil, all relabel actions are allowed.
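+// For example, passing map[relabel.Action]struct{}{relabel.Keep: {}, relabel.Drop: {}}
+// would restrict configs to keep/drop rules only.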
 func ParseRelabelConfig(contentYaml []byte, supportedActions map[relabel.Action]struct{}) ([]*relabel.Config, error) {
 	var relabelConfig []*relabel.Config
 	if err := yaml.Unmarshal(contentYaml, &relabelConfig); err != nil {
 		return nil, errors.Wrap(err, "parsing relabel configuration")
 	}
 
-	for _, cfg := range relabelConfig {
-		if _, ok := supportedActions[cfg.Action]; !ok {
-			return nil, errors.Errorf("unsupported relabel action: %v", cfg.Action)
+	if supportedActions != nil {
+		for _, cfg := range relabelConfig {
+			if _, ok := supportedActions[cfg.Action]; !ok {
+				return nil, errors.Errorf("unsupported relabel action: %v", cfg.Action)
+			}
 		}
 	}
 
diff --git a/pkg/block/metadata/meta.go b/pkg/block/metadata/meta.go
index f88aae6a20c..c2b0d14df44 100644
--- a/pkg/block/metadata/meta.go
+++ b/pkg/block/metadata/meta.go
@@ -11,6 +11,7 @@ package metadata
 import (
 	"encoding/json"
 	"fmt"
+	"github.com/prometheus/prometheus/pkg/relabel"
 	"io"
 	"os"
 	"path/filepath"
@@ -95,6 +96,8 @@ type Rewrite struct {
 	Sources []ulid.ULID `json:"sources,omitempty"`
 	// Deletions if applied (in order).
 	DeletionsApplied []DeletionRequest `json:"deletions_applied,omitempty"`
+	// Relabels if applied.
+	RelabelsApplied []*relabel.Config `json:"relabels_applied,omitempty"`
 }
 
 type Matchers []*labels.Matcher
diff --git a/pkg/compactv2/chunk_series_set.go b/pkg/compactv2/chunk_series_set.go
index 6bcaa2abf66..97b17862178 100644
--- a/pkg/compactv2/chunk_series_set.go
+++ b/pkg/compactv2/chunk_series_set.go
@@ -5,7 +5,6 @@ package compactv2
 
 import (
 	"context"
 
-	"github.com/pkg/errors"
 	"github.com/prometheus/prometheus/pkg/labels"
 	"github.com/prometheus/prometheus/storage"
 
diff --git a/pkg/compactv2/compactor_test.go b/pkg/compactv2/compactor_test.go
index e5c979a0539..ed1364877de 100644
--- a/pkg/compactv2/compactor_test.go
+++ b/pkg/compactv2/compactor_test.go
@@ -6,6 +6,9 @@ package compactv2
 import (
 	"bytes"
 	"context"
+	"github.com/prometheus/common/model"
+	"github.com/prometheus/prometheus/pkg/relabel"
+	"github.com/prometheus/prometheus/tsdb/tombstones"
 	"io/ioutil"
 	"math"
 	"os"
@@ -22,14 +25,13 @@ import (
 	"github.com/prometheus/prometheus/tsdb/chunkenc"
 	"github.com/prometheus/prometheus/tsdb/chunks"
 	"github.com/prometheus/prometheus/tsdb/index"
-	"github.com/prometheus/prometheus/tsdb/tombstones"
 
 	"github.com/thanos-io/thanos/pkg/block"
 	"github.com/thanos-io/thanos/pkg/block/metadata"
 	"github.com/thanos-io/thanos/pkg/testutil"
 )
 
 func TestCompactor_WriteSeries_e2e(t *testing.T) {
-	ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
+	ctx, cancel := context.WithTimeout(context.Background(), 120*time.Minute)
 	defer cancel()
 
 	logger := log.NewLogfmtLogger(os.Stderr)
@@ -397,6 +398,192 @@ func TestCompactor_WriteSeries_e2e(t *testing.T) {
 				NumChunks:  12,
 			},
 		},
+		{
+			name: "1 block + relabel modifier, separate chunks from the same series are merged",
+			input: [][]seriesSamples{
+				{
+					{lset: labels.Labels{{Name: "a", Value: "1"}},
+						chunks: [][]sample{{{0, 0}, {1, 1}, {2, 2}}, {{10, 10}, {11, 11}, {20, 20}}}},
+				},
+			},
+			// This drop rule matches no input series (only a="1" exists), so relabeling is a no-op here.
+			modifiers: []Modifier{WithRelabelModifier(
+				&relabel.Config{
+					Action:       relabel.Drop,
+					Regex:        relabel.MustNewRegexp("2"),
+					SourceLabels: model.LabelNames{"a"},
+				},
+			)},
+			expected: []seriesSamples{
+				{lset: labels.Labels{{Name: "a", Value: "1"}},
+					chunks: [][]sample{{{0, 0}, {1, 1}, {2, 2}, {10, 10}, {11, 11}, {20, 20}}}},
+			},
+			expectedStats: tsdb.BlockStats{
+				NumSamples: 6,
+				NumSeries:  1,
+				NumChunks:  1,
+			},
+		},
+		{
+			name: "1 block + relabel modifier, delete first series",
+			input: [][]seriesSamples{
+				{
+					{lset: labels.Labels{{Name: "a", Value: "1"}},
+						chunks: [][]sample{{{0, 0}, {1, 1}, {2, 2}, {10, 10}, {11, 11}, {20, 20}}}},
+					{lset: labels.Labels{{Name: "a", Value: "2"}},
+						chunks: [][]sample{{{0, 0}, {1, 1}, {2, 2}}, {{10, 10}, {11, 11}, {20, 20}, {25, 25}}}},
+					{lset: labels.Labels{{Name: "a", Value: "3"}},
+						chunks: [][]sample{{{0, 0}, {1, 1}, {2, 2}, {10, 13}, {11, 11}, {20, 20}}}},
+				},
+			},
+			modifiers: []Modifier{WithRelabelModifier(
+				&relabel.Config{
+					Action:       relabel.Drop,
+					Regex:        relabel.MustNewRegexp("1"),
+					SourceLabels: model.LabelNames{"a"},
+				},
+			)},
+			expected: []seriesSamples{
+				{lset: labels.Labels{{Name: "a", Value: "2"}},
+					chunks: [][]sample{{{0, 0}, {1, 1}, {2, 2}, {10, 10}, {11, 11}, {20, 20}, {25, 25}}}},
+				{lset: labels.Labels{{Name: "a", Value: "3"}},
+					chunks: [][]sample{{{0, 0}, {1, 1}, {2, 2}, {10, 13}, {11, 11}, {20, 20}}}},
+			},
+			expectedChanges: "Deleted {a=\"1\"} [{0 20}]\n",
+			expectedStats: tsdb.BlockStats{
+				NumSamples: 13,
+				NumSeries:  2,
+				NumChunks:  2,
+			},
+		},
+		{
+			name: "1 block + relabel modifier, series reordered",
+			input: [][]seriesSamples{
+				{
+					{lset: labels.Labels{{Name: "a", Value: "1"}},
+						chunks: [][]sample{{{0, 0}, {1, -1}, {2, -2}, {10, -10}, {11, -11}, {20, -20}}}},
+					{lset: labels.Labels{{Name: "a", Value: "2"}},
+						chunks: [][]sample{{{0, 0}, {1, 1}, {2, 2}}, {{10, 10}, {11, 11}, {20, 20}, {25, 25}}}},
+				},
+			},
+			// {a="1"} will be relabeled to {a="3"} while {a="2"} will be relabeled to {a="0"}.
+			modifiers: []Modifier{WithRelabelModifier(
+				&relabel.Config{
+					Action:       relabel.Replace,
+					Regex:        relabel.MustNewRegexp("1"),
+					SourceLabels: model.LabelNames{"a"},
+					TargetLabel:  "a",
+					Replacement:  "3",
+				},
+				&relabel.Config{
+					Action:       relabel.Replace,
+					Regex:        relabel.MustNewRegexp("2"),
+					SourceLabels: model.LabelNames{"a"},
+					TargetLabel:  "a",
+					Replacement:  "0",
+				},
+			)},
+			expected: []seriesSamples{
+				{lset: labels.Labels{{Name: "a", Value: "0"}},
+					chunks: [][]sample{{{0, 0}, {1, 1}, {2, 2}, {10, 10}, {11, 11}, {20, 20}, {25, 25}}}},
+				{lset: labels.Labels{{Name: "a", Value: "3"}},
+					chunks: [][]sample{{{0, 0}, {1, -1}, {2, -2}, {10, -10}, {11, -11}, {20, -20}}}},
+			},
+			expectedChanges: "Relabelled {a=\"1\"} {a=\"3\"}\nRelabelled {a=\"2\"} {a=\"0\"}\n",
+			expectedStats: tsdb.BlockStats{
+				NumSamples: 13,
+				NumSeries:  2,
+				NumChunks:  2,
+			},
+		},
+		{
+			name: "2 blocks + relabel modifier, series deleted because of no labels left after relabel",
+			input: [][]seriesSamples{
+				{
+					{lset: labels.Labels{{Name: "a", Value: "1"}},
+						chunks: [][]sample{{{0, 0}, {1, 1}, {2, 2}}, {{10, 10}, {11, 11}, {20, 20}, {25, 25}}}},
+				},
+				{
+					{lset: labels.Labels{{Name: "a", Value: "2"}},
+						chunks: [][]sample{{{0, 0}, {1, 1}, {2, 2}}, {{10, 10}, {11, 11}, {20, 20}, {25, 25}}}},
+				},
+			},
+			// Drop the label "a" from every series.
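+			// With "a" gone, both series end up with empty label sets and are deleted entirely.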
+			modifiers: []Modifier{WithRelabelModifier(
+				&relabel.Config{
+					Action: relabel.LabelDrop,
+					Regex:  relabel.MustNewRegexp("a"),
+				},
+			)},
+			expected:        nil,
+			expectedChanges: "Deleted {a=\"1\"} [{0 25}]\nDeleted {a=\"2\"} [{0 25}]\n",
+			expectedStats: tsdb.BlockStats{
+				NumSamples: 0,
+				NumSeries:  0,
+				NumChunks:  0,
+			},
+		},
+		{
+			name: "2 blocks + relabel modifier, series 1 is deleted because of no labels left after relabel",
+			input: [][]seriesSamples{
+				{
+					{lset: labels.Labels{{Name: "a", Value: "1"}},
+						chunks: [][]sample{{{0, 0}, {1, 1}, {2, 2}}, {{10, 10}, {11, 11}, {20, 20}, {25, 25}}}},
+				},
+				{
+					{lset: labels.Labels{{Name: "a", Value: "2"}, {Name: "b", Value: "1"}},
+						chunks: [][]sample{{{0, 0}, {1, 1}, {2, 2}}, {{10, 10}, {11, 11}, {20, 20}, {25, 25}}}},
+				},
+			},
+			// Drop the label "a" from every series.
+			modifiers: []Modifier{WithRelabelModifier(
+				&relabel.Config{
+					Action: relabel.LabelDrop,
+					Regex:  relabel.MustNewRegexp("a"),
+				},
+			)},
+			expected: []seriesSamples{
+				{lset: labels.Labels{{Name: "b", Value: "1"}},
+					chunks: [][]sample{{{0, 0}, {1, 1}, {2, 2}, {10, 10}, {11, 11}, {20, 20}, {25, 25}}}},
+			},
+			expectedChanges: "Deleted {a=\"1\"} [{0 25}]\nRelabelled {a=\"2\", b=\"1\"} {b=\"1\"}\n",
+			expectedStats: tsdb.BlockStats{
+				NumSamples: 7,
+				NumSeries:  1,
+				NumChunks:  1,
+			},
+		},
+		{
+			name: "1 block + relabel modifier, series merged after relabeling",
+			input: [][]seriesSamples{
+				{
+					{lset: labels.Labels{{Name: "a", Value: "1"}},
+						chunks: [][]sample{{{1, 1}, {2, 2}, {10, 10}, {20, 20}}}},
+					{lset: labels.Labels{{Name: "a", Value: "2"}},
+						chunks: [][]sample{{{0, 0}, {2, 2}, {3, 3}}, {{4, 4}, {11, 11}, {20, 20}, {25, 25}}}},
+				},
+			},
+			// Relabel both series to {a="0"} so they merge into one series.
+			modifiers: []Modifier{WithRelabelModifier(
+				&relabel.Config{
+					Action:       relabel.Replace,
+					Regex:        relabel.MustNewRegexp("1|2"),
+					SourceLabels: model.LabelNames{"a"},
+					TargetLabel:  "a",
+					Replacement:  "0",
+				},
+			)},
+			expected: []seriesSamples{
+				{lset: labels.Labels{{Name: "a", Value: "0"}},
+					chunks: [][]sample{{{0, 0}, {1, 1}, {2, 2}, {3, 3}, {4, 4}, {10, 10}, {11, 11}, {20, 20}, {25, 25}}}},
+			},
+			expectedChanges: "Relabelled {a=\"1\"} {a=\"0\"}\nRelabelled {a=\"2\"} {a=\"0\"}\n",
+			expectedStats: tsdb.BlockStats{
+				NumSamples: 9,
+				NumSeries:  1,
+				NumChunks:  1,
+			},
+		},
 	} {
 		t.Run(tcase.name, func(t *testing.T) {
 			tmpDir, err := ioutil.TempDir("", "test-series-writer")
diff --git a/pkg/compactv2/modifiers.go b/pkg/compactv2/modifiers.go
index aef4ec065ea..c3c7f16e9f6 100644
--- a/pkg/compactv2/modifiers.go
+++ b/pkg/compactv2/modifiers.go
@@ -4,7 +4,12 @@
 package compactv2
 
 import (
+	"math"
+	"sort"
+
 	"github.com/pkg/errors"
+	"github.com/prometheus/prometheus/pkg/labels"
+	"github.com/prometheus/prometheus/pkg/relabel"
 	"github.com/prometheus/prometheus/storage"
 	"github.com/prometheus/prometheus/tsdb"
 	"github.com/prometheus/prometheus/tsdb/chunkenc"
@@ -325,4 +330,151 @@ func (p *delChunkSeriesIterator) Next() bool {
 
 func (p *delChunkSeriesIterator) At() chunks.Meta { return p.curr }
 
-// TODO(bwplotka): Add relabelling.
+type RelabelModifier struct {
+	relabels []*relabel.Config
+}
+
+func WithRelabelModifier(relabels ...*relabel.Config) *RelabelModifier {
+	return &RelabelModifier{relabels: relabels}
+}
+
+func (d *RelabelModifier) Modify(_ index.StringIter, set storage.ChunkSeriesSet, log ChangeLogger, p ProgressLogger) (index.StringIter, storage.ChunkSeriesSet) {
+	// Gather symbols.
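+	// Relabel each series: a series whose label set becomes empty is deleted,
+	// and series that relabel to the same label set are merged.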
+	symbols := make(map[string]struct{})
+	chunkSeriesMap := make(map[string]*chunkSeriesBuilder)
+
+	for set.Next() {
+		s := set.At()
+		lbls := s.Labels()
+
+		if processedLabels := relabel.Process(lbls, d.relabels...); len(processedLabels) == 0 {
+			// Special case: Delete the whole series.
+			chksIter := s.Iterator()
+			var (
+				minT int64 = math.MaxInt64
+				maxT int64 = math.MinInt64
+			)
+			for chksIter.Next() {
+				c := chksIter.At()
+				if c.MinTime < minT {
+					minT = c.MinTime
+				}
+				if c.MaxTime > maxT {
+					maxT = c.MaxTime
+				}
+			}
+
+			if err := chksIter.Err(); err != nil {
+				return errorOnlyStringIter{err: err}, nil
+			}
+
+			var deleted tombstones.Intervals
+			// If minT is set, there is at least one chunk.
+			if minT != math.MaxInt64 {
+				deleted = deleted.Add(tombstones.Interval{Mint: minT, Maxt: maxT})
+			}
+			log.DeleteSeries(lbls, deleted)
+			p.SeriesProcessed()
+		} else {
+			for _, lb := range processedLabels {
+				symbols[lb.Name] = struct{}{}
+				symbols[lb.Value] = struct{}{}
+			}
+
+			lbStr := processedLabels.String()
+			if _, ok := chunkSeriesMap[lbStr]; !ok {
+				chunkSeriesMap[lbStr] = newChunkSeriesBuilder(processedLabels)
+			}
+			csb := chunkSeriesMap[lbStr]
+			chksIter := s.Iterator()
+			for chksIter.Next() {
+				c := chksIter.At()
+				csb.addIter(c.Chunk.Iterator(nil))
+			}
+			if err := chksIter.Err(); err != nil {
+				return errorOnlyStringIter{err: err}, nil
+			}
+
+			if !labels.Equal(lbls, processedLabels) {
+				log.ModifySeries(lbls, processedLabels)
+			}
+		}
+	}
+
+	symbolsSlice := make([]string, 0, len(symbols))
+	for s := range symbols {
+		symbolsSlice = append(symbolsSlice, s)
+	}
+	sort.Strings(symbolsSlice)
+
+	chunkSeriesSet := make([]storage.ChunkSeries, 0, len(chunkSeriesMap))
+	for _, chunkSeries := range chunkSeriesMap {
+		chunkSeriesSet = append(chunkSeriesSet, chunkSeries)
+	}
+	sort.Slice(chunkSeriesSet, func(i, j int) bool {
+		return labels.Compare(chunkSeriesSet[i].Labels(), chunkSeriesSet[j].Labels()) < 0
+	})
+	return index.NewStringListIter(symbolsSlice), newListChunkSeriesSet(chunkSeriesSet...)
+}
+
+// chunkSeriesBuilder builds a storage.ChunkSeries from chunkenc.Iterators.
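+// Series added via addIter are merged with storage.ChainedSeriesMerge before being
+// re-encoded into chunks, so overlapping chunks collapse into ordered samples.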
+type chunkSeriesBuilder struct {
+	lset labels.Labels
+	ss   []storage.Series
+}
+
+func newChunkSeriesBuilder(lset labels.Labels) *chunkSeriesBuilder {
+	return &chunkSeriesBuilder{
+		lset: lset,
+		ss:   make([]storage.Series, 0),
+	}
+}
+
+func (s *chunkSeriesBuilder) addIter(iter chunkenc.Iterator) {
+	s.ss = append(s.ss, &storage.SeriesEntry{
+		SampleIteratorFn: func() chunkenc.Iterator {
+			return iter
+		},
+	})
+}
+
+func (s *chunkSeriesBuilder) Labels() labels.Labels {
+	return s.lset
+}
+
+func (s *chunkSeriesBuilder) Iterator() chunks.Iterator {
+	if len(s.ss) == 0 {
+		return nil
+	}
+	if len(s.ss) == 1 {
+		return storage.NewSeriesToChunkEncoder(s.ss[0]).Iterator()
+	}
+
+	return storage.NewSeriesToChunkEncoder(storage.ChainedSeriesMerge(s.ss...)).Iterator()
+}
+
+type errorOnlyStringIter struct {
+	err error
+}
+
+func (errorOnlyStringIter) Next() bool   { return false }
+func (errorOnlyStringIter) At() string   { return "" }
+func (s errorOnlyStringIter) Err() error { return s.err }
+
+type listChunkSeriesSet struct {
+	css []storage.ChunkSeries
+	idx int
+}
+
+func newListChunkSeriesSet(css ...storage.ChunkSeries) storage.ChunkSeriesSet {
+	return &listChunkSeriesSet{css: css, idx: -1}
+}
+
+func (s *listChunkSeriesSet) Next() bool {
+	s.idx++
+	return s.idx < len(s.css)
+}
+
+func (s *listChunkSeriesSet) At() storage.ChunkSeries    { return s.css[s.idx] }
+func (s *listChunkSeriesSet) Err() error                 { return nil }
+func (s *listChunkSeriesSet) Warnings() storage.Warnings { return nil }
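
Usage sketch (illustrative, not part of the diff): with this patch, dropping every
series whose label a matches "1" from a block could look roughly like the following.
The bucket config file name and block ULID are placeholders; --id and
--objstore.config-file are the tool's existing flags.

    cat > relabel.yaml <<EOF
    - action: drop
      source_labels: [a]
      regex: "1"
    EOF

    thanos tools bucket rewrite \
      --objstore.config-file=bucket.yaml \
      --rewrite.to-relabel-config-file=relabel.yaml \
      --id=01DN3SK96XDAEKRB1AN30AAW6E \
      --no-dry-run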