Skip to content

Commit

Permalink
Fixed slash support in label for compactor.
Browse files Browse the repository at this point in the history
Fixes: #1661

Signed-off-by: Bartek Plotka <[email protected]>
  • Loading branch information
bwplotka committed Oct 18, 2019
1 parent 0ec1e20 commit d5de67a
Show file tree
Hide file tree
Showing 7 changed files with 73 additions and 34 deletions.
8 changes: 4 additions & 4 deletions cmd/thanos/downsample.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,10 +214,10 @@ func downsampleBucket(
continue
}
if err := processDownsampling(ctx, logger, bkt, m, dir, downsample.ResLevel1); err != nil {
metrics.downsampleFailures.WithLabelValues(compact.GroupKey(*m)).Inc()
metrics.downsampleFailures.WithLabelValues(compact.GroupKey(m.Thanos)).Inc()
return errors.Wrap(err, "downsampling to 5 min")
}
metrics.downsamples.WithLabelValues(compact.GroupKey(*m)).Inc()
metrics.downsamples.WithLabelValues(compact.GroupKey(m.Thanos)).Inc()

case downsample.ResLevel1:
missing := false
Expand All @@ -237,10 +237,10 @@ func downsampleBucket(
continue
}
if err := processDownsampling(ctx, logger, bkt, m, dir, downsample.ResLevel2); err != nil {
metrics.downsampleFailures.WithLabelValues(compact.GroupKey(*m))
metrics.downsampleFailures.WithLabelValues(compact.GroupKey(m.Thanos))
return errors.Wrap(err, "downsampling to 60 min")
}
metrics.downsamples.WithLabelValues(compact.GroupKey(*m))
metrics.downsamples.WithLabelValues(compact.GroupKey(m.Thanos))
}
}
return nil
Expand Down
4 changes: 2 additions & 2 deletions cmd/thanos/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,9 +111,9 @@ func TestCleanupDownsampleCacheFolder(t *testing.T) {
testutil.Ok(t, err)

metrics := newDownsampleMetrics(prometheus.NewRegistry())
testutil.Equals(t, 0.0, promtest.ToFloat64(metrics.downsamples.WithLabelValues(compact.GroupKey(meta))))
testutil.Equals(t, 0.0, promtest.ToFloat64(metrics.downsamples.WithLabelValues(compact.GroupKey(meta.Thanos))))
testutil.Ok(t, downsampleBucket(ctx, logger, metrics, bkt, dir))
testutil.Equals(t, 1.0, promtest.ToFloat64(metrics.downsamples.WithLabelValues(compact.GroupKey(meta))))
testutil.Equals(t, 1.0, promtest.ToFloat64(metrics.downsamples.WithLabelValues(compact.GroupKey(meta.Thanos))))

_, err = os.Stat(dir)
testutil.Assert(t, os.IsNotExist(err), "index cache dir shouldn't not exist at the end of execution")
Expand Down
2 changes: 0 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -318,8 +318,6 @@ github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5
github.com/miekg/dns v1.0.14/go.mod h1:W1PPwlIAgtquWBMBEV9nkV9Cazfe8ScdGz/Lj7v3Nrg=
github.com/miekg/dns v1.1.15 h1:CSSIDtllwGLMoA6zjdKnaE6Tx6eVUxQ29LUgGetiDCI=
github.com/miekg/dns v1.1.15/go.mod h1:W1PPwlIAgtquWBMBEV9nkV9Cazfe8ScdGz/Lj7v3Nrg=
github.com/miekg/dns v1.1.19 h1:0ymbfaLG1/utH2+BydNiF+dx1jSEmdr/nylOtkGHZZg=
github.com/miekg/dns v1.1.19/go.mod h1:bPDLeHnStXmXAq1m/Ch/hvfNHr14JKNPMBo3VZKjuso=
github.com/miekg/dns v1.1.22 h1:Jm64b3bO9kP43ddLjL2EY3Io6bmy1qGb9Xxz6TqS6rc=
github.com/miekg/dns v1.1.22/go.mod h1:bPDLeHnStXmXAq1m/Ch/hvfNHr14JKNPMBo3VZKjuso=
github.com/minio/cli v1.20.0/go.mod h1:bYxnK0uS629N3Bq+AOZZ+6lwF77Sodk4+UL9vNuXhOY=
Expand Down
22 changes: 11 additions & 11 deletions pkg/compact/compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -341,12 +341,12 @@ func (c *Syncer) removeIfMetaMalformed(ctx context.Context, id ulid.ULID) (remov

// GroupKey returns a unique identifier for the group the block belongs to. It considers
// the downsampling resolution and the block's labels.
func GroupKey(meta metadata.Meta) string {
return groupKey(meta.Thanos.Downsample.Resolution, labels.FromMap(meta.Thanos.Labels))
func GroupKey(meta metadata.Thanos) string {
return groupKey(meta.Downsample.Resolution, labels.FromMap(meta.Labels))
}

func groupKey(res int64, lbls labels.Labels) string {
return fmt.Sprintf("%d@%s", res, lbls)
return fmt.Sprintf("%d@%s", res, fmt.Sprintf("%v", lbls.Hash()))
}

// Groups returns the compaction groups for all blocks currently known to the syncer.
Expand All @@ -357,23 +357,23 @@ func (c *Syncer) Groups() (res []*Group, err error) {

groups := map[string]*Group{}
for _, m := range c.blocks {
g, ok := groups[GroupKey(*m)]
g, ok := groups[GroupKey(m.Thanos)]
if !ok {
g, err = newGroup(
log.With(c.logger, "compactionGroup", GroupKey(*m)),
log.With(c.logger, "compactionGroup", GroupKey(m.Thanos)),
c.bkt,
labels.FromMap(m.Thanos.Labels),
m.Thanos.Downsample.Resolution,
c.acceptMalformedIndex,
c.metrics.compactions.WithLabelValues(GroupKey(*m)),
c.metrics.compactionsRuns.WithLabelValues(GroupKey(*m)),
c.metrics.compactionFailures.WithLabelValues(GroupKey(*m)),
c.metrics.compactions.WithLabelValues(GroupKey(m.Thanos)),
c.metrics.compactionsRuns.WithLabelValues(GroupKey(m.Thanos)),
c.metrics.compactionFailures.WithLabelValues(GroupKey(m.Thanos)),
c.metrics.garbageCollectedBlocks,
)
if err != nil {
return nil, errors.Wrap(err, "create compaction group")
}
groups[GroupKey(*m)] = g
groups[GroupKey(m.Thanos)] = g
res = append(res, g)
}
if err := g.Add(m); err != nil {
Expand Down Expand Up @@ -839,8 +839,8 @@ func (cg *Group) compact(ctx context.Context, dir string, comp tsdb.Compactor) (
return false, ulid.ULID{}, errors.Wrapf(err, "read meta from %s", pdir)
}

if cg.Key() != GroupKey(*meta) {
return false, ulid.ULID{}, halt(errors.Wrapf(err, "compact planned compaction for mixed groups. group: %s, planned block's group: %s", cg.Key(), GroupKey(*meta)))
if cg.Key() != GroupKey(meta.Thanos) {
return false, ulid.ULID{}, halt(errors.Wrapf(err, "compact planned compaction for mixed groups. group: %s, planned block's group: %s", cg.Key(), GroupKey(meta.Thanos)))
}

for _, s := range meta.Compaction.Sources {
Expand Down
28 changes: 14 additions & 14 deletions pkg/compact/compact_e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,7 @@ func TestGroup_Compact_e2e(t *testing.T) {

// Test label name with slash, regression: https://github.com/thanos-io/thanos/issues/1661.
extLabels := labels.Labels{{Name: "e1", Value: "1/weird"}}
extLabels2 := labels.Labels{{Name: "e2", Value: "1"}}
extLabels2 := labels.Labels{{Name: "e1", Value: "1"}}
metas := createAndUpload(t, bkt, []blockgenSpec{
{
numSamples: 100, mint: 0, maxt: 1000, extLset: extLabels, res: 124,
Expand Down Expand Up @@ -313,21 +313,21 @@ func TestGroup_Compact_e2e(t *testing.T) {
testutil.Equals(t, 5.0, promtest.ToFloat64(sy.metrics.garbageCollectedBlocks))
testutil.Equals(t, 0.0, promtest.ToFloat64(sy.metrics.garbageCollectionFailures))
testutil.Equals(t, 4, MetricCount(sy.metrics.compactions))
testutil.Equals(t, 1.0, promtest.ToFloat64(sy.metrics.compactions.WithLabelValues(GroupKey(*metas[0]))))
testutil.Equals(t, 1.0, promtest.ToFloat64(sy.metrics.compactions.WithLabelValues(GroupKey(*metas[7]))))
testutil.Equals(t, 0.0, promtest.ToFloat64(sy.metrics.compactions.WithLabelValues(GroupKey(*metas[4]))))
testutil.Equals(t, 0.0, promtest.ToFloat64(sy.metrics.compactions.WithLabelValues(GroupKey(*metas[5]))))
testutil.Equals(t, 1.0, promtest.ToFloat64(sy.metrics.compactions.WithLabelValues(GroupKey(metas[0].Thanos))))
testutil.Equals(t, 1.0, promtest.ToFloat64(sy.metrics.compactions.WithLabelValues(GroupKey(metas[7].Thanos))))
testutil.Equals(t, 0.0, promtest.ToFloat64(sy.metrics.compactions.WithLabelValues(GroupKey(metas[4].Thanos))))
testutil.Equals(t, 0.0, promtest.ToFloat64(sy.metrics.compactions.WithLabelValues(GroupKey(metas[5].Thanos))))
testutil.Equals(t, 4, MetricCount(sy.metrics.compactionsRuns))
testutil.Equals(t, 2.0, promtest.ToFloat64(sy.metrics.compactionsRuns.WithLabelValues(GroupKey(*metas[0]))))
testutil.Equals(t, 2.0, promtest.ToFloat64(sy.metrics.compactionsRuns.WithLabelValues(GroupKey(*metas[7]))))
testutil.Equals(t, 2.0, promtest.ToFloat64(sy.metrics.compactionsRuns.WithLabelValues(GroupKey(metas[0].Thanos))))
testutil.Equals(t, 2.0, promtest.ToFloat64(sy.metrics.compactionsRuns.WithLabelValues(GroupKey(metas[7].Thanos))))
// TODO(bwplotka): Looks like we do some unnecessary loops. Not a major problem but investigate.
testutil.Equals(t, 2.0, promtest.ToFloat64(sy.metrics.compactionsRuns.WithLabelValues(GroupKey(*metas[4]))))
testutil.Equals(t, 2.0, promtest.ToFloat64(sy.metrics.compactionsRuns.WithLabelValues(GroupKey(*metas[5]))))
testutil.Equals(t, 2.0, promtest.ToFloat64(sy.metrics.compactionsRuns.WithLabelValues(GroupKey(metas[4].Thanos))))
testutil.Equals(t, 2.0, promtest.ToFloat64(sy.metrics.compactionsRuns.WithLabelValues(GroupKey(metas[5].Thanos))))
testutil.Equals(t, 4, MetricCount(sy.metrics.compactionFailures))
testutil.Equals(t, 0.0, promtest.ToFloat64(sy.metrics.compactionFailures.WithLabelValues(GroupKey(*metas[0]))))
testutil.Equals(t, 0.0, promtest.ToFloat64(sy.metrics.compactionFailures.WithLabelValues(GroupKey(*metas[7]))))
testutil.Equals(t, 0.0, promtest.ToFloat64(sy.metrics.compactionFailures.WithLabelValues(GroupKey(*metas[4]))))
testutil.Equals(t, 0.0, promtest.ToFloat64(sy.metrics.compactionFailures.WithLabelValues(GroupKey(*metas[5]))))
testutil.Equals(t, 0.0, promtest.ToFloat64(sy.metrics.compactionFailures.WithLabelValues(GroupKey(metas[0].Thanos))))
testutil.Equals(t, 0.0, promtest.ToFloat64(sy.metrics.compactionFailures.WithLabelValues(GroupKey(metas[7].Thanos))))
testutil.Equals(t, 0.0, promtest.ToFloat64(sy.metrics.compactionFailures.WithLabelValues(GroupKey(metas[4].Thanos))))
testutil.Equals(t, 0.0, promtest.ToFloat64(sy.metrics.compactionFailures.WithLabelValues(GroupKey(metas[5].Thanos))))

_, err = os.Stat(dir)
testutil.Assert(t, os.IsNotExist(err), "dir %s should be remove after compaction.", dir)
Expand Down Expand Up @@ -357,7 +357,7 @@ func TestGroup_Compact_e2e(t *testing.T) {
return err
}

others[GroupKey(meta)] = meta
others[GroupKey(meta.Thanos)] = meta
return nil
}))

Expand Down
41 changes: 41 additions & 0 deletions pkg/compact/compact_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
"testing"
"time"

"github.com/thanos-io/thanos/pkg/block/metadata"

"github.com/oklog/ulid"
"github.com/pkg/errors"
"github.com/prometheus/prometheus/pkg/relabel"
Expand Down Expand Up @@ -105,3 +107,42 @@ func TestSyncer_SyncMetas_HandlesMalformedBlocks(t *testing.T) {
testutil.Ok(t, err)
testutil.Equals(t, true, exists)
}

func TestGroupKey(t *testing.T) {
for _, tcase := range []struct {
input metadata.Thanos
expected string
}{
{
input: metadata.Thanos{},
expected: "0@17241709254077376921",
},
{
input: metadata.Thanos{
Labels: map[string]string{},
Downsample: metadata.ThanosDownsample{Resolution: 0},
},
expected: "0@17241709254077376921",
},
{
input: metadata.Thanos{
Labels: map[string]string{"foo": "bar", "foo1": "bar2"},
Downsample: metadata.ThanosDownsample{Resolution: 0},
},
expected: "0@2124638872457683483",
},
{
input: metadata.Thanos{
Labels: map[string]string{`foo/some..thing/some.thing/../`: `a_b_c/bar-something-a\metric/a\x`},
Downsample: metadata.ThanosDownsample{Resolution: 0},
},
expected: "0@16590761456214576373",
},
} {
if ok := t.Run("", func(t *testing.T) {
testutil.Equals(t, tcase.expected, GroupKey(tcase.input))
}); !ok {
return
}
}
}
2 changes: 1 addition & 1 deletion pkg/verifier/overlapped_blocks.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func fetchOverlaps(ctx context.Context, logger log.Logger, bkt objstore.Bucket)
return err
}

metas[compact.GroupKey(m)] = append(metas[compact.GroupKey(m)], m.BlockMeta)
metas[compact.GroupKey(m.Thanos)] = append(metas[compact.GroupKey(m.Thanos)], m.BlockMeta)
return nil
})
if err != nil {
Expand Down

0 comments on commit d5de67a

Please sign in to comment.