diff --git a/pkg/queryfrontend/labels_codec.go b/pkg/queryfrontend/labels_codec.go index e3f7261288..b4caed1cd5 100644 --- a/pkg/queryfrontend/labels_codec.go +++ b/pkg/queryfrontend/labels_codec.go @@ -49,20 +49,21 @@ func NewThanosLabelsCodec(partialResponse bool, defaultMetadataTimeRange time.Du } } +// MergeResponse merges multiple responses into a single Response. It needs to dedup the responses and ensure the order. func (c labelsCodec) MergeResponse(responses ...queryrange.Response) (queryrange.Response, error) { if len(responses) == 0 { + // Empty response for label_names, label_values and series API. return &ThanosLabelsResponse{ Status: queryrange.StatusSuccess, Data: []string{}, }, nil } - if len(responses) == 1 { - return responses[0], nil - } - switch responses[0].(type) { case *ThanosLabelsResponse: + if len(responses) == 1 { + return responses[0], nil + } set := make(map[string]struct{}) for _, res := range responses { @@ -83,25 +84,23 @@ func (c labelsCodec) MergeResponse(responses ...queryrange.Response) (queryrange Data: lbls, }, nil case *ThanosSeriesResponse: - seriesData := make([]labelpb.ZLabelSet, 0) + if len(responses) == 1 { + return responses[0], nil + } + seriesData := make(labelpb.ZLabelSets, 0) - // seriesString is used in soring so we don't have to calculate the string of label sets again. - seriesString := make([]string, 0) uniqueSeries := make(map[string]struct{}) for _, res := range responses { for _, series := range res.(*ThanosSeriesResponse).Data { s := series.PromLabels().String() if _, ok := uniqueSeries[s]; !ok { seriesData = append(seriesData, series) - seriesString = append(seriesString, s) uniqueSeries[s] = struct{}{} } } } - sort.Slice(seriesData, func(i, j int) bool { - return seriesString[i] < seriesString[j] - }) + sort.Sort(seriesData) return &ThanosSeriesResponse{ Status: queryrange.StatusSuccess, Data: seriesData, diff --git a/pkg/queryfrontend/labels_codec_test.go b/pkg/queryfrontend/labels_codec_test.go index 5897a86536..a0e6980716 100644 --- a/pkg/queryfrontend/labels_codec_test.go +++ b/pkg/queryfrontend/labels_codec_test.go @@ -234,7 +234,7 @@ func TestLabelsCodec_EncodeRequest(t *testing.T) { }, { name: "thanos labels values request", - req: &ThanosLabelsRequest{Start: 123000, End: 456000, Path: "/api/v1/label/__name__/values"}, + req: &ThanosLabelsRequest{Start: 123000, End: 456000, Path: "/api/v1/label/__name__/values", Label: "__name__"}, checkFunc: func(r *http.Request) bool { return r.URL.Query().Get(start) == startTime && r.URL.Query().Get(end) == endTime && @@ -243,7 +243,7 @@ func TestLabelsCodec_EncodeRequest(t *testing.T) { }, { name: "thanos labels values request, partial response set to true", - req: &ThanosLabelsRequest{Start: 123000, End: 456000, Path: "/api/v1/label/__name__/values", PartialResponse: true}, + req: &ThanosLabelsRequest{Start: 123000, End: 456000, Path: "/api/v1/label/__name__/values", Label: "__name__", PartialResponse: true}, checkFunc: func(r *http.Request) bool { return r.URL.Query().Get(start) == startTime && r.URL.Query().Get(end) == endTime && @@ -313,12 +313,29 @@ func TestLabelsCodec_DecodeResponse(t *testing.T) { labelsData, err := json.Marshal(labelResponse) testutil.Ok(t, err) + labelResponseWithHeaders := &ThanosLabelsResponse{ + Status: "success", + Data: []string{"__name__"}, + Headers: []*ResponseHeader{{Name: cacheControlHeader, Values: []string{noStoreValue}}}, + } + labelsDataWithHeaders, err := json.Marshal(labelResponseWithHeaders) + testutil.Ok(t, err) + seriesResponse := &ThanosSeriesResponse{ Status: "success", Data: []labelpb.ZLabelSet{{Labels: []labelpb.ZLabel{{Name: "foo", Value: "bar"}}}}, } seriesData, err := json.Marshal(seriesResponse) testutil.Ok(t, err) + + seriesResponseWithHeaders := &ThanosSeriesResponse{ + Status: "success", + Data: []labelpb.ZLabelSet{{Labels: []labelpb.ZLabel{{Name: "foo", Value: "bar"}}}}, + Headers: []*ResponseHeader{{Name: cacheControlHeader, Values: []string{noStoreValue}}}, + } + seriesDataWithHeaders, err := json.Marshal(seriesResponseWithHeaders) + testutil.Ok(t, err) + for _, tc := range []struct { name string expectedError error @@ -344,12 +361,34 @@ func TestLabelsCodec_DecodeResponse(t *testing.T) { res: http.Response{StatusCode: 200, Body: ioutil.NopCloser(bytes.NewBuffer(labelsData))}, expectedResponse: labelResponse, }, + { + name: "thanos labels request with HTTP headers", + req: &ThanosLabelsRequest{}, + res: http.Response{ + StatusCode: 200, Body: ioutil.NopCloser(bytes.NewBuffer(labelsDataWithHeaders)), + Header: map[string][]string{ + cacheControlHeader: {noStoreValue}, + }, + }, + expectedResponse: labelResponseWithHeaders, + }, { name: "thanos series request", req: &ThanosSeriesRequest{}, res: http.Response{StatusCode: 200, Body: ioutil.NopCloser(bytes.NewBuffer(seriesData))}, expectedResponse: seriesResponse, }, + { + name: "thanos series request with HTTP headers", + req: &ThanosSeriesRequest{}, + res: http.Response{ + StatusCode: 200, Body: ioutil.NopCloser(bytes.NewBuffer(seriesDataWithHeaders)), + Header: map[string][]string{ + cacheControlHeader: {noStoreValue}, + }, + }, + expectedResponse: seriesResponseWithHeaders, + }, } { t.Run(tc.name, func(t *testing.T) { // Default partial response value doesn't matter when encoding requests. @@ -364,3 +403,117 @@ func TestLabelsCodec_DecodeResponse(t *testing.T) { }) } } + +func TestLabelsCodec_MergeResponse(t *testing.T) { + for _, tc := range []struct { + name string + expectedError error + responses []queryrange.Response + expectedResponse queryrange.Response + }{ + { + name: "Prometheus range query response format, not valid", + responses: []queryrange.Response{ + &queryrange.PrometheusResponse{Status: "success"}, + }, + expectedError: httpgrpc.Errorf(http.StatusInternalServerError, "invalid response format"), + }, + { + name: "Empty response", + responses: nil, + expectedResponse: &ThanosLabelsResponse{Status: queryrange.StatusSuccess, Data: []string{}}, + }, + { + name: "One label response", + responses: []queryrange.Response{ + &ThanosLabelsResponse{Status: "success", Data: []string{"localhost:9090", "localhost:9091"}}, + }, + expectedResponse: &ThanosLabelsResponse{Status: "success", Data: []string{"localhost:9090", "localhost:9091"}}, + }, + { + name: "One label response and two empty responses", + responses: []queryrange.Response{ + &ThanosLabelsResponse{Status: queryrange.StatusSuccess, Data: []string{}}, + &ThanosLabelsResponse{Status: "success", Data: []string{"localhost:9090", "localhost:9091"}}, + &ThanosLabelsResponse{Status: queryrange.StatusSuccess, Data: []string{}}, + }, + expectedResponse: &ThanosLabelsResponse{Status: "success", Data: []string{"localhost:9090", "localhost:9091"}}, + }, + { + name: "Multiple duplicate label responses", + responses: []queryrange.Response{ + &ThanosLabelsResponse{Status: "success", Data: []string{"localhost:9090", "localhost:9091"}}, + &ThanosLabelsResponse{Status: "success", Data: []string{"localhost:9091", "localhost:9092"}}, + &ThanosLabelsResponse{Status: "success", Data: []string{"localhost:9092", "localhost:9093"}}, + }, + expectedResponse: &ThanosLabelsResponse{Status: "success", + Data: []string{"localhost:9090", "localhost:9091", "localhost:9092", "localhost:9093"}}, + }, + // This case shouldn't happen because the responses from Querier are sorted. + { + name: "Multiple unordered label responses", + responses: []queryrange.Response{ + &ThanosLabelsResponse{Status: "success", Data: []string{"localhost:9093", "localhost:9092"}}, + &ThanosLabelsResponse{Status: "success", Data: []string{"localhost:9091", "localhost:9090"}}, + }, + expectedResponse: &ThanosLabelsResponse{Status: "success", + Data: []string{"localhost:9090", "localhost:9091", "localhost:9092", "localhost:9093"}}, + }, + { + name: "One series response", + responses: []queryrange.Response{ + &ThanosSeriesResponse{Status: "success", Data: []labelpb.ZLabelSet{{Labels: []labelpb.ZLabel{{Name: "foo", Value: "bar"}}}}}, + }, + expectedResponse: &ThanosSeriesResponse{Status: "success", Data: []labelpb.ZLabelSet{{Labels: []labelpb.ZLabel{{Name: "foo", Value: "bar"}}}}}, + }, + { + name: "One series response and two empty responses", + responses: []queryrange.Response{ + &ThanosSeriesResponse{Status: queryrange.StatusSuccess}, + &ThanosSeriesResponse{Status: "success", Data: []labelpb.ZLabelSet{{Labels: []labelpb.ZLabel{{Name: "foo", Value: "bar"}}}}}, + &ThanosSeriesResponse{Status: queryrange.StatusSuccess}, + }, + expectedResponse: &ThanosSeriesResponse{Status: "success", Data: []labelpb.ZLabelSet{{Labels: []labelpb.ZLabel{{Name: "foo", Value: "bar"}}}}}, + }, + { + name: "Multiple duplicate series responses", + responses: []queryrange.Response{ + &ThanosSeriesResponse{Status: "success", Data: []labelpb.ZLabelSet{{Labels: []labelpb.ZLabel{{Name: "foo", Value: "bar"}}}}}, + &ThanosSeriesResponse{Status: "success", Data: []labelpb.ZLabelSet{{Labels: []labelpb.ZLabel{{Name: "foo", Value: "bar"}}}}}, + &ThanosSeriesResponse{Status: "success", Data: []labelpb.ZLabelSet{{Labels: []labelpb.ZLabel{{Name: "foo", Value: "bar"}}}}}, + }, + expectedResponse: &ThanosSeriesResponse{Status: "success", Data: []labelpb.ZLabelSet{{Labels: []labelpb.ZLabel{{Name: "foo", Value: "bar"}}}}}, + }, + { + name: "Multiple unordered series responses", + responses: []queryrange.Response{ + &ThanosSeriesResponse{Status: "success", Data: []labelpb.ZLabelSet{ + {Labels: []labelpb.ZLabel{{Name: "foo", Value: "bar"}}}, + {Labels: []labelpb.ZLabel{{Name: "test", Value: "aaa"}, {Name: "instance", Value: "localhost:9090"}}}, + }}, + &ThanosSeriesResponse{Status: "success", Data: []labelpb.ZLabelSet{ + {Labels: []labelpb.ZLabel{{Name: "foo", Value: "aaa"}}}, + {Labels: []labelpb.ZLabel{{Name: "test", Value: "bbb"}, {Name: "instance", Value: "localhost:9091"}}}, + }}, + }, + expectedResponse: &ThanosSeriesResponse{Status: "success", Data: []labelpb.ZLabelSet{ + {Labels: []labelpb.ZLabel{{Name: "foo", Value: "aaa"}}}, + {Labels: []labelpb.ZLabel{{Name: "foo", Value: "bar"}}}, + {Labels: []labelpb.ZLabel{{Name: "test", Value: "aaa"}, {Name: "instance", Value: "localhost:9090"}}}, + {Labels: []labelpb.ZLabel{{Name: "test", Value: "bbb"}, {Name: "instance", Value: "localhost:9091"}}}, + }}, + }, + } { + t.Run(tc.name, func(t *testing.T) { + // Default partial response value doesn't matter when encoding requests. + codec := NewThanosLabelsCodec(false, time.Hour*2) + r, err := codec.MergeResponse(tc.responses...) + if tc.expectedError != nil { + testutil.Equals(t, err, tc.expectedError) + } else { + testutil.Ok(t, err) + testutil.Equals(t, tc.expectedResponse, r) + } + }) + } +} diff --git a/pkg/store/labelpb/label.go b/pkg/store/labelpb/label.go index 5638f69e5f..36c5bfc139 100644 --- a/pkg/store/labelpb/label.go +++ b/pkg/store/labelpb/label.go @@ -295,3 +295,27 @@ func DeepCopy(lbls []ZLabel) []ZLabel { } return ret } + +// ZLabelSets is a sortable list of ZLabelSet. It assumes the label pairs in each ZLabelSet element are already sorted. +type ZLabelSets []ZLabelSet + +func (z ZLabelSets) Len() int { return len(z) } + +func (z ZLabelSets) Swap(i, j int) { z[i], z[j] = z[j], z[i] } + +func (z ZLabelSets) Less(i, j int) bool { + l := 0 + r := 0 + var result int + for l < z[i].Size() && r < z[j].Size() { + result = z[i].Labels[l].Compare(z[j].Labels[r]) + if result == 0 { + l++ + r++ + continue + } + return result < 0 + } + + return l == z[i].Size() +} diff --git a/pkg/store/labelpb/label_test.go b/pkg/store/labelpb/label_test.go index 6656eea445..ab8543657b 100644 --- a/pkg/store/labelpb/label_test.go +++ b/pkg/store/labelpb/label_test.go @@ -5,6 +5,8 @@ package labelpb import ( "fmt" + "reflect" + "sort" "testing" "github.com/prometheus/prometheus/pkg/labels" @@ -104,3 +106,90 @@ func BenchmarkZLabelsMarshalUnmarshal(b *testing.B) { } }) } + +func TestSortZLabelSets(t *testing.T) { + expectedResult := ZLabelSets{ + { + Labels: ZLabelsFromPromLabels( + labels.FromMap(map[string]string{ + "__name__": "grpc_client_handled_total", + "cluster": "test", + "grpc_code": "OK", + "grpc_method": "Info", + }), + ), + }, + { + Labels: ZLabelsFromPromLabels( + labels.FromMap(map[string]string{ + "__name__": "grpc_client_handled_total", + "cluster": "test", + "grpc_code": "OK", + "grpc_method": "LabelNames", + }), + ), + }, + { + Labels: ZLabelsFromPromLabels( + labels.FromMap(map[string]string{ + "__name__": "grpc_server_handled_total", + "cluster": "test", + "grpc_code": "OK", + "grpc_method": "Info", + }), + ), + }, + { + Labels: ZLabelsFromPromLabels( + labels.FromMap(map[string]string{ + "__name__": "up", + "instance": "localhost:10908", + }), + ), + }, + } + + list := ZLabelSets{ + { + Labels: ZLabelsFromPromLabels( + labels.FromMap(map[string]string{ + "__name__": "up", + "instance": "localhost:10908", + }), + ), + }, + { + Labels: ZLabelsFromPromLabels( + labels.FromMap(map[string]string{ + "__name__": "grpc_server_handled_total", + "cluster": "test", + "grpc_code": "OK", + "grpc_method": "Info", + }), + ), + }, + { + Labels: ZLabelsFromPromLabels( + labels.FromMap(map[string]string{ + "__name__": "grpc_client_handled_total", + "cluster": "test", + "grpc_code": "OK", + "grpc_method": "LabelNames", + }), + ), + }, + { + Labels: ZLabelsFromPromLabels( + labels.FromMap(map[string]string{ + "__name__": "grpc_client_handled_total", + "cluster": "test", + "grpc_code": "OK", + "grpc_method": "Info", + }), + ), + }, + } + + sort.Sort(list) + reflect.DeepEqual(expectedResult, list) +} diff --git a/test/e2e/query_frontend_test.go b/test/e2e/query_frontend_test.go index 196365a36d..7ee97b4acf 100644 --- a/test/e2e/query_frontend_test.go +++ b/test/e2e/query_frontend_test.go @@ -5,6 +5,7 @@ package e2e_test import ( "context" + "reflect" "testing" "time" @@ -262,7 +263,7 @@ func TestQueryFrontend(t *testing.T) { t.Run("query frontend splitting works for labels values API", func(t *testing.T) { labelValues(t, ctx, queryFrontend.HTTPEndpoint(), "instance", timestamp.FromTime(now.Add(-time.Hour)), timestamp.FromTime(now.Add(time.Hour)), func(res []string) bool { - return len(res) > 0 + return len(res) == 1 && res[0] == "localhost:9090" }) testutil.Ok(t, q.WaitSumMetricsWithOptions( e2e.Equals(1), @@ -281,7 +282,7 @@ func TestQueryFrontend(t *testing.T) { ) labelValues(t, ctx, queryFrontend.HTTPEndpoint(), "instance", timestamp.FromTime(now.Add(-24*time.Hour)), timestamp.FromTime(now.Add(time.Hour)), func(res []string) bool { - return len(res) > 0 + return len(res) == 1 && res[0] == "localhost:9090" }) testutil.Ok(t, q.WaitSumMetricsWithOptions( e2e.Equals(3), @@ -309,7 +310,16 @@ func TestQueryFrontend(t *testing.T) { timestamp.FromTime(now.Add(-time.Hour)), timestamp.FromTime(now.Add(time.Hour)), func(res []map[string]string) bool { - return len(res) > 0 + if len(res) != 1 { + return false + } + + return reflect.DeepEqual(res[0], map[string]string{ + "__name__": "up", + "instance": "localhost:9090", + "job": "myself", + "prometheus": "test", + }) }, ) testutil.Ok(t, q.WaitSumMetricsWithOptions( @@ -336,7 +346,16 @@ func TestQueryFrontend(t *testing.T) { timestamp.FromTime(now.Add(-24*time.Hour)), timestamp.FromTime(now.Add(time.Hour)), func(res []map[string]string) bool { - return len(res) > 0 + if len(res) != 1 { + return false + } + + return reflect.DeepEqual(res[0], map[string]string{ + "__name__": "up", + "instance": "localhost:9090", + "job": "myself", + "prometheus": "test", + }) }, ) testutil.Ok(t, q.WaitSumMetricsWithOptions( diff --git a/test/e2e/query_test.go b/test/e2e/query_test.go index 90db116f03..1658ebcfb6 100644 --- a/test/e2e/query_test.go +++ b/test/e2e/query_test.go @@ -328,7 +328,7 @@ func TestQueryLabelValues(t *testing.T) { now := time.Now() labelValues(t, ctx, q.HTTPEndpoint(), "instance", timestamp.FromTime(now.Add(-time.Hour)), timestamp.FromTime(now.Add(time.Hour)), func(res []string) bool { - return len(res) > 0 + return len(res) == 1 && res[0] == "localhost:9090" }) // Outside time range. @@ -428,7 +428,7 @@ func labelNames(t *testing.T, ctx context.Context, addr string, start, end int64 logger := log.NewLogfmtLogger(os.Stdout) logger = log.With(logger, "ts", log.DefaultTimestampUTC) - testutil.Ok(t, runutil.RetryWithLog(logger, time.Second, ctx.Done(), func() error { + testutil.Ok(t, runutil.RetryWithLog(logger, 2*time.Second, ctx.Done(), func() error { res, err := promclient.NewDefaultClient().LabelNamesInGRPC(ctx, mustURLParse(t, "http://"+addr), start, end) if err != nil { return err @@ -437,7 +437,7 @@ func labelNames(t *testing.T, ctx context.Context, addr string, start, end int64 return nil } - return errors.Errorf("unexpected results size %d", len(res)) + return errors.Errorf("unexpected results %v", res) })) } @@ -447,7 +447,7 @@ func labelValues(t *testing.T, ctx context.Context, addr, label string, start, e logger := log.NewLogfmtLogger(os.Stdout) logger = log.With(logger, "ts", log.DefaultTimestampUTC) - testutil.Ok(t, runutil.RetryWithLog(logger, time.Second, ctx.Done(), func() error { + testutil.Ok(t, runutil.RetryWithLog(logger, 2*time.Second, ctx.Done(), func() error { res, err := promclient.NewDefaultClient().LabelValuesInGRPC(ctx, mustURLParse(t, "http://"+addr), label, start, end) if err != nil { return err @@ -456,7 +456,7 @@ func labelValues(t *testing.T, ctx context.Context, addr, label string, start, e return nil } - return errors.Errorf("unexpected results size %d", len(res)) + return errors.Errorf("unexpected results %v", res) })) } @@ -465,7 +465,7 @@ func series(t *testing.T, ctx context.Context, addr string, matchers []storepb.L logger := log.NewLogfmtLogger(os.Stdout) logger = log.With(logger, "ts", log.DefaultTimestampUTC) - testutil.Ok(t, runutil.RetryWithLog(logger, time.Second, ctx.Done(), func() error { + testutil.Ok(t, runutil.RetryWithLog(logger, 2*time.Second, ctx.Done(), func() error { res, err := promclient.NewDefaultClient().SeriesInGRPC(ctx, mustURLParse(t, "http://"+addr), matchers, start, end) if err != nil { return err @@ -474,7 +474,7 @@ func series(t *testing.T, ctx context.Context, addr string, matchers []storepb.L return nil } - return errors.Errorf("unexpected results size %d", len(res)) + return errors.Errorf("unexpected results %v", res) })) }