Skip to content

Commit

Permalink
support bucket rewrite relabel
Browse files Browse the repository at this point in the history
Signed-off-by: yeya24 <[email protected]>
  • Loading branch information
yeya24 committed May 21, 2021
1 parent 9216c8a commit bdf1241
Show file tree
Hide file tree
Showing 7 changed files with 391 additions and 15 deletions.
36 changes: 30 additions & 6 deletions cmd/thanos/tools_bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"crypto/rand"
"encoding/json"
"fmt"
"github.com/prometheus/prometheus/pkg/relabel"
"io/ioutil"
"net/http"
"os"
Expand Down Expand Up @@ -819,7 +820,8 @@ func registerBucketRewrite(app extkingpin.AppClause, objStoreConfig *extflag.Pat
hashFunc := cmd.Flag("hash-func", "Specify which hash function to use when calculating the hashes of produced files. If no function has been specified, it does not happen. This permits avoiding downloading some files twice albeit at some performance cost. Possible values are: \"\", \"SHA256\".").
Default("").Enum("SHA256", "")
dryRun := cmd.Flag("dry-run", "Prints the series changes instead of doing them. Defaults to true, for user to double check. (: Pass --no-dry-run to skip this.").Default("true").Bool()
toDelete := extflag.RegisterPathOrContent(cmd, "rewrite.to-delete-config", "YAML file that contains []metadata.DeletionRequest that will be applied to blocks", true)
toDelete := extflag.RegisterPathOrContent(cmd, "rewrite.to-delete-config", "YAML file that contains []metadata.DeletionRequest that will be applied to blocks", false)
toRelabel := extflag.RegisterPathOrContent(cmd, "rewrite.to-relabel-config", "YAML file that contains relabel configs that will be applied to blocks", false)
provideChangeLog := cmd.Flag("rewrite.add-change-log", "If specified, all modifications are written to new block directory. Disable if latency is to high.").Default("true").Bool()
promBlocks := cmd.Flag("prom-blocks", "If specified, we assume the blocks to be uploaded are only used with Prometheus so we don't check external labels in this case.").Default("false").Bool()
cmd.Setup(func(g *run.Group, logger log.Logger, reg *prometheus.Registry, _ opentracing.Tracer, _ <-chan struct{}, _ bool) error {
Expand All @@ -833,15 +835,36 @@ func registerBucketRewrite(app extkingpin.AppClause, objStoreConfig *extflag.Pat
return err
}

deletionsYaml, err := toDelete.Content()
var modifiers []compactv2.Modifier

relabelYaml, err := toRelabel.Content()
if err != nil {
return err
}
var relabels []*relabel.Config
if len(relabelYaml) > 0 {
relabels, err = block.ParseRelabelConfig(relabelYaml, nil)
if err != nil {
return err
}
modifiers = append(modifiers, compactv2.WithRelabelModifier(relabels...))
}

var deletions []metadata.DeletionRequest
if err := yaml.Unmarshal(deletionsYaml, &deletions); err != nil {
deletionsYaml, err := toDelete.Content()
if err != nil {
return err
}
var deletions []metadata.DeletionRequest
if len(deletionsYaml) > 0 {
if err := yaml.Unmarshal(deletionsYaml, &deletions); err != nil {
return err
}
modifiers = append(modifiers, compactv2.WithDeletionModifier(deletions...))
}

if len(modifiers) == 0 {
return errors.New("rewrite configuration should be provided")
}

var ids []ulid.ULID
for _, id := range *blockIDs {
Expand Down Expand Up @@ -885,6 +908,7 @@ func registerBucketRewrite(app extkingpin.AppClause, objStoreConfig *extflag.Pat
meta.Thanos.Rewrites = append(meta.Thanos.Rewrites, metadata.Rewrite{
Sources: meta.Compaction.Sources,
DeletionsApplied: deletions,
RelabelsApplied: relabels,
})
meta.Compaction.Sources = []ulid.ULID{newID}
meta.Thanos.Source = metadata.BucketRewriteSource
Expand Down Expand Up @@ -916,8 +940,8 @@ func registerBucketRewrite(app extkingpin.AppClause, objStoreConfig *extflag.Pat
comp = compactv2.New(*tmpDir, logger, changeLog, chunkPool)
}

level.Info(logger).Log("msg", "starting rewrite for block", "source", id, "new", newID, "toDelete", string(deletionsYaml))
if err := comp.WriteSeries(ctx, []block.Reader{b}, d, p, compactv2.WithDeletionModifier(deletions...)); err != nil {
level.Info(logger).Log("msg", "starting rewrite for block", "source", id, "new", newID, "toDelete", string(deletionsYaml), "toRelabel", string(relabelYaml))
if err := comp.WriteSeries(ctx, []block.Reader{b}, d, p, modifiers...); err != nil {
return errors.Wrapf(err, "writing series from %v to %v", id, newID)
}

Expand Down
8 changes: 8 additions & 0 deletions docs/components/tools.md
Original file line number Diff line number Diff line change
Expand Up @@ -769,6 +769,14 @@ Flags:
Path to YAML file that contains
[]metadata.DeletionRequest that will be applied
to blocks
--rewrite.to-relabel-config=<content>
Alternative to 'rewrite.to-relabel-config-file'
flag (mutually exclusive). Content of YAML file
that contains relabel configs that will be
applied to blocks
--rewrite.to-relabel-config-file=<file-path>
Path to YAML file that contains relabel configs
that will be applied to blocks
--tmp.dir="/tmp/thanos-rewrite"
Working directory for temporary files
--tracing.config=<content>
Expand Down
9 changes: 6 additions & 3 deletions pkg/block/fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -879,15 +879,18 @@ var (
)

// ParseRelabelConfig parses relabel configuration.
// If supportedActions not specified, all relabel actions are valid.
func ParseRelabelConfig(contentYaml []byte, supportedActions map[relabel.Action]struct{}) ([]*relabel.Config, error) {
var relabelConfig []*relabel.Config
if err := yaml.Unmarshal(contentYaml, &relabelConfig); err != nil {
return nil, errors.Wrap(err, "parsing relabel configuration")
}

for _, cfg := range relabelConfig {
if _, ok := supportedActions[cfg.Action]; !ok {
return nil, errors.Errorf("unsupported relabel action: %v", cfg.Action)
if supportedActions != nil {
for _, cfg := range relabelConfig {
if _, ok := supportedActions[cfg.Action]; !ok {
return nil, errors.Errorf("unsupported relabel action: %v", cfg.Action)
}
}
}

Expand Down
3 changes: 3 additions & 0 deletions pkg/block/metadata/meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ package metadata
import (
"encoding/json"
"fmt"
"github.com/prometheus/prometheus/pkg/relabel"
"io"
"os"
"path/filepath"
Expand Down Expand Up @@ -95,6 +96,8 @@ type Rewrite struct {
Sources []ulid.ULID `json:"sources,omitempty"`
// Deletions if applied (in order).
DeletionsApplied []DeletionRequest `json:"deletions_applied,omitempty"`
// Relabels if applied.
RelabelsApplied []*relabel.Config `json:"relabels_applied,omitempty"`
}

type Matchers []*labels.Matcher
Expand Down
1 change: 0 additions & 1 deletion pkg/compactv2/chunk_series_set.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ package compactv2

import (
"context"

"github.com/pkg/errors"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/storage"
Expand Down
195 changes: 191 additions & 4 deletions pkg/compactv2/compactor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@ package compactv2
import (
"bytes"
"context"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/pkg/relabel"
"github.com/prometheus/prometheus/tsdb/tombstones"
"io/ioutil"
"math"
"os"
Expand All @@ -22,14 +25,13 @@ import (
"github.com/prometheus/prometheus/tsdb/chunkenc"
"github.com/prometheus/prometheus/tsdb/chunks"
"github.com/prometheus/prometheus/tsdb/index"
"github.com/prometheus/prometheus/tsdb/tombstones"
"github.com/thanos-io/thanos/pkg/block"
"github.com/thanos-io/thanos/pkg/block/metadata"
"github.com/thanos-io/thanos/pkg/testutil"
)

func TestCompactor_WriteSeries_e2e(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
ctx, cancel := context.WithTimeout(context.Background(), 120*time.Minute)
defer cancel()

logger := log.NewLogfmtLogger(os.Stderr)
Expand Down Expand Up @@ -303,8 +305,7 @@ func TestCompactor_WriteSeries_e2e(t *testing.T) {
expectedStats: tsdb.BlockStats{
NumSamples: 12,
NumSeries: 2,
NumChunks: 2,
},
NumChunks: 2},
},
{
name: "1 blocks + delete modifier. For deletion request, full match is required. Delete the first two series",
Expand Down Expand Up @@ -397,6 +398,192 @@ func TestCompactor_WriteSeries_e2e(t *testing.T) {
NumChunks: 12,
},
},
{
name: "1 block + relabel modifier, separate chunks from the same series are merged",
input: [][]seriesSamples{
{
{lset: labels.Labels{{Name: "a", Value: "1"}},
chunks: [][]sample{{{0, 0}, {1, 1}, {2, 2}}, {{10, 10}, {11, 11}, {20, 20}}}},
},
},
// Not used in this test case.
modifiers: []Modifier{WithRelabelModifier(
&relabel.Config{
Action: relabel.Drop,
Regex: relabel.MustNewRegexp("2"),
SourceLabels: model.LabelNames{"a"},
},
)},
expected: []seriesSamples{
{lset: labels.Labels{{Name: "a", Value: "1"}},
chunks: [][]sample{{{0, 0}, {1, 1}, {2, 2}, {10, 10}, {11, 11}, {20, 20}}}},
},
expectedStats: tsdb.BlockStats{
NumSamples: 6,
NumSeries: 1,
NumChunks: 1,
},
},
{
name: "1 block + relabel modifier, delete first series",
input: [][]seriesSamples{
{
{lset: labels.Labels{{Name: "a", Value: "1"}},
chunks: [][]sample{{{0, 0}, {1, 1}, {2, 2}, {10, 10}, {11, 11}, {20, 20}}}},
{lset: labels.Labels{{Name: "a", Value: "2"}},
chunks: [][]sample{{{0, 0}, {1, 1}, {2, 2}}, {{10, 10}, {11, 11}, {20, 20}, {25, 25}}}},
{lset: labels.Labels{{Name: "a", Value: "3"}},
chunks: [][]sample{{{0, 0}, {1, 1}, {2, 2}, {10, 13}, {11, 11}, {20, 20}}}},
},
},
modifiers: []Modifier{WithRelabelModifier(
&relabel.Config{
Action: relabel.Drop,
Regex: relabel.MustNewRegexp("1"),
SourceLabels: model.LabelNames{"a"},
},
)},
expected: []seriesSamples{
{lset: labels.Labels{{Name: "a", Value: "2"}},
chunks: [][]sample{{{0, 0}, {1, 1}, {2, 2}, {10, 10}, {11, 11}, {20, 20}, {25, 25}}}},
{lset: labels.Labels{{Name: "a", Value: "3"}},
chunks: [][]sample{{{0, 0}, {1, 1}, {2, 2}, {10, 13}, {11, 11}, {20, 20}}}},
},
expectedChanges: "Deleted {a=\"1\"} [{0 20}]\n",
expectedStats: tsdb.BlockStats{
NumSamples: 13,
NumSeries: 2,
NumChunks: 2,
},
},
{
name: "1 block + relabel modifier, series reordered",
input: [][]seriesSamples{
{
{lset: labels.Labels{{Name: "a", Value: "1"}},
chunks: [][]sample{{{0, 0}, {1, -1}, {2, -2}, {10, -10}, {11, -11}, {20, -20}}}},
{lset: labels.Labels{{Name: "a", Value: "2"}},
chunks: [][]sample{{{0, 0}, {1, 1}, {2, 2}}, {{10, 10}, {11, 11}, {20, 20}, {25, 25}}}},
},
},
// {a="1"} will be relabeled to {a="3"} while {a="2"} will be relabeled to {a="0"}.
modifiers: []Modifier{WithRelabelModifier(
&relabel.Config{
Action: relabel.Replace,
Regex: relabel.MustNewRegexp("1"),
SourceLabels: model.LabelNames{"a"},
TargetLabel: "a",
Replacement: "3",
},
&relabel.Config{
Action: relabel.Replace,
Regex: relabel.MustNewRegexp("2"),
SourceLabels: model.LabelNames{"a"},
TargetLabel: "a",
Replacement: "0",
},
)},
expected: []seriesSamples{
{lset: labels.Labels{{Name: "a", Value: "0"}},
chunks: [][]sample{{{0, 0}, {1, 1}, {2, 2}, {10, 10}, {11, 11}, {20, 20}, {25, 25}}}},
{lset: labels.Labels{{Name: "a", Value: "3"}},
chunks: [][]sample{{{0, 0}, {1, -1}, {2, -2}, {10, -10}, {11, -11}, {20, -20}}}},
},
expectedChanges: "Relabelled {a=\"1\"} {a=\"3\"}\nRelabelled {a=\"2\"} {a=\"0\"}\n",
expectedStats: tsdb.BlockStats{
NumSamples: 13,
NumSeries: 2,
NumChunks: 2,
},
},
{
name: "1 block + relabel modifier, series deleted because of no labels left after relabel",
input: [][]seriesSamples{
{
{lset: labels.Labels{{Name: "a", Value: "1"}},
chunks: [][]sample{{{0, 0}, {1, 1}, {2, 2}}, {{10, 10}, {11, 11}, {20, 20}, {25, 25}}}},
},
{
{lset: labels.Labels{{Name: "a", Value: "2"}},
chunks: [][]sample{{{0, 0}, {1, 1}, {2, 2}}, {{10, 10}, {11, 11}, {20, 20}, {25, 25}}}},
},
},
// Drop all label name "a".
modifiers: []Modifier{WithRelabelModifier(
&relabel.Config{
Action: relabel.LabelDrop,
Regex: relabel.MustNewRegexp("a"),
},
)},
expected: nil,
expectedChanges: "Deleted {a=\"1\"} [{0 25}]\nDeleted {a=\"2\"} [{0 25}]\n",
expectedStats: tsdb.BlockStats{
NumSamples: 0,
NumSeries: 0,
NumChunks: 0,
},
},
{
name: "1 block + relabel modifier, series 1 is deleted because of no labels left after relabel",
input: [][]seriesSamples{
{
{lset: labels.Labels{{Name: "a", Value: "1"}},
chunks: [][]sample{{{0, 0}, {1, 1}, {2, 2}}, {{10, 10}, {11, 11}, {20, 20}, {25, 25}}}},
},
{
{lset: labels.Labels{{Name: "a", Value: "2"}, {Name: "b", Value: "1"}},
chunks: [][]sample{{{0, 0}, {1, 1}, {2, 2}}, {{10, 10}, {11, 11}, {20, 20}, {25, 25}}}},
},
},
// Drop all label name "a".
modifiers: []Modifier{WithRelabelModifier(
&relabel.Config{
Action: relabel.LabelDrop,
Regex: relabel.MustNewRegexp("a"),
},
)},
expected: []seriesSamples{
{lset: labels.Labels{{Name: "b", Value: "1"}},
chunks: [][]sample{{{0, 0}, {1, 1}, {2, 2}, {10, 10}, {11, 11}, {20, 20}, {25, 25}}}},
},
expectedChanges: "Deleted {a=\"1\"} [{0 25}]\nRelabelled {a=\"2\", b=\"1\"} {b=\"1\"}\n",
expectedStats: tsdb.BlockStats{
NumSamples: 7,
NumSeries: 1,
NumChunks: 1,
},
},
{
name: "1 block + relabel modifier, series merged after relabeling",
input: [][]seriesSamples{
{
{lset: labels.Labels{{Name: "a", Value: "1"}},
chunks: [][]sample{{{1, 1}, {2, 2}, {10, 10}, {20, 20}}}},
{lset: labels.Labels{{Name: "a", Value: "2"}},
chunks: [][]sample{{{0, 0}, {2, 2}, {3, 3}}, {{4, 4}, {11, 11}, {20, 20}, {25, 25}}}},
},
},
// Drop all label name "a".
modifiers: []Modifier{WithRelabelModifier(
&relabel.Config{
Action: relabel.Replace,
Regex: relabel.MustNewRegexp("1|2"),
SourceLabels: model.LabelNames{"a"},
TargetLabel: "a",
Replacement: "0",
},
)},
expected: []seriesSamples{
{lset: labels.Labels{{Name: "a", Value: "0"}},
chunks: [][]sample{{{0, 0}, {1, 1}, {2, 2}, {3, 3}, {4, 4}, {10, 10}, {11, 11}, {20, 20}, {25, 25}}}},
},
expectedChanges: "Relabelled {a=\"1\"} {a=\"0\"}\nRelabelled {a=\"2\"} {a=\"0\"}\n",
expectedStats: tsdb.BlockStats{
NumSamples: 9,
NumSeries: 1,
NumChunks: 1,
},
},
} {
t.Run(tcase.name, func(t *testing.T) {
tmpDir, err := ioutil.TempDir("", "test-series-writer")
Expand Down
Loading

0 comments on commit bdf1241

Please sign in to comment.