Skip to content

Commit

Permalink
ensure order when merging multiple responses
Browse files Browse the repository at this point in the history
Signed-off-by: Ben Ye <[email protected]>
  • Loading branch information
yeya24 committed Nov 20, 2020
1 parent 9931004 commit dc4d2f2
Show file tree
Hide file tree
Showing 6 changed files with 308 additions and 24 deletions.
21 changes: 10 additions & 11 deletions pkg/queryfrontend/labels_codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,20 +49,21 @@ func NewThanosLabelsCodec(partialResponse bool, defaultMetadataTimeRange time.Du
}
}

// MergeResponse merges multiple responses into a single Response. It needs to dedup the responses and ensure the order.
func (c labelsCodec) MergeResponse(responses ...queryrange.Response) (queryrange.Response, error) {
if len(responses) == 0 {
// Empty response for label_names, label_values and series API.
return &ThanosLabelsResponse{
Status: queryrange.StatusSuccess,
Data: []string{},
}, nil
}

if len(responses) == 1 {
return responses[0], nil
}

switch responses[0].(type) {
case *ThanosLabelsResponse:
if len(responses) == 1 {
return responses[0], nil
}
set := make(map[string]struct{})

for _, res := range responses {
Expand All @@ -83,25 +84,23 @@ func (c labelsCodec) MergeResponse(responses ...queryrange.Response) (queryrange
Data: lbls,
}, nil
case *ThanosSeriesResponse:
seriesData := make([]labelpb.ZLabelSet, 0)
if len(responses) == 1 {
return responses[0], nil
}
seriesData := make(labelpb.ZLabelSets, 0)

// seriesString is used in soring so we don't have to calculate the string of label sets again.
seriesString := make([]string, 0)
uniqueSeries := make(map[string]struct{})
for _, res := range responses {
for _, series := range res.(*ThanosSeriesResponse).Data {
s := series.PromLabels().String()
if _, ok := uniqueSeries[s]; !ok {
seriesData = append(seriesData, series)
seriesString = append(seriesString, s)
uniqueSeries[s] = struct{}{}
}
}
}

sort.Slice(seriesData, func(i, j int) bool {
return seriesString[i] < seriesString[j]
})
sort.Sort(seriesData)
return &ThanosSeriesResponse{
Status: queryrange.StatusSuccess,
Data: seriesData,
Expand Down
157 changes: 155 additions & 2 deletions pkg/queryfrontend/labels_codec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ func TestLabelsCodec_EncodeRequest(t *testing.T) {
},
{
name: "thanos labels values request",
req: &ThanosLabelsRequest{Start: 123000, End: 456000, Path: "/api/v1/label/__name__/values"},
req: &ThanosLabelsRequest{Start: 123000, End: 456000, Path: "/api/v1/label/__name__/values", Label: "__name__"},
checkFunc: func(r *http.Request) bool {
return r.URL.Query().Get(start) == startTime &&
r.URL.Query().Get(end) == endTime &&
Expand All @@ -243,7 +243,7 @@ func TestLabelsCodec_EncodeRequest(t *testing.T) {
},
{
name: "thanos labels values request, partial response set to true",
req: &ThanosLabelsRequest{Start: 123000, End: 456000, Path: "/api/v1/label/__name__/values", PartialResponse: true},
req: &ThanosLabelsRequest{Start: 123000, End: 456000, Path: "/api/v1/label/__name__/values", Label: "__name__", PartialResponse: true},
checkFunc: func(r *http.Request) bool {
return r.URL.Query().Get(start) == startTime &&
r.URL.Query().Get(end) == endTime &&
Expand Down Expand Up @@ -313,12 +313,29 @@ func TestLabelsCodec_DecodeResponse(t *testing.T) {
labelsData, err := json.Marshal(labelResponse)
testutil.Ok(t, err)

labelResponseWithHeaders := &ThanosLabelsResponse{
Status: "success",
Data: []string{"__name__"},
Headers: []*ResponseHeader{{Name: cacheControlHeader, Values: []string{noStoreValue}}},
}
labelsDataWithHeaders, err := json.Marshal(labelResponseWithHeaders)
testutil.Ok(t, err)

seriesResponse := &ThanosSeriesResponse{
Status: "success",
Data: []labelpb.ZLabelSet{{Labels: []labelpb.ZLabel{{Name: "foo", Value: "bar"}}}},
}
seriesData, err := json.Marshal(seriesResponse)
testutil.Ok(t, err)

seriesResponseWithHeaders := &ThanosSeriesResponse{
Status: "success",
Data: []labelpb.ZLabelSet{{Labels: []labelpb.ZLabel{{Name: "foo", Value: "bar"}}}},
Headers: []*ResponseHeader{{Name: cacheControlHeader, Values: []string{noStoreValue}}},
}
seriesDataWithHeaders, err := json.Marshal(seriesResponseWithHeaders)
testutil.Ok(t, err)

for _, tc := range []struct {
name string
expectedError error
Expand All @@ -344,12 +361,34 @@ func TestLabelsCodec_DecodeResponse(t *testing.T) {
res: http.Response{StatusCode: 200, Body: ioutil.NopCloser(bytes.NewBuffer(labelsData))},
expectedResponse: labelResponse,
},
{
name: "thanos labels request with HTTP headers",
req: &ThanosLabelsRequest{},
res: http.Response{
StatusCode: 200, Body: ioutil.NopCloser(bytes.NewBuffer(labelsDataWithHeaders)),
Header: map[string][]string{
cacheControlHeader: {noStoreValue},
},
},
expectedResponse: labelResponseWithHeaders,
},
{
name: "thanos series request",
req: &ThanosSeriesRequest{},
res: http.Response{StatusCode: 200, Body: ioutil.NopCloser(bytes.NewBuffer(seriesData))},
expectedResponse: seriesResponse,
},
{
name: "thanos series request with HTTP headers",
req: &ThanosSeriesRequest{},
res: http.Response{
StatusCode: 200, Body: ioutil.NopCloser(bytes.NewBuffer(seriesDataWithHeaders)),
Header: map[string][]string{
cacheControlHeader: {noStoreValue},
},
},
expectedResponse: seriesResponseWithHeaders,
},
} {
t.Run(tc.name, func(t *testing.T) {
// Default partial response value doesn't matter when encoding requests.
Expand All @@ -364,3 +403,117 @@ func TestLabelsCodec_DecodeResponse(t *testing.T) {
})
}
}

func TestLabelsCodec_MergeResponse(t *testing.T) {
for _, tc := range []struct {
name string
expectedError error
responses []queryrange.Response
expectedResponse queryrange.Response
}{
{
name: "Prometheus range query response format, not valid",
responses: []queryrange.Response{
&queryrange.PrometheusResponse{Status: "success"},
},
expectedError: httpgrpc.Errorf(http.StatusInternalServerError, "invalid response format"),
},
{
name: "Empty response",
responses: nil,
expectedResponse: &ThanosLabelsResponse{Status: queryrange.StatusSuccess, Data: []string{}},
},
{
name: "One label response",
responses: []queryrange.Response{
&ThanosLabelsResponse{Status: "success", Data: []string{"localhost:9090", "localhost:9091"}},
},
expectedResponse: &ThanosLabelsResponse{Status: "success", Data: []string{"localhost:9090", "localhost:9091"}},
},
{
name: "One label response and two empty responses",
responses: []queryrange.Response{
&ThanosLabelsResponse{Status: queryrange.StatusSuccess, Data: []string{}},
&ThanosLabelsResponse{Status: "success", Data: []string{"localhost:9090", "localhost:9091"}},
&ThanosLabelsResponse{Status: queryrange.StatusSuccess, Data: []string{}},
},
expectedResponse: &ThanosLabelsResponse{Status: "success", Data: []string{"localhost:9090", "localhost:9091"}},
},
{
name: "Multiple duplicate label responses",
responses: []queryrange.Response{
&ThanosLabelsResponse{Status: "success", Data: []string{"localhost:9090", "localhost:9091"}},
&ThanosLabelsResponse{Status: "success", Data: []string{"localhost:9091", "localhost:9092"}},
&ThanosLabelsResponse{Status: "success", Data: []string{"localhost:9092", "localhost:9093"}},
},
expectedResponse: &ThanosLabelsResponse{Status: "success",
Data: []string{"localhost:9090", "localhost:9091", "localhost:9092", "localhost:9093"}},
},
// This case shouldn't happen because the responses from Querier are sorted.
{
name: "Multiple unordered label responses",
responses: []queryrange.Response{
&ThanosLabelsResponse{Status: "success", Data: []string{"localhost:9093", "localhost:9092"}},
&ThanosLabelsResponse{Status: "success", Data: []string{"localhost:9091", "localhost:9090"}},
},
expectedResponse: &ThanosLabelsResponse{Status: "success",
Data: []string{"localhost:9090", "localhost:9091", "localhost:9092", "localhost:9093"}},
},
{
name: "One series response",
responses: []queryrange.Response{
&ThanosSeriesResponse{Status: "success", Data: []labelpb.ZLabelSet{{Labels: []labelpb.ZLabel{{Name: "foo", Value: "bar"}}}}},
},
expectedResponse: &ThanosSeriesResponse{Status: "success", Data: []labelpb.ZLabelSet{{Labels: []labelpb.ZLabel{{Name: "foo", Value: "bar"}}}}},
},
{
name: "One series response and two empty responses",
responses: []queryrange.Response{
&ThanosSeriesResponse{Status: queryrange.StatusSuccess},
&ThanosSeriesResponse{Status: "success", Data: []labelpb.ZLabelSet{{Labels: []labelpb.ZLabel{{Name: "foo", Value: "bar"}}}}},
&ThanosSeriesResponse{Status: queryrange.StatusSuccess},
},
expectedResponse: &ThanosSeriesResponse{Status: "success", Data: []labelpb.ZLabelSet{{Labels: []labelpb.ZLabel{{Name: "foo", Value: "bar"}}}}},
},
{
name: "Multiple duplicate series responses",
responses: []queryrange.Response{
&ThanosSeriesResponse{Status: "success", Data: []labelpb.ZLabelSet{{Labels: []labelpb.ZLabel{{Name: "foo", Value: "bar"}}}}},
&ThanosSeriesResponse{Status: "success", Data: []labelpb.ZLabelSet{{Labels: []labelpb.ZLabel{{Name: "foo", Value: "bar"}}}}},
&ThanosSeriesResponse{Status: "success", Data: []labelpb.ZLabelSet{{Labels: []labelpb.ZLabel{{Name: "foo", Value: "bar"}}}}},
},
expectedResponse: &ThanosSeriesResponse{Status: "success", Data: []labelpb.ZLabelSet{{Labels: []labelpb.ZLabel{{Name: "foo", Value: "bar"}}}}},
},
{
name: "Multiple unordered series responses",
responses: []queryrange.Response{
&ThanosSeriesResponse{Status: "success", Data: []labelpb.ZLabelSet{
{Labels: []labelpb.ZLabel{{Name: "foo", Value: "bar"}}},
{Labels: []labelpb.ZLabel{{Name: "test", Value: "aaa"}, {Name: "instance", Value: "localhost:9090"}}},
}},
&ThanosSeriesResponse{Status: "success", Data: []labelpb.ZLabelSet{
{Labels: []labelpb.ZLabel{{Name: "foo", Value: "aaa"}}},
{Labels: []labelpb.ZLabel{{Name: "test", Value: "bbb"}, {Name: "instance", Value: "localhost:9091"}}},
}},
},
expectedResponse: &ThanosSeriesResponse{Status: "success", Data: []labelpb.ZLabelSet{
{Labels: []labelpb.ZLabel{{Name: "foo", Value: "aaa"}}},
{Labels: []labelpb.ZLabel{{Name: "foo", Value: "bar"}}},
{Labels: []labelpb.ZLabel{{Name: "test", Value: "aaa"}, {Name: "instance", Value: "localhost:9090"}}},
{Labels: []labelpb.ZLabel{{Name: "test", Value: "bbb"}, {Name: "instance", Value: "localhost:9091"}}},
}},
},
} {
t.Run(tc.name, func(t *testing.T) {
// Default partial response value doesn't matter when encoding requests.
codec := NewThanosLabelsCodec(false, time.Hour*2)
r, err := codec.MergeResponse(tc.responses...)
if tc.expectedError != nil {
testutil.Equals(t, err, tc.expectedError)
} else {
testutil.Ok(t, err)
testutil.Equals(t, tc.expectedResponse, r)
}
})
}
}
24 changes: 24 additions & 0 deletions pkg/store/labelpb/label.go
Original file line number Diff line number Diff line change
Expand Up @@ -295,3 +295,27 @@ func DeepCopy(lbls []ZLabel) []ZLabel {
}
return ret
}

// ZLabelSets is a sortable list of ZLabelSet. It assumes the label pairs in each ZLabelSet element are already sorted.
type ZLabelSets []ZLabelSet

func (z ZLabelSets) Len() int { return len(z) }

func (z ZLabelSets) Swap(i, j int) { z[i], z[j] = z[j], z[i] }

func (z ZLabelSets) Less(i, j int) bool {
l := 0
r := 0
var result int
for l < z[i].Size() && r < z[j].Size() {
result = z[i].Labels[l].Compare(z[j].Labels[r])
if result == 0 {
l++
r++
continue
}
return result < 0
}

return l == z[i].Size()
}
89 changes: 89 additions & 0 deletions pkg/store/labelpb/label_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ package labelpb

import (
"fmt"
"reflect"
"sort"
"testing"

"github.com/prometheus/prometheus/pkg/labels"
Expand Down Expand Up @@ -104,3 +106,90 @@ func BenchmarkZLabelsMarshalUnmarshal(b *testing.B) {
}
})
}

func TestSortZLabelSets(t *testing.T) {
expectedResult := ZLabelSets{
{
Labels: ZLabelsFromPromLabels(
labels.FromMap(map[string]string{
"__name__": "grpc_client_handled_total",
"cluster": "test",
"grpc_code": "OK",
"grpc_method": "Info",
}),
),
},
{
Labels: ZLabelsFromPromLabels(
labels.FromMap(map[string]string{
"__name__": "grpc_client_handled_total",
"cluster": "test",
"grpc_code": "OK",
"grpc_method": "LabelNames",
}),
),
},
{
Labels: ZLabelsFromPromLabels(
labels.FromMap(map[string]string{
"__name__": "grpc_server_handled_total",
"cluster": "test",
"grpc_code": "OK",
"grpc_method": "Info",
}),
),
},
{
Labels: ZLabelsFromPromLabels(
labels.FromMap(map[string]string{
"__name__": "up",
"instance": "localhost:10908",
}),
),
},
}

list := ZLabelSets{
{
Labels: ZLabelsFromPromLabels(
labels.FromMap(map[string]string{
"__name__": "up",
"instance": "localhost:10908",
}),
),
},
{
Labels: ZLabelsFromPromLabels(
labels.FromMap(map[string]string{
"__name__": "grpc_server_handled_total",
"cluster": "test",
"grpc_code": "OK",
"grpc_method": "Info",
}),
),
},
{
Labels: ZLabelsFromPromLabels(
labels.FromMap(map[string]string{
"__name__": "grpc_client_handled_total",
"cluster": "test",
"grpc_code": "OK",
"grpc_method": "LabelNames",
}),
),
},
{
Labels: ZLabelsFromPromLabels(
labels.FromMap(map[string]string{
"__name__": "grpc_client_handled_total",
"cluster": "test",
"grpc_code": "OK",
"grpc_method": "Info",
}),
),
},
}

sort.Sort(list)
reflect.DeepEqual(expectedResult, list)
}
Loading

0 comments on commit dc4d2f2

Please sign in to comment.