Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

querier: fix remote read error translation #7487

Merged
150 changes: 150 additions & 0 deletions integration/e2emimir/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,10 @@ import (
promapi "github.com/prometheus/client_golang/api"
promv1 "github.com/prometheus/client_golang/api/prometheus/v1"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/model/rulefmt"
"github.com/prometheus/prometheus/prompb" // OTLP protos are not compatible with gogo
"github.com/prometheus/prometheus/storage/remote"
yaml "gopkg.in/yaml.v3"

"github.com/grafana/mimir/pkg/alertmanager"
Expand Down Expand Up @@ -248,6 +250,136 @@ func (c *Client) QueryRangeRaw(query string, start, end time.Time, step time.Dur
return c.DoGetBody(addr)
}

// RemoteRead runs a remote read request against the response.
// RemoteRead returns the HTTP response with consumed body, the remote read protobuf response and an error.
// In case the response is not a protobuf, the plaintext body content is returned instead of the protobuf message.
func (c *Client) RemoteRead(metricName string, start, end time.Time) (_ *http.Response, _ *prompb.QueryResult, plaintextResponse []byte, _ error) {
req := &prompb.ReadRequest{
Queries: []*prompb.Query{{
Matchers: []*prompb.LabelMatcher{{Type: prompb.LabelMatcher_EQ, Name: labels.MetricName, Value: metricName}},
StartTimestampMs: start.UnixMilli(),
EndTimestampMs: end.UnixMilli(),
Hints: &prompb.ReadHints{
StepMs: 1,
StartMs: start.UnixMilli(),
EndMs: end.UnixMilli(),
},
}},
}
resp, err := c.doRemoteReadReq(req)
if err != nil {
return resp, nil, nil, fmt.Errorf("making remote read request: %w", err)
}
switch contentType := resp.Header.Get("Content-Type"); contentType {
case "application/x-protobuf":
if encoding := resp.Header.Get("Content-Encoding"); encoding != "snappy" {
return resp, nil, nil, fmt.Errorf("remote read should return snappy-encoded protobuf; got %s %s instead", encoding, contentType)
}
queryResult, err := parseRemoteReadSamples(resp)
if err != nil {
return resp, queryResult, nil, fmt.Errorf("parsing remote read response: %w", err)
}
return resp, queryResult, nil, nil
case "text/plain; charset=utf-8":
respBytes, err := io.ReadAll(resp.Body)
return resp, nil, respBytes, err
default:
return resp, nil, nil, fmt.Errorf("unexpected content type %s", contentType)
}
}

// RemoteReadChunks runs a remote read request against the response.
// RemoteReadChunks returns the HTTP response with consumed body, the remote read protobuf response and an error.
// In case the response is not a protobuf, the plaintext body content is returned instead of the protobuf message.
func (c *Client) RemoteReadChunks(metricName string, start, end time.Time) (_ *http.Response, _ []prompb.ChunkedReadResponse, plaintextResponse []byte, _ error) {
req := &prompb.ReadRequest{
Queries: []*prompb.Query{{
Matchers: []*prompb.LabelMatcher{{Type: prompb.LabelMatcher_EQ, Name: labels.MetricName, Value: metricName}},
StartTimestampMs: start.UnixMilli(),
EndTimestampMs: end.UnixMilli(),
Hints: &prompb.ReadHints{
StepMs: 1,
StartMs: start.UnixMilli(),
EndMs: end.UnixMilli(),
},
}},
AcceptedResponseTypes: []prompb.ReadRequest_ResponseType{prompb.ReadRequest_STREAMED_XOR_CHUNKS},
}

resp, err := c.doRemoteReadReq(req)
if err != nil {
return resp, nil, nil, err
}
if err != nil {
return resp, nil, nil, fmt.Errorf("making remote read request: %w", err)
}
switch contentType := resp.Header.Get("Content-Type"); contentType {
case api.ContentTypeRemoteReadStreamedChunks:
if encoding := resp.Header.Get("Content-Encoding"); encoding != "" {
return resp, nil, nil, fmt.Errorf("remote read should not return Content-Encoding; got %s %s instead", encoding, contentType)
}
chunks, err := parseRemoteReadChunks(resp)
if err != nil {
return resp, chunks, nil, fmt.Errorf("parsing remote read response: %w", err)
}
return resp, chunks, nil, nil
case "text/plain; charset=utf-8":
respBytes, err := io.ReadAll(resp.Body)
return resp, nil, respBytes, err
default:
return resp, nil, nil, fmt.Errorf("unexpected content type %s", contentType)
}
}

func (c *Client) doRemoteReadReq(req *prompb.ReadRequest) (*http.Response, error) {
addr := fmt.Sprintf(
"http://%s/prometheus/api/v1/read",
c.querierAddress,
)

reqBytes, err := req.Marshal()
if err != nil {
return nil, fmt.Errorf("marshalling remote read request: %w", err)
}

return c.DoPost(addr, bytes.NewReader(snappy.Encode(nil, reqBytes)))
}

func parseRemoteReadSamples(resp *http.Response) (*prompb.QueryResult, error) {
respBytes, err := io.ReadAll(resp.Body)
if err != nil {
return nil, fmt.Errorf("reading remote read response: %w", err)
}
uncompressedBytes, err := snappy.Decode(nil, respBytes)
if err != nil {
return nil, fmt.Errorf("decompressing remote read response: %w", err)
}
response := &prompb.ReadResponse{}
err = response.Unmarshal(uncompressedBytes)
if err != nil {
return nil, fmt.Errorf("unmarshalling remote read response: %w", err)
}
return response.Results[0], nil
}

func parseRemoteReadChunks(resp *http.Response) ([]prompb.ChunkedReadResponse, error) {
stream := remote.NewChunkedReader(resp.Body, remote.DefaultChunkedReadLimit, nil)

var results []prompb.ChunkedReadResponse
for {
var res prompb.ChunkedReadResponse
err := stream.NextProto(&res)
if errors.Is(err, io.EOF) {
break
}
if err != nil {
return nil, fmt.Errorf("reading remote read response: %w", err)
}
results = append(results, res)
}
return results, nil
}

// QueryExemplars runs an exemplar query.
func (c *Client) QueryExemplars(query string, start, end time.Time) ([]promv1.ExemplarQueryResult, error) {
ctx, cancel := context.WithTimeout(context.Background(), c.timeout)
Expand Down Expand Up @@ -1290,6 +1422,24 @@ func (c *Client) DoPost(url string, body io.Reader) (*http.Response, error) {
return c.doRequest("POST", url, body)
}

// DoPostBody performs a HTTP POST request towards the supplied URL and returns
// the full response body. The request contains the X-Scope-OrgID header and
// the timeout configured by the client object.
func (c *Client) DoPostBody(url string, body io.Reader) (*http.Response, []byte, error) {
resp, err := c.DoPost(url, body)
if err != nil {
return nil, nil, err
}
defer resp.Body.Close()

respBytes, err := io.ReadAll(resp.Body)
if err != nil {
return nil, nil, err
}

return resp, respBytes, nil
}

func (c *Client) doRequest(method, url string, body io.Reader) (*http.Response, error) {
req, err := http.NewRequest(method, url, body)
if err != nil {
Expand Down
127 changes: 19 additions & 108 deletions integration/querier_remote_read_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,22 +7,15 @@
package integration

import (
"bytes"
"context"
"errors"
"io"
"math/rand"
"net/http"
"testing"
"time"

"github.com/gogo/protobuf/proto"
"github.com/golang/snappy"
"github.com/grafana/e2e"
e2edb "github.com/grafana/e2e/db"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/prompb"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/storage/remote"
"github.com/prometheus/prometheus/tsdb/chunkenc"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -79,68 +72,27 @@ func runTestPushSeriesForQuerierRemoteRead(t *testing.T, c *e2emimir.Client, que
require.NoError(t, err)
require.Equal(t, 200, res.StatusCode)

matcher, err := labels.NewMatcher(labels.MatchEqual, "__name__", seriesName)
require.NoError(t, err)

startMs := now.Add(-1*time.Minute).Unix() * 1000
endMs := now.Add(time.Minute).Unix() * 1000

q, err := remote.ToQuery(startMs, endMs, []*labels.Matcher{matcher}, &storage.SelectHints{
Step: 1,
Start: startMs,
End: endMs,
})
require.NoError(t, err)

req := &prompb.ReadRequest{
Queries: []*prompb.Query{q},
AcceptedResponseTypes: []prompb.ReadRequest_ResponseType{prompb.ReadRequest_SAMPLES},
}

data, err := proto.Marshal(req)
require.NoError(t, err)
compressed := snappy.Encode(nil, data)
startMs := now.Add(-1 * time.Minute)
endMs := now.Add(time.Minute)

// Call the remote read API endpoint with a timeout.
httpReqCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

httpReq, err := http.NewRequestWithContext(httpReqCtx, "POST", "http://"+querier.HTTPEndpoint()+"/prometheus/api/v1/read", bytes.NewReader(compressed))
require.NoError(t, err)
httpReq.Header.Set("X-Scope-OrgID", "user-1")
httpReq.Header.Add("Content-Encoding", "snappy")
httpReq.Header.Add("Accept-Encoding", "snappy")
httpReq.Header.Set("Content-Type", "application/x-protobuf")
httpReq.Header.Set("User-Agent", "Prometheus/1.8.2")
httpReq.Header.Set("X-Prometheus-Remote-Read-Version", "0.1.0")

httpResp, err := http.DefaultClient.Do(httpReq)
client, err := e2emimir.NewClient("", querier.HTTPEndpoint(), "", "", "user-1")
require.NoError(t, err)
httpResp, resp, _, err := client.RemoteRead(seriesName, startMs, endMs)
require.Equal(t, http.StatusOK, httpResp.StatusCode)

compressed, err = io.ReadAll(httpResp.Body)
require.NoError(t, err)

uncompressed, err := snappy.Decode(nil, compressed)
require.NoError(t, err)

var resp prompb.ReadResponse
err = proto.Unmarshal(uncompressed, &resp)
require.NoError(t, err)

// Validate the returned remote read data matches what was written
require.Len(t, resp.Results, 1)
require.Len(t, resp.Results[0].Timeseries, 1)
require.Len(t, resp.Results[0].Timeseries[0].Labels, 1)
require.Equal(t, seriesName, resp.Results[0].Timeseries[0].Labels[0].GetValue())
isSeriesFloat := len(resp.Results[0].Timeseries[0].Samples) == 1
isSeriesHistogram := len(resp.Results[0].Timeseries[0].Histograms) == 1
require.Len(t, resp.Timeseries, 1)
require.Len(t, resp.Timeseries[0].Labels, 1)
require.Equal(t, seriesName, resp.Timeseries[0].Labels[0].GetValue())
isSeriesFloat := len(resp.Timeseries[0].Samples) == 1
isSeriesHistogram := len(resp.Timeseries[0].Histograms) == 1
require.Equal(t, isSeriesFloat, !isSeriesHistogram)
if isSeriesFloat {
require.Equal(t, int64(expectedVectors[0].Timestamp), resp.Results[0].Timeseries[0].Samples[0].Timestamp)
require.Equal(t, float64(expectedVectors[0].Value), resp.Results[0].Timeseries[0].Samples[0].Value)
require.Equal(t, int64(expectedVectors[0].Timestamp), resp.Timeseries[0].Samples[0].Timestamp)
require.Equal(t, float64(expectedVectors[0].Value), resp.Timeseries[0].Samples[0].Value)
} else if isSeriesHistogram {
require.Equal(t, expectedVectors[0].Histogram, mimirpb.FromHistogramToPromHistogram(remote.HistogramProtoToHistogram(resp.Results[0].Timeseries[0].Histograms[0])))
require.Equal(t, expectedVectors[0].Histogram, mimirpb.FromHistogramToPromHistogram(remote.HistogramProtoToHistogram(resp.Timeseries[0].Histograms[0])))
}
}

Expand Down Expand Up @@ -228,16 +180,16 @@ func TestQuerierStreamingRemoteRead(t *testing.T) {
require.NoError(t, err)

// Generate the series
startMs := now.Add(-time.Minute).Unix() * 1000
endMs := now.Add(time.Minute).Unix() * 1000
startMs := now.Add(-time.Minute)
endMs := now.Add(time.Minute)

var samples []prompb.Sample
if tc.floats != nil {
samples = tc.floats(startMs, endMs)
samples = tc.floats(startMs.UnixMilli(), endMs.UnixMilli())
}
var histograms []prompb.Histogram
if tc.histograms != nil {
histograms = tc.histograms(startMs, endMs)
histograms = tc.histograms(startMs.UnixMilli(), endMs.UnixMilli())
}

var series []prompb.TimeSeries
Expand All @@ -253,52 +205,11 @@ func TestQuerierStreamingRemoteRead(t *testing.T) {
require.NoError(t, err)
require.Equal(t, 200, res.StatusCode)

matcher, err := labels.NewMatcher(labels.MatchEqual, "__name__", "series_1")
require.NoError(t, err)

q, err := remote.ToQuery(startMs, endMs, []*labels.Matcher{matcher}, &storage.SelectHints{
Step: 1,
Start: startMs,
End: endMs,
})
require.NoError(t, err)

req := &prompb.ReadRequest{
Queries: []*prompb.Query{q},
AcceptedResponseTypes: []prompb.ReadRequest_ResponseType{prompb.ReadRequest_STREAMED_XOR_CHUNKS},
}

data, err := proto.Marshal(req)
require.NoError(t, err)
compressed := snappy.Encode(nil, data)

// Call the remote read API endpoint with a timeout.
httpReqCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

httpReq, err := http.NewRequestWithContext(httpReqCtx, "POST", "http://"+querier.HTTPEndpoint()+"/prometheus/api/v1/read", bytes.NewReader(compressed))
require.NoError(t, err)
httpReq.Header.Add("Accept-Encoding", "snappy")
httpReq.Header.Set("X-Scope-OrgID", "user-1")
httpReq.Header.Set("X-Prometheus-Remote-Read-Version", "0.1.0")

httpResp, err := http.DefaultClient.Do(httpReq)
client, err := e2emimir.NewClient("", querier.HTTPEndpoint(), "", "", "user-1")
require.NoError(t, err)
httpResp, results, _, err := client.RemoteReadChunks("series_1", startMs, endMs)
require.Equal(t, http.StatusOK, httpResp.StatusCode)

// Fetch streaming response
stream := remote.NewChunkedReader(httpResp.Body, remote.DefaultChunkedReadLimit, nil)

var results []prompb.ChunkedReadResponse
for {
var res prompb.ChunkedReadResponse
err := stream.NextProto(&res)
if errors.Is(err, io.EOF) {
break
}
require.NoError(t, err)
results = append(results, res)
}
require.NoError(t, err)

// Validate the returned remote read data
sampleIdx := 0
Expand Down
Loading
Loading