diff --git a/pkg/block/fetcher.go b/pkg/block/fetcher.go index 89ab422fadd..66d480e07a0 100644 --- a/pkg/block/fetcher.go +++ b/pkg/block/fetcher.go @@ -391,7 +391,7 @@ func (f *TimePartitionMetaFilter) Filter(metas map[ulid.ULID]*metadata.Meta, syn var _ MetaFetcherFilter = (&LabelShardedMetaFilter{}).Filter -// LabelShardedMetaFilter is a MetaFetcher filter that filters out blocks that have no labels after relabelling. +// LabelShardedMetaFilter is a MetaFetcher filter that shards blocks: it relabels each block's labels and filters out blocks left with no labels. type LabelShardedMetaFilter struct { relabelConfig []*relabel.Config } @@ -401,14 +401,23 @@ func NewLabelShardedMetaFilter(relabelConfig []*relabel.Config) *LabelShardedMet return &LabelShardedMetaFilter{relabelConfig: relabelConfig} } -// Filter filters out blocks that filters blocks that have no labels after relabelling. +// blockIDLabel is a special label, added to each block's label set before relabelling, whose value is the ULID of the block. +const blockIDLabel = "__block_id" + +// Filter filters out blocks that have no labels left after relabelling each block's external (Thanos) labels together with the block ID label. 
func (f *LabelShardedMetaFilter) Filter(metas map[ulid.ULID]*metadata.Meta, synced GaugeLabeled, _ bool) { + var lbls labels.Labels /* scratch slice, truncated and reused for every block */ for id, m := range metas { - if processedLabels := relabel.Process(labels.FromMap(m.Thanos.Labels), f.relabelConfig...); processedLabels != nil { - continue + lbls = lbls[:0] + lbls = append(lbls, labels.Label{Name: blockIDLabel, Value: id.String()}) /* expose the block ULID to the relabel rules */ + for k, v := range m.Thanos.Labels { + lbls = append(lbls, labels.Label{Name: k, Value: v}) /* NOTE(review): appended in map iteration order, i.e. unsorted - confirm relabel.Process does not require sorted input */ + } + + if processedLabels := relabel.Process(lbls, f.relabelConfig...); len(processedLabels) == 0 { /* no labels survived relabelling: exclude the block */ + synced.WithLabelValues(labelExcludedMeta).Inc() + delete(metas, id) /* removing entries while ranging over a map is safe in Go */ } - synced.WithLabelValues(labelExcludedMeta).Inc() - delete(metas, id) } } diff --git a/pkg/block/fetcher_test.go b/pkg/block/fetcher_test.go index 7faf9d3062c..eb93a5246b5 100644 --- a/pkg/block/fetcher_test.go +++ b/pkg/block/fetcher_test.go @@ -278,7 +278,7 @@ func TestMetaFetcher_Fetch(t *testing.T) { }) } -func TestLabelShardedMetaFilter_Filter(t *testing.T) { +func TestLabelShardedMetaFilter_Filter_Basic(t *testing.T) { relabelContentYaml := ` - action: drop regex: "A" @@ -340,6 +340,104 @@ func TestLabelShardedMetaFilter_Filter(t *testing.T) { } +func TestLabelShardedMetaFilter_Filter_Hashmod(t *testing.T) { + relabelContentYamlFmt := ` + - action: hashmod + source_labels: ["%s"] + target_label: shard + modulus: 3 + - action: keep + source_labels: ["shard"] + regex: %d +` + for i := 0; i < 3; i++ { + t.Run(fmt.Sprintf("%v", i), func(t *testing.T) { + var relabelConfig []*relabel.Config + testutil.Ok(t, yaml.Unmarshal([]byte(fmt.Sprintf(relabelContentYamlFmt, blockIDLabel, i)), &relabelConfig)) + + f := NewLabelShardedMetaFilter(relabelConfig) + + input := map[ulid.ULID]*metadata.Meta{ + ULID(1): { + Thanos: metadata.Thanos{ + Labels: map[string]string{"cluster": "B", "message": "keepme"}, + }, + }, + ULID(2): { + Thanos: metadata.Thanos{ + Labels: map[string]string{"something": "A", "message": "keepme"}, + }, + }, + 
ULID(3): { + Thanos: metadata.Thanos{ + Labels: map[string]string{"cluster": "A", "message": "keepme"}, + }, + }, + ULID(4): { + Thanos: metadata.Thanos{ + Labels: map[string]string{"cluster": "A", "something": "B", "message": "keepme"}, + }, + }, + ULID(5): { + Thanos: metadata.Thanos{ + Labels: map[string]string{"cluster": "B"}, + }, + }, + ULID(6): { + Thanos: metadata.Thanos{ + Labels: map[string]string{"cluster": "B", "message": "keepme"}, + }, + }, + ULID(7): {}, + ULID(8): {}, + ULID(9): {}, + ULID(10): {}, + ULID(11): {}, + ULID(12): {}, + ULID(13): {}, + ULID(14): {}, + ULID(15): {}, + } + expected := map[ulid.ULID]*metadata.Meta{} + switch i { + case 0: + expected = map[ulid.ULID]*metadata.Meta{ + ULID(2): input[ULID(2)], + ULID(6): input[ULID(6)], + ULID(11): input[ULID(11)], + ULID(13): input[ULID(13)], + } + case 1: + expected = map[ulid.ULID]*metadata.Meta{ + ULID(5): input[ULID(5)], + ULID(7): input[ULID(7)], + ULID(10): input[ULID(10)], + ULID(12): input[ULID(12)], + ULID(14): input[ULID(14)], + ULID(15): input[ULID(15)], + } + case 2: + expected = map[ulid.ULID]*metadata.Meta{ + ULID(1): input[ULID(1)], + ULID(3): input[ULID(3)], + ULID(4): input[ULID(4)], + ULID(8): input[ULID(8)], + ULID(9): input[ULID(9)], + } + } + deleted := len(input) - len(expected) + + synced := prometheus.NewGaugeVec(prometheus.GaugeOpts{}, []string{"state"}) + f.Filter(input, synced, false) + + testutil.Equals(t, expected, input) + testutil.Equals(t, float64(deleted), promtest.ToFloat64(synced.WithLabelValues(labelExcludedMeta))) + + }) + + } +} + func TestTimePartitionMetaFilter_Filter(t *testing.T) { mint := time.Unix(0, 1*time.Millisecond.Nanoseconds()) maxt := time.Unix(0, 10*time.Millisecond.Nanoseconds())