Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support bucket rewrite relabel #3707

Merged
merged 3 commits into from
May 27, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ We use _breaking :warning:_ to mark changes that are not backward compatible (re
- [#4125](https://github.com/thanos-io/thanos/pull/4125) Rule: Add `--alert.relabel-config` / `--alert.relabel-config-file` allowing to specify alert relabel configurations like [Prometheus](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#relabel_config)
- [#4211](https://github.com/thanos-io/thanos/pull/4211) Add TLS and basic authentication to Thanos APIs
- [#4249](https://github.com/thanos-io/thanos/pull/4249) UI: add dark theme
- [#3707](https://github.com/thanos-io/thanos/pull/3707) Tools: Added `--rewrite.to-relabel-config` to bucket rewrite tool to support series relabel from given blocks.

### Fixed
-
Expand Down
37 changes: 31 additions & 6 deletions cmd/thanos/tools_bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,10 @@ import (
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/common/route"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/pkg/relabel"
"github.com/prometheus/prometheus/tsdb"
"github.com/prometheus/prometheus/tsdb/chunkenc"

v1 "github.com/thanos-io/thanos/pkg/api/blocks"
"github.com/thanos-io/thanos/pkg/block"
"github.com/thanos-io/thanos/pkg/block/metadata"
Expand Down Expand Up @@ -819,7 +821,8 @@ func registerBucketRewrite(app extkingpin.AppClause, objStoreConfig *extflag.Pat
hashFunc := cmd.Flag("hash-func", "Specify which hash function to use when calculating the hashes of produced files. If no function has been specified, it does not happen. This permits avoiding downloading some files twice albeit at some performance cost. Possible values are: \"\", \"SHA256\".").
Default("").Enum("SHA256", "")
dryRun := cmd.Flag("dry-run", "Prints the series changes instead of doing them. Defaults to true, for user to double check. (: Pass --no-dry-run to skip this.").Default("true").Bool()
toDelete := extflag.RegisterPathOrContent(cmd, "rewrite.to-delete-config", "YAML file that contains []metadata.DeletionRequest that will be applied to blocks", true)
toDelete := extflag.RegisterPathOrContent(cmd, "rewrite.to-delete-config", "YAML file that contains []metadata.DeletionRequest that will be applied to blocks", false)
toRelabel := extflag.RegisterPathOrContent(cmd, "rewrite.to-relabel-config", "YAML file that contains relabel configs that will be applied to blocks", false)
provideChangeLog := cmd.Flag("rewrite.add-change-log", "If specified, all modifications are written to new block directory. Disable if latency is to high.").Default("true").Bool()
promBlocks := cmd.Flag("prom-blocks", "If specified, we assume the blocks to be uploaded are only used with Prometheus so we don't check external labels in this case.").Default("false").Bool()
cmd.Setup(func(g *run.Group, logger log.Logger, reg *prometheus.Registry, _ opentracing.Tracer, _ <-chan struct{}, _ bool) error {
Expand All @@ -833,15 +836,36 @@ func registerBucketRewrite(app extkingpin.AppClause, objStoreConfig *extflag.Pat
return err
}

deletionsYaml, err := toDelete.Content()
var modifiers []compactv2.Modifier

relabelYaml, err := toRelabel.Content()
if err != nil {
return err
}
var relabels []*relabel.Config
if len(relabelYaml) > 0 {
relabels, err = block.ParseRelabelConfig(relabelYaml, nil)
if err != nil {
return err
}
modifiers = append(modifiers, compactv2.WithRelabelModifier(relabels...))
}

var deletions []metadata.DeletionRequest
if err := yaml.Unmarshal(deletionsYaml, &deletions); err != nil {
deletionsYaml, err := toDelete.Content()
if err != nil {
return err
}
var deletions []metadata.DeletionRequest
if len(deletionsYaml) > 0 {
if err := yaml.Unmarshal(deletionsYaml, &deletions); err != nil {
return err
}
modifiers = append(modifiers, compactv2.WithDeletionModifier(deletions...))
}

if len(modifiers) == 0 {
return errors.New("rewrite configuration should be provided")
}

var ids []ulid.ULID
for _, id := range *blockIDs {
Expand Down Expand Up @@ -885,6 +909,7 @@ func registerBucketRewrite(app extkingpin.AppClause, objStoreConfig *extflag.Pat
meta.Thanos.Rewrites = append(meta.Thanos.Rewrites, metadata.Rewrite{
Sources: meta.Compaction.Sources,
DeletionsApplied: deletions,
RelabelsApplied: relabels,
})
meta.Compaction.Sources = []ulid.ULID{newID}
meta.Thanos.Source = metadata.BucketRewriteSource
Expand Down Expand Up @@ -916,8 +941,8 @@ func registerBucketRewrite(app extkingpin.AppClause, objStoreConfig *extflag.Pat
comp = compactv2.New(*tmpDir, logger, changeLog, chunkPool)
}

level.Info(logger).Log("msg", "starting rewrite for block", "source", id, "new", newID, "toDelete", string(deletionsYaml))
if err := comp.WriteSeries(ctx, []block.Reader{b}, d, p, compactv2.WithDeletionModifier(deletions...)); err != nil {
level.Info(logger).Log("msg", "starting rewrite for block", "source", id, "new", newID, "toDelete", string(deletionsYaml), "toRelabel", string(relabelYaml))
if err := comp.WriteSeries(ctx, []block.Reader{b}, d, p, modifiers...); err != nil {
return errors.Wrapf(err, "writing series from %v to %v", id, newID)
}

Expand Down
8 changes: 8 additions & 0 deletions docs/components/tools.md
Original file line number Diff line number Diff line change
Expand Up @@ -769,6 +769,14 @@ Flags:
Path to YAML file that contains
[]metadata.DeletionRequest that will be applied
to blocks
--rewrite.to-relabel-config=<content>
Alternative to 'rewrite.to-relabel-config-file'
flag (mutually exclusive). Content of YAML file
that contains relabel configs that will be
applied to blocks
--rewrite.to-relabel-config-file=<file-path>
Path to YAML file that contains relabel configs
that will be applied to blocks
--tmp.dir="/tmp/thanos-rewrite"
Working directory for temporary files
--tracing.config=<content>
Expand Down
9 changes: 6 additions & 3 deletions pkg/block/fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -879,15 +879,18 @@ var (
)

// ParseRelabelConfig parses relabel configuration.
// If supportedActions not specified, all relabel actions are valid.
func ParseRelabelConfig(contentYaml []byte, supportedActions map[relabel.Action]struct{}) ([]*relabel.Config, error) {
var relabelConfig []*relabel.Config
if err := yaml.Unmarshal(contentYaml, &relabelConfig); err != nil {
return nil, errors.Wrap(err, "parsing relabel configuration")
}

for _, cfg := range relabelConfig {
if _, ok := supportedActions[cfg.Action]; !ok {
return nil, errors.Errorf("unsupported relabel action: %v", cfg.Action)
if supportedActions != nil {
for _, cfg := range relabelConfig {
if _, ok := supportedActions[cfg.Action]; !ok {
return nil, errors.Errorf("unsupported relabel action: %v", cfg.Action)
}
}
}

Expand Down
6 changes: 5 additions & 1 deletion pkg/block/metadata/meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,14 @@ import (
"github.com/oklog/ulid"
"github.com/pkg/errors"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/pkg/relabel"
"github.com/prometheus/prometheus/promql/parser"
"github.com/prometheus/prometheus/tsdb"
"github.com/prometheus/prometheus/tsdb/fileutil"
"github.com/prometheus/prometheus/tsdb/tombstones"
"github.com/thanos-io/thanos/pkg/runutil"
"gopkg.in/yaml.v3"

"github.com/thanos-io/thanos/pkg/runutil"
)

type SourceType string
Expand Down Expand Up @@ -95,6 +97,8 @@ type Rewrite struct {
Sources []ulid.ULID `json:"sources,omitempty"`
// Deletions if applied (in order).
DeletionsApplied []DeletionRequest `json:"deletions_applied,omitempty"`
// Relabels if applied.
RelabelsApplied []*relabel.Config `json:"relabels_applied,omitempty"`
}

type Matchers []*labels.Matcher
Expand Down
1 change: 1 addition & 0 deletions pkg/compactv2/chunk_series_set.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/prometheus/prometheus/tsdb/chunkenc"
"github.com/prometheus/prometheus/tsdb/chunks"
"github.com/prometheus/prometheus/tsdb/index"

"github.com/thanos-io/thanos/pkg/block"
)

Expand Down
189 changes: 189 additions & 0 deletions pkg/compactv2/compactor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,15 @@ import (
"github.com/go-kit/kit/log"
"github.com/oklog/ulid"
"github.com/pkg/errors"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/pkg/relabel"
"github.com/prometheus/prometheus/tsdb"
"github.com/prometheus/prometheus/tsdb/chunkenc"
"github.com/prometheus/prometheus/tsdb/chunks"
"github.com/prometheus/prometheus/tsdb/index"
"github.com/prometheus/prometheus/tsdb/tombstones"

"github.com/thanos-io/thanos/pkg/block"
"github.com/thanos-io/thanos/pkg/block/metadata"
"github.com/thanos-io/thanos/pkg/testutil"
Expand Down Expand Up @@ -397,6 +400,192 @@ func TestCompactor_WriteSeries_e2e(t *testing.T) {
NumChunks: 12,
},
},
{
name: "1 block + relabel modifier, two chunks from the same series are merged into one larger chunk",
input: [][]seriesSamples{
{
{lset: labels.Labels{{Name: "a", Value: "1"}},
chunks: [][]sample{{{0, 0}, {1, 1}, {2, 2}}, {{10, 10}, {11, 11}, {20, 20}}}},
},
},
// Not used in this test case.
modifiers: []Modifier{WithRelabelModifier(
&relabel.Config{
Action: relabel.Drop,
Regex: relabel.MustNewRegexp("no-match"),
SourceLabels: model.LabelNames{"a"},
},
)},
expected: []seriesSamples{
{lset: labels.Labels{{Name: "a", Value: "1"}},
chunks: [][]sample{{{0, 0}, {1, 1}, {2, 2}, {10, 10}, {11, 11}, {20, 20}}}},
},
expectedStats: tsdb.BlockStats{
NumSamples: 6,
NumSeries: 1,
NumChunks: 1,
},
},
{
name: "1 block + relabel modifier, delete first series",
input: [][]seriesSamples{
{
{lset: labels.Labels{{Name: "a", Value: "1"}},
chunks: [][]sample{{{0, 0}, {1, 1}, {2, 2}, {10, 10}, {11, 11}, {20, 20}}}},
{lset: labels.Labels{{Name: "a", Value: "2"}},
chunks: [][]sample{{{0, 0}, {1, 1}, {2, 2}}, {{10, 10}, {11, 11}, {20, 20}, {25, 25}}}},
{lset: labels.Labels{{Name: "a", Value: "3"}},
chunks: [][]sample{{{0, 0}, {1, 1}, {2, 2}, {10, 13}, {11, 11}, {20, 20}}}},
},
},
modifiers: []Modifier{WithRelabelModifier(
&relabel.Config{
Action: relabel.Drop,
Regex: relabel.MustNewRegexp("1"),
SourceLabels: model.LabelNames{"a"},
},
)},
expected: []seriesSamples{
{lset: labels.Labels{{Name: "a", Value: "2"}},
chunks: [][]sample{{{0, 0}, {1, 1}, {2, 2}, {10, 10}, {11, 11}, {20, 20}, {25, 25}}}},
{lset: labels.Labels{{Name: "a", Value: "3"}},
chunks: [][]sample{{{0, 0}, {1, 1}, {2, 2}, {10, 13}, {11, 11}, {20, 20}}}},
},
expectedChanges: "Deleted {a=\"1\"} [{0 20}]\n",
expectedStats: tsdb.BlockStats{
NumSamples: 13,
NumSeries: 2,
NumChunks: 2,
},
},
{
name: "1 block + relabel modifier, series reordered",
input: [][]seriesSamples{
{
{lset: labels.Labels{{Name: "a", Value: "1"}},
chunks: [][]sample{{{0, 0}, {1, -1}, {2, -2}, {10, -10}, {11, -11}, {20, -20}}}},
{lset: labels.Labels{{Name: "a", Value: "2"}},
chunks: [][]sample{{{0, 0}, {1, 1}, {2, 2}}, {{10, 10}, {11, 11}, {20, 20}, {25, 25}}}},
},
},
// {a="1"} will be relabeled to {a="3"} while {a="2"} will be relabeled to {a="0"}.
modifiers: []Modifier{WithRelabelModifier(
&relabel.Config{
Action: relabel.Replace,
Regex: relabel.MustNewRegexp("1"),
SourceLabels: model.LabelNames{"a"},
TargetLabel: "a",
Replacement: "3",
},
&relabel.Config{
Action: relabel.Replace,
Regex: relabel.MustNewRegexp("2"),
SourceLabels: model.LabelNames{"a"},
TargetLabel: "a",
Replacement: "0",
},
)},
expected: []seriesSamples{
{lset: labels.Labels{{Name: "a", Value: "0"}},
chunks: [][]sample{{{0, 0}, {1, 1}, {2, 2}, {10, 10}, {11, 11}, {20, 20}, {25, 25}}}},
{lset: labels.Labels{{Name: "a", Value: "3"}},
chunks: [][]sample{{{0, 0}, {1, -1}, {2, -2}, {10, -10}, {11, -11}, {20, -20}}}},
},
expectedChanges: "Relabelled {a=\"1\"} {a=\"3\"}\nRelabelled {a=\"2\"} {a=\"0\"}\n",
expectedStats: tsdb.BlockStats{
NumSamples: 13,
NumSeries: 2,
NumChunks: 2,
},
},
{
name: "1 block + relabel modifier, series deleted because of no labels left after relabel",
input: [][]seriesSamples{
{
{lset: labels.Labels{{Name: "a", Value: "1"}},
chunks: [][]sample{{{0, 0}, {1, 1}, {2, 2}}, {{10, 10}, {11, 11}, {20, 20}, {25, 25}}}},
},
{
{lset: labels.Labels{{Name: "a", Value: "2"}},
chunks: [][]sample{{{0, 0}, {1, 1}, {2, 2}}, {{10, 10}, {11, 11}, {20, 20}, {25, 25}}}},
},
},
// Drop all label name "a".
modifiers: []Modifier{WithRelabelModifier(
&relabel.Config{
Action: relabel.LabelDrop,
Regex: relabel.MustNewRegexp("a"),
},
)},
expected: nil,
expectedChanges: "Deleted {a=\"1\"} [{0 25}]\nDeleted {a=\"2\"} [{0 25}]\n",
expectedStats: tsdb.BlockStats{
NumSamples: 0,
NumSeries: 0,
NumChunks: 0,
},
},
{
name: "1 block + relabel modifier, series 1 is deleted because of no labels left after relabel",
input: [][]seriesSamples{
{
{lset: labels.Labels{{Name: "a", Value: "1"}},
chunks: [][]sample{{{0, 0}, {1, 1}, {2, 2}}, {{10, 10}, {11, 11}, {20, 20}, {25, 25}}}},
},
{
{lset: labels.Labels{{Name: "a", Value: "2"}, {Name: "b", Value: "1"}},
chunks: [][]sample{{{0, 0}, {1, 1}, {2, 2}}, {{10, 10}, {11, 11}, {20, 20}, {25, 25}}}},
},
},
// Drop all label name "a".
modifiers: []Modifier{WithRelabelModifier(
&relabel.Config{
Action: relabel.LabelDrop,
Regex: relabel.MustNewRegexp("a"),
},
)},
expected: []seriesSamples{
{lset: labels.Labels{{Name: "b", Value: "1"}},
chunks: [][]sample{{{0, 0}, {1, 1}, {2, 2}, {10, 10}, {11, 11}, {20, 20}, {25, 25}}}},
},
expectedChanges: "Deleted {a=\"1\"} [{0 25}]\nRelabelled {a=\"2\", b=\"1\"} {b=\"1\"}\n",
expectedStats: tsdb.BlockStats{
NumSamples: 7,
NumSeries: 1,
NumChunks: 1,
},
},
{
name: "1 block + relabel modifier, series merged after relabeling",
input: [][]seriesSamples{
{
{lset: labels.Labels{{Name: "a", Value: "1"}},
chunks: [][]sample{{{1, 1}, {2, 2}, {10, 10}, {20, 20}}}},
{lset: labels.Labels{{Name: "a", Value: "2"}},
chunks: [][]sample{{{0, 0}, {2, 2}, {3, 3}}, {{4, 4}, {11, 11}, {20, 20}, {25, 25}}}},
},
},
// Replace values of label name "a" with "0".
modifiers: []Modifier{WithRelabelModifier(
&relabel.Config{
Action: relabel.Replace,
Regex: relabel.MustNewRegexp("1|2"),
SourceLabels: model.LabelNames{"a"},
TargetLabel: "a",
Replacement: "0",
},
)},
expected: []seriesSamples{
{lset: labels.Labels{{Name: "a", Value: "0"}},
chunks: [][]sample{{{0, 0}, {1, 1}, {2, 2}, {3, 3}, {4, 4}, {10, 10}, {11, 11}, {20, 20}, {25, 25}}}},
},
expectedChanges: "Relabelled {a=\"1\"} {a=\"0\"}\nRelabelled {a=\"2\"} {a=\"0\"}\n",
expectedStats: tsdb.BlockStats{
NumSamples: 9,
NumSeries: 1,
NumChunks: 1,
},
},
} {
t.Run(tcase.name, func(t *testing.T) {
tmpDir, err := ioutil.TempDir("", "test-series-writer")
Expand Down
Loading