From 781e335b47be8eb0515dc7d558071bca0c5d31f0 Mon Sep 17 00:00:00 2001 From: Saswata Mukherjee Date: Thu, 13 Jul 2023 12:08:33 +0530 Subject: [PATCH 1/2] TestQueryStoreDedup: This commit ensures that each testcase starts its own minio and StoreGW. The main motivation for doing this is that each testcase needs to allow StoreGW to advertise a different set of labels as external labels, and that no two cases interfere with each other. Signed-off-by: Saswata Mukherjee --- test/e2e/query_test.go | 56 ++++++++++++++++++++---------------------- 1 file changed, 26 insertions(+), 30 deletions(-) diff --git a/test/e2e/query_test.go b/test/e2e/query_test.go index 8a3148259b..7917a73246 100644 --- a/test/e2e/query_test.go +++ b/test/e2e/query_test.go @@ -988,27 +988,6 @@ func TestQueryStoreDedup(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute) t.Cleanup(cancel) - bucket := "store-gw-dedup-test" - minio := e2edb.NewMinio(e, "thanos-minio", bucket, e2edb.WithMinioTLS()) - testutil.Ok(t, e2e.StartAndWaitReady(minio)) - - l := log.NewLogfmtLogger(os.Stdout) - bkt, err := s3.NewBucketWithConfig(l, e2ethanos.NewS3Config(bucket, minio.Endpoint("http"), minio.Dir()), "test") - testutil.Ok(t, err) - - storeGW := e2ethanos.NewStoreGW( - e, - "s1", - client.BucketConfig{ - Type: client.S3, - Config: e2ethanos.NewS3Config(bucket, minio.InternalEndpoint("http"), minio.InternalDir()), - }, - "", - "", - nil, - ) - testutil.Ok(t, e2e.StartAndWaitReady(storeGW)) - tests := []struct { extReplicaLabel string intReplicaLabel string @@ -1036,7 +1015,7 @@ func TestQueryStoreDedup(t *testing.T) { }, { desc: "Deduplication works on external label with resorting required", - intReplicaLabel: "a", + extReplicaLabel: "a", series: []seriesWithLabels{ { intLabels: labels.FromStrings("__name__", "simple_series"), @@ -1143,16 +1122,33 @@ func TestQueryStoreDedup(t *testing.T) { }, } - // Prepare and upload all the blocks that will be used to S3. - var totalBlocks int - for _, tt := range tests { - createSimpleReplicatedBlocksInS3(ctx, t, e, l, bkt, tt.series, tt.blockFinderLabel) - totalBlocks += len(tt.series) - } - testutil.Ok(t, storeGW.WaitSumMetrics(e2emon.Equals(float64(totalBlocks)), "thanos_blocks_meta_synced")) - for _, tt := range tests { t.Run(tt.desc, func(t *testing.T) { + bucket := "store-gw-" + tt.blockFinderLabel + minio := e2edb.NewMinio(e, "thanos-minio"+tt.blockFinderLabel, bucket, e2edb.WithMinioTLS()) + testutil.Ok(t, e2e.StartAndWaitReady(minio)) + + l := log.NewLogfmtLogger(os.Stdout) + bkt, err := s3.NewBucketWithConfig(l, e2ethanos.NewS3Config(bucket, minio.Endpoint("http"), minio.Dir()), "test") + testutil.Ok(t, err) + + storeGW := e2ethanos.NewStoreGW( + e, + "s1"+tt.blockFinderLabel, + client.BucketConfig{ + Type: client.S3, + Config: e2ethanos.NewS3Config(bucket, minio.InternalEndpoint("http"), minio.InternalDir()), + }, + "", + "", + nil, + ) + testutil.Ok(t, e2e.StartAndWaitReady(storeGW)) + + // Prepare and upload all the blocks that will be used to S3. + createSimpleReplicatedBlocksInS3(ctx, t, e, l, bkt, tt.series, tt.blockFinderLabel) + testutil.Ok(t, storeGW.WaitSumMetrics(e2emon.Equals(float64(len(tt.series))), "thanos_blocks_meta_synced")) + querierBuilder := e2ethanos.NewQuerierBuilder(e, tt.blockFinderLabel, storeGW.InternalEndpoint("grpc")).WithProxyStrategy("lazy") var replicaLabels []string if tt.intReplicaLabel != "" { From bd8b1074bbbd320d0ce9494f515c6233aa646cf6 Mon Sep 17 00:00:00 2001 From: Saswata Mukherjee Date: Thu, 13 Jul 2023 12:31:07 +0530 Subject: [PATCH 2/2] Resort store response set on internal label dedup When deduplicating on labels which are stored internally in TSDB, the store response set needs to be resorted after replica labels are removed. This commit detects when internal labels are being used for dedup, by checking for their presence in the Store client's external labels. Signed-off-by: Saswata Mukherjee --- pkg/query/querier.go | 5 ----- pkg/store/proxy_heap.go | 21 ++++++++++++++++++++- test/e2e/query_test.go | 29 +++++++++++++++-------------- test/e2e/receive_test.go | 6 +++--- 4 files changed, 38 insertions(+), 23 deletions(-) diff --git a/pkg/query/querier.go b/pkg/query/querier.go index cbc7ec39f5..e173c0be7a 100644 --- a/pkg/query/querier.go +++ b/pkg/query/querier.go @@ -164,11 +164,6 @@ func newQuerier( } ctx, cancel := context.WithCancel(ctx) - rl := make(map[string]struct{}) - for _, replicaLabel := range replicaLabels { - rl[replicaLabel] = struct{}{} - } - partialResponseStrategy := storepb.PartialResponseStrategy_ABORT if partialResponse { partialResponseStrategy = storepb.PartialResponseStrategy_WARN diff --git a/pkg/store/proxy_heap.go b/pkg/store/proxy_heap.go index d5cc940637..56a6bb44a9 100644 --- a/pkg/store/proxy_heap.go +++ b/pkg/store/proxy_heap.go @@ -575,7 +575,7 @@ func newAsyncRespSet( } var labelsToRemove map[string]struct{} - if !st.SupportsWithoutReplicaLabels() && len(req.WithoutReplicaLabels) > 0 { + if hasInternalReplicaLabels(st, req) || !st.SupportsWithoutReplicaLabels() && len(req.WithoutReplicaLabels) > 0 { level.Warn(logger).Log("msg", "detecting store that does not support without replica label setting. "+ "Falling back to eager retrieval with additional sort. Make sure your storeAPI supports it to speed up your queries", "store", st.String()) retrievalStrategy = EagerRetrieval @@ -618,6 +618,25 @@ func newAsyncRespSet( } } +// hasInternalReplicaLabels returns true if any replica label in the series request is not an +// external label for the given Client. +func hasInternalReplicaLabels(st Client, req *storepb.SeriesRequest) bool { + for _, labelName := range req.WithoutReplicaLabels { + isInLabelSet := false + for _, lbls := range st.LabelSets() { + if lbls.Get(labelName) != "" { + isInLabelSet = true + break + } + } + + if !isInLabelSet { + return true + } + } + return false +} + func (l *lazyRespSet) Close() { l.bufferedResponsesMtx.Lock() defer l.bufferedResponsesMtx.Unlock() diff --git a/test/e2e/query_test.go b/test/e2e/query_test.go index 7917a73246..b5c16ba629 100644 --- a/test/e2e/query_test.go +++ b/test/e2e/query_test.go @@ -1050,8 +1050,8 @@ func TestQueryStoreDedup(t *testing.T) { }, blockFinderLabel: "dedupint", expectedSeries: 1, - // This test is expected to fail until the bug outlined in https://github.com/thanos-io/thanos/issues/6257 - // is fixed. This means that it will return double the expected series until then. + // This is a regression test for the bug outlined in https://github.com/thanos-io/thanos/issues/6257. + // Until the bug was fixed, this testcase would return double the expected series. expectedDedupBug: true, }, { @@ -1073,8 +1073,8 @@ func TestQueryStoreDedup(t *testing.T) { }, blockFinderLabel: "dedupintresort", expectedSeries: 2, - // This test is expected to fail until the bug outlined in https://github.com/thanos-io/thanos/issues/6257 - // is fixed. This means that it will return double the expected series until then. + // This is a regression test for the bug outlined in https://github.com/thanos-io/thanos/issues/6257. + // Until the bug was fixed, this testcase would return double the expected series. expectedDedupBug: true, }, { @@ -1096,8 +1096,8 @@ func TestQueryStoreDedup(t *testing.T) { }, blockFinderLabel: "dedupintextra", expectedSeries: 2, - // This test is expected to fail until the bug outlined in https://github.com/thanos-io/thanos/issues/6257 - // is fixed. This means that it will return double the expected series until then. + // This is a regression test for the bug outlined in https://github.com/thanos-io/thanos/issues/6257. + // Until the bug was fixed, this testcase would return double the expected series. expectedDedupBug: true, }, { @@ -1116,8 +1116,8 @@ func TestQueryStoreDedup(t *testing.T) { }, blockFinderLabel: "dedupintext", expectedSeries: 1, - // This test is expected to fail until the bug outlined in https://github.com/thanos-io/thanos/issues/6257 - // is fixed. This means that it will return double the expected series until then. + // This is a regression test for the bug outlined in https://github.com/thanos-io/thanos/issues/6257. + // Until the bug was fixed, this testcase would return double the expected series. expectedDedupBug: true, }, } @@ -1165,9 +1165,10 @@ func TestQueryStoreDedup(t *testing.T) { testutil.Ok(t, e2e.StartAndWaitReady(querier)) expectedSeries := tt.expectedSeries - if tt.expectedDedupBug { - expectedSeries *= 2 - } + // The below commented condition checks for the bug outlined in https://github.com/thanos-io/thanos/issues/6257. + // if tt.expectedDedupBug { + // expectedSeries *= 2 + // } instantQuery(t, ctx, querier.Endpoint("http"), func() string { return fmt.Sprintf("max_over_time(simple_series{block_finder='%s'}[2h])", tt.blockFinderLabel) }, time.Now, promclient.QueryOptions{ @@ -1298,13 +1299,13 @@ func TestSidecarQueryDedup(t *testing.T) { t.Run("deduplication on internal label with reorder", func(t *testing.T) { // Uses "a" as replica label, which is an internal label from the samples used. - // Should return 4 samples as long as the bug described by https://github.com/thanos-io/thanos/issues/6257#issuecomment-1544023978 - // is not fixed. When it is fixed, it should return 2 samples. + // This is a regression test for the bug outlined in https://github.com/thanos-io/thanos/issues/6257. + // Until the bug was fixed, this testcase would return 4 samples instead of 2. instantQuery(t, ctx, query4.Endpoint("http"), func() string { return "my_fake_metric" }, time.Now, promclient.QueryOptions{ Deduplicate: true, - }, 4) + }, 2) }) } diff --git a/test/e2e/receive_test.go b/test/e2e/receive_test.go index 117a4e4c1c..b4125c9eb0 100644 --- a/test/e2e/receive_test.go +++ b/test/e2e/receive_test.go @@ -233,13 +233,13 @@ test_metric{a="2", b="2"} 1`) }, }) - // This should've returned only 2 series, but is returning 4 until the problem reported in - // https://github.com/thanos-io/thanos/issues/6257 is fixed + // This is a regression test for the bug outlined in https://github.com/thanos-io/thanos/issues/6257. + // Until the bug was fixed, this testcase would return 4 series instead of 2. instantQuery(t, ctx, qStatic.Endpoint("http"), func() string { return "test_metric" }, time.Now, promclient.QueryOptions{ Deduplicate: true, - }, 4) + }, 2) }) t.Run("router_replication", func(t *testing.T) {