Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Resort store response set on internal label dedup #6529

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 0 additions & 5 deletions pkg/query/querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,11 +164,6 @@ func newQuerier(
}
ctx, cancel := context.WithCancel(ctx)

rl := make(map[string]struct{})
for _, replicaLabel := range replicaLabels {
rl[replicaLabel] = struct{}{}
}

partialResponseStrategy := storepb.PartialResponseStrategy_ABORT
if partialResponse {
partialResponseStrategy = storepb.PartialResponseStrategy_WARN
Expand Down
21 changes: 20 additions & 1 deletion pkg/store/proxy_heap.go
Original file line number Diff line number Diff line change
Expand Up @@ -575,7 +575,7 @@ func newAsyncRespSet(
}

var labelsToRemove map[string]struct{}
if !st.SupportsWithoutReplicaLabels() && len(req.WithoutReplicaLabels) > 0 {
if hasInternalReplicaLabels(st, req) || !st.SupportsWithoutReplicaLabels() && len(req.WithoutReplicaLabels) > 0 {
level.Warn(logger).Log("msg", "detecting store that does not support without replica label setting. "+
"Falling back to eager retrieval with additional sort. Make sure your storeAPI supports it to speed up your queries", "store", st.String())
retrievalStrategy = EagerRetrieval
Expand Down Expand Up @@ -618,6 +618,25 @@ func newAsyncRespSet(
}
}

// hasInternalReplicaLabels returns true if any replica label in the series request is not an
// external label for the given Client.
func hasInternalReplicaLabels(st Client, req *storepb.SeriesRequest) bool {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will always lead to re-sorting if stores have different external labels right? Like prometheus_replica on a sidecar and rule_replica on a ruler.

for _, labelName := range req.WithoutReplicaLabels {
isInLabelSet := false
for _, lbls := range st.LabelSets() {
if lbls.Get(labelName) != "" {
isInLabelSet = true
break
}
}

if !isInLabelSet {
return true
}
}
return false
}

func (l *lazyRespSet) Close() {
l.bufferedResponsesMtx.Lock()
defer l.bufferedResponsesMtx.Unlock()
Expand Down
85 changes: 41 additions & 44 deletions test/e2e/query_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -988,27 +988,6 @@ func TestQueryStoreDedup(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute)
t.Cleanup(cancel)

bucket := "store-gw-dedup-test"
minio := e2edb.NewMinio(e, "thanos-minio", bucket, e2edb.WithMinioTLS())
testutil.Ok(t, e2e.StartAndWaitReady(minio))

l := log.NewLogfmtLogger(os.Stdout)
bkt, err := s3.NewBucketWithConfig(l, e2ethanos.NewS3Config(bucket, minio.Endpoint("http"), minio.Dir()), "test")
testutil.Ok(t, err)

storeGW := e2ethanos.NewStoreGW(
e,
"s1",
client.BucketConfig{
Type: client.S3,
Config: e2ethanos.NewS3Config(bucket, minio.InternalEndpoint("http"), minio.InternalDir()),
},
"",
"",
nil,
)
testutil.Ok(t, e2e.StartAndWaitReady(storeGW))

tests := []struct {
extReplicaLabel string
intReplicaLabel string
Expand Down Expand Up @@ -1036,7 +1015,7 @@ func TestQueryStoreDedup(t *testing.T) {
},
{
desc: "Deduplication works on external label with resorting required",
intReplicaLabel: "a",
extReplicaLabel: "a",
series: []seriesWithLabels{
{
intLabels: labels.FromStrings("__name__", "simple_series"),
Expand Down Expand Up @@ -1071,8 +1050,8 @@ func TestQueryStoreDedup(t *testing.T) {
},
blockFinderLabel: "dedupint",
expectedSeries: 1,
// This test is expected to fail until the bug outlined in https://github.com/thanos-io/thanos/issues/6257
// is fixed. This means that it will return double the expected series until then.
// This is a regression test for the bug outlined in https://github.com/thanos-io/thanos/issues/6257.
// Until the bug was fixed, this testcase would return double the expected series.
expectedDedupBug: true,
},
{
Expand All @@ -1094,8 +1073,8 @@ func TestQueryStoreDedup(t *testing.T) {
},
blockFinderLabel: "dedupintresort",
expectedSeries: 2,
// This test is expected to fail until the bug outlined in https://github.com/thanos-io/thanos/issues/6257
// is fixed. This means that it will return double the expected series until then.
// This is a regression test for the bug outlined in https://github.com/thanos-io/thanos/issues/6257.
// Until the bug was fixed, this testcase would return double the expected series.
expectedDedupBug: true,
},
{
Expand All @@ -1117,8 +1096,8 @@ func TestQueryStoreDedup(t *testing.T) {
},
blockFinderLabel: "dedupintextra",
expectedSeries: 2,
// This test is expected to fail until the bug outlined in https://github.com/thanos-io/thanos/issues/6257
// is fixed. This means that it will return double the expected series until then.
// This is a regression test for the bug outlined in https://github.com/thanos-io/thanos/issues/6257.
// Until the bug was fixed, this testcase would return double the expected series.
expectedDedupBug: true,
},
{
Expand All @@ -1137,22 +1116,39 @@ func TestQueryStoreDedup(t *testing.T) {
},
blockFinderLabel: "dedupintext",
expectedSeries: 1,
// This test is expected to fail until the bug outlined in https://github.com/thanos-io/thanos/issues/6257
// is fixed. This means that it will return double the expected series until then.
// This is a regression test for the bug outlined in https://github.com/thanos-io/thanos/issues/6257.
// Until the bug was fixed, this testcase would return double the expected series.
expectedDedupBug: true,
},
}

// Prepare and upload all the blocks that will be used to S3.
var totalBlocks int
for _, tt := range tests {
createSimpleReplicatedBlocksInS3(ctx, t, e, l, bkt, tt.series, tt.blockFinderLabel)
totalBlocks += len(tt.series)
}
testutil.Ok(t, storeGW.WaitSumMetrics(e2emon.Equals(float64(totalBlocks)), "thanos_blocks_meta_synced"))

for _, tt := range tests {
t.Run(tt.desc, func(t *testing.T) {
bucket := "store-gw-" + tt.blockFinderLabel
minio := e2edb.NewMinio(e, "thanos-minio"+tt.blockFinderLabel, bucket, e2edb.WithMinioTLS())
testutil.Ok(t, e2e.StartAndWaitReady(minio))

l := log.NewLogfmtLogger(os.Stdout)
bkt, err := s3.NewBucketWithConfig(l, e2ethanos.NewS3Config(bucket, minio.Endpoint("http"), minio.Dir()), "test")
testutil.Ok(t, err)

storeGW := e2ethanos.NewStoreGW(
e,
"s1"+tt.blockFinderLabel,
client.BucketConfig{
Type: client.S3,
Config: e2ethanos.NewS3Config(bucket, minio.InternalEndpoint("http"), minio.InternalDir()),
},
"",
"",
nil,
)
testutil.Ok(t, e2e.StartAndWaitReady(storeGW))

// Prepare and upload all the blocks that will be used to S3.
createSimpleReplicatedBlocksInS3(ctx, t, e, l, bkt, tt.series, tt.blockFinderLabel)
testutil.Ok(t, storeGW.WaitSumMetrics(e2emon.Equals(float64(len(tt.series))), "thanos_blocks_meta_synced"))

querierBuilder := e2ethanos.NewQuerierBuilder(e, tt.blockFinderLabel, storeGW.InternalEndpoint("grpc")).WithProxyStrategy("lazy")
var replicaLabels []string
if tt.intReplicaLabel != "" {
Expand All @@ -1169,9 +1165,10 @@ func TestQueryStoreDedup(t *testing.T) {
testutil.Ok(t, e2e.StartAndWaitReady(querier))

expectedSeries := tt.expectedSeries
if tt.expectedDedupBug {
expectedSeries *= 2
}
// The below commented condition checks for the bug outlined in https://github.com/thanos-io/thanos/issues/6257.
// if tt.expectedDedupBug {
// expectedSeries *= 2
// }
instantQuery(t, ctx, querier.Endpoint("http"), func() string {
return fmt.Sprintf("max_over_time(simple_series{block_finder='%s'}[2h])", tt.blockFinderLabel)
}, time.Now, promclient.QueryOptions{
Expand Down Expand Up @@ -1302,13 +1299,13 @@ func TestSidecarQueryDedup(t *testing.T) {

t.Run("deduplication on internal label with reorder", func(t *testing.T) {
// Uses "a" as replica label, which is an internal label from the samples used.
// Should return 4 samples as long as the bug described by https://github.com/thanos-io/thanos/issues/6257#issuecomment-1544023978
// is not fixed. When it is fixed, it should return 2 samples.
// This is a regression test for the bug outlined in https://github.com/thanos-io/thanos/issues/6257.
// Until the bug was fixed, this testcase would return 4 samples instead of 2.
instantQuery(t, ctx, query4.Endpoint("http"), func() string {
return "my_fake_metric"
}, time.Now, promclient.QueryOptions{
Deduplicate: true,
}, 4)
}, 2)
})
}

Expand Down
6 changes: 3 additions & 3 deletions test/e2e/receive_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,13 +233,13 @@ test_metric{a="2", b="2"} 1`)
},
})

// This should've returned only 2 series, but is returning 4 until the problem reported in
// https://github.com/thanos-io/thanos/issues/6257 is fixed
// This is a regression test for the bug outlined in https://github.com/thanos-io/thanos/issues/6257.
// Until the bug was fixed, this testcase would return 4 series instead of 2.
instantQuery(t, ctx, qStatic.Endpoint("http"), func() string {
return "test_metric"
}, time.Now, promclient.QueryOptions{
Deduplicate: true,
}, 4)
}, 2)
})

t.Run("router_replication", func(t *testing.T) {
Expand Down