Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix reporting of HTTP error messages with binary body content #8363

Merged
merged 6 commits into from
Jun 13, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 12 additions & 4 deletions pkg/distributor/otel.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/gogo/status"
"github.com/grafana/dskit/grpcutil"
"github.com/grafana/dskit/httpgrpc"
"github.com/grafana/dskit/httpgrpc/server"
"github.com/grafana/dskit/middleware"
"github.com/grafana/dskit/tenant"
"github.com/pkg/errors"
Expand Down Expand Up @@ -46,13 +47,17 @@ const (
maxErrMsgLen = 1024
)

type OTLPHandlerLimits interface {
Copy link
Member Author

@pstibrany pstibrany Jun 13, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This interface was extracted to simplify test setup, and is otherwise unrelated to the PR.

OTelMetricSuffixesEnabled(id string) bool
}

// OTLPHandler is an http.Handler accepting OTLP write requests.
func OTLPHandler(
maxRecvMsgSize int,
requestBufferPool util.Pool,
sourceIPs *middleware.SourceIPExtractor,
enableOtelMetadataStorage bool,
limits *validation.Overrides,
limits OTLPHandlerLimits,
retryCfg RetryConfig,
push PushFunc,
pushMetrics *PushMetrics,
Expand Down Expand Up @@ -244,7 +249,7 @@ func otlpHandler(
if err := push(ctx, req); err != nil {
if errors.Is(err, context.Canceled) {
level.Warn(logger).Log("msg", "push request canceled", "err", err)
writeErrorToHTTPResponseBody(w, statusClientClosedRequest, codes.Canceled, "push request context canceled", logger)
writeErrorToHTTPResponseBody(r.Context(), w, statusClientClosedRequest, codes.Canceled, "push request context canceled", logger)
return
}
var (
Expand Down Expand Up @@ -272,7 +277,7 @@ func otlpHandler(
level.Error(logger).Log(msgs...)
}
addHeaders(w, err, r, httpCode, retryCfg)
writeErrorToHTTPResponseBody(w, httpCode, grpcCode, errorMsg, logger)
writeErrorToHTTPResponseBody(r.Context(), w, httpCode, grpcCode, errorMsg, logger)
}
})
}
Expand Down Expand Up @@ -309,9 +314,12 @@ func httpRetryableToOTLPRetryable(httpStatusCode int) int {

// writeErrorToHTTPResponseBody converts the given error into a grpc status and marshals it into a byte slice, in order to be written to the response body.
// See doc https://opentelemetry.io/docs/specs/otlp/#failures-1
func writeErrorToHTTPResponseBody(w http.ResponseWriter, httpCode int, grpcCode codes.Code, msg string, logger log.Logger) {
func writeErrorToHTTPResponseBody(reqCtx context.Context, w http.ResponseWriter, httpCode int, grpcCode codes.Code, msg string, logger log.Logger) {
w.Header().Set("Content-Type", "application/octet-stream")
w.Header().Set("X-Content-Type-Options", "nosniff")
if server.IsHandledByHttpgrpcServer(reqCtx) {
w.Header().Set(server.ErrorMessageHeaderKey, msg) // If httpgrpc Server wants to convert this HTTP response into error, use this error message, instead of using response body.
}
w.WriteHeader(httpCode)

respBytes, err := proto.Marshal(status.New(grpcCode, msg).Proto())
Expand Down
205 changes: 205 additions & 0 deletions pkg/distributor/push_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,29 +8,38 @@ package distributor
import (
"bytes"
"context"
"flag"
"fmt"
"io"
"net/http"
"net/http/httptest"
"sort"
"strconv"
"strings"
"testing"
"time"

"github.com/go-kit/log"
"github.com/gogo/status"
"github.com/golang/protobuf/proto"
"github.com/golang/snappy"
"github.com/grafana/dskit/concurrency"
"github.com/grafana/dskit/flagext"
"github.com/grafana/dskit/httpgrpc"
"github.com/grafana/dskit/httpgrpc/server"
"github.com/grafana/dskit/middleware"
dskit_server "github.com/grafana/dskit/server"
"github.com/grafana/dskit/user"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/prometheus/prompb"
"github.com/prometheus/prometheus/storage/remote"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/pdata/pmetric"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials/insecure"

"github.com/grafana/mimir/pkg/ingester/client"
"github.com/grafana/mimir/pkg/mimirpb"
Expand Down Expand Up @@ -872,3 +881,199 @@ func TestRetryConfig_Validate(t *testing.T) {
})
}
}

func TestOTLPPushHandlerErrorsAreReportedCorrectlyViaHttpgrpc(t *testing.T) {
reg := prometheus.NewRegistry()
cfg := dskit_server.Config{}
// Set default values
cfg.RegisterFlags(flag.NewFlagSet("test", flag.ContinueOnError))

// Configure values for test.
cfg.HTTPListenAddress = "localhost"
cfg.HTTPListenPort = 0 // auto-assign
cfg.GRPCListenAddress = "localhost"
cfg.GRPCListenPort = 0 // auto-assign
cfg.Registerer = reg
cfg.Gatherer = reg
cfg.ReportHTTP4XXCodesInInstrumentationLabel = true // report 400 as errors.
cfg.GRPCMiddleware = []grpc.UnaryServerInterceptor{middleware.ServerUserHeaderInterceptor}
cfg.HTTPMiddleware = []middleware.Interface{middleware.AuthenticateUser}

srv, err := dskit_server.New(cfg)
require.NoError(t, err)
//srv.HTTP.

push := func(ctx context.Context, req *Request) error {
// Trigger conversion of incoming request to WriteRequest.
wr, err := req.WriteRequest()
if err != nil {
return err
}

if len(wr.Timeseries) > 0 && len(wr.Timeseries[0].Labels) > 0 && wr.Timeseries[0].Labels[0].Name == "__name__" && wr.Timeseries[0].Labels[0].Value == "report_server_error" {
return errors.New("some random push error")
}

return nil
}
h := OTLPHandler(200, util.NewBufferPool(), nil, false, otlpLimitsMock{}, RetryConfig{Enabled: false}, push, newPushMetrics(reg), reg, log.NewNopLogger(), true)
srv.HTTP.Handle("/otlp", h)

// start the server
require.NoError(t, err)
go func() { _ = srv.Run() }()
t.Cleanup(srv.Stop)

// create client
conn, err := grpc.NewClient(srv.GRPCListenAddr().String(), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithUnaryInterceptor(middleware.ClientUserHeaderInterceptor))
require.NoError(t, err)
t.Cleanup(func() { _ = conn.Close() })

type testCase struct {
request *httpgrpc.HTTPRequest
expectedResponse *httpgrpc.HTTPResponse
expectedGrpcErrorMessage string
}

testcases := map[string]testCase{
"missing content type returns 415": {
request: &httpgrpc.HTTPRequest{
Method: "POST",
Url: "/otlp",
Body: []byte("hello"),
},
expectedResponse: &httpgrpc.HTTPResponse{Code: 415,
Headers: []*httpgrpc.Header{
{Key: "Content-Type", Values: []string{"application/octet-stream"}},
{Key: "X-Content-Type-Options", Values: []string{"nosniff"}},
},
Body: mustMarshalStatus(t, 415, "unsupported content type: , supported: [application/json, application/x-protobuf]"),
},
expectedGrpcErrorMessage: "rpc error: code = Code(415) desc = unsupported content type: , supported: [application/json, application/x-protobuf]",
},

"invalid JSON request returns 400": {
request: &httpgrpc.HTTPRequest{
Method: "POST",
Headers: []*httpgrpc.Header{
{Key: "Content-Type", Values: []string{"application/json"}},
},
Url: "/otlp",
Body: []byte("invalid"),
},
expectedResponse: &httpgrpc.HTTPResponse{Code: 400,
Headers: []*httpgrpc.Header{
{Key: "Content-Type", Values: []string{"application/octet-stream"}},
{Key: "X-Content-Type-Options", Values: []string{"nosniff"}},
},
Body: mustMarshalStatus(t, 400, "ReadObjectCB: expect { or n, but found i, error found in #1 byte of ...|invalid|..., bigger context ...|invalid|..."),
},
expectedGrpcErrorMessage: "rpc error: code = Code(400) desc = ReadObjectCB: expect { or n, but found i, error found in #1 byte of ...|invalid|..., bigger context ...|invalid|...",
},

"empty JSON is good request, with 200 status code": {
request: &httpgrpc.HTTPRequest{
Method: "POST",
Headers: []*httpgrpc.Header{
{Key: "Content-Type", Values: []string{"application/json"}},
},
Url: "/otlp",
Body: []byte("{}"),
},
expectedResponse: &httpgrpc.HTTPResponse{Code: 200,
Headers: nil, // No headers expected for 200.
Body: nil, // No body expected for 200 code.
},
expectedGrpcErrorMessage: "", // No error expected
},

"trigger 5xx error by sending special metric": {
request: &httpgrpc.HTTPRequest{
Method: "POST",
Headers: []*httpgrpc.Header{
{Key: "Content-Type", Values: []string{"application/json"}},
},
Url: "/otlp",
// This is simple OTLP request, with "report_server_error".
Body: []byte(`{"resourceMetrics": [{"scopeMetrics": [{"metrics": [{"name": "report_server_error", "gauge": {"dataPoints": [{"timeUnixNano": "1679912463340000000", "asDouble": 10.66}]}}]}]}]}`),
},
expectedResponse: &httpgrpc.HTTPResponse{Code: 503,
Headers: []*httpgrpc.Header{
{Key: "Content-Type", Values: []string{"application/octet-stream"}},
{Key: "X-Content-Type-Options", Values: []string{"nosniff"}},
},
Body: mustMarshalStatus(t, codes.Internal, "some random push error"),
},
expectedGrpcErrorMessage: "rpc error: code = Code(503) desc = some random push error",
},
}

hc := httpgrpc.NewHTTPClient(conn)
httpClient := http.Client{}

for name, tc := range testcases {
t.Run(fmt.Sprintf("grpc: %s", name), func(t *testing.T) {
ctx := user.InjectOrgID(context.Background(), "test")
resp, err := hc.Handle(ctx, tc.request)

if err != nil {
require.EqualError(t, err, tc.expectedGrpcErrorMessage)

errresp, ok := httpgrpc.HTTPResponseFromError(err)
require.True(t, ok, "errors reported by OTLP handler should always be convertible to HTTP response")
resp = errresp
} else if tc.expectedGrpcErrorMessage != "" {
require.Failf(t, "expected error message %q, but got no error", tc.expectedGrpcErrorMessage)
}

// Before comparing response, we sort headers, to keep comparison stable.
sort.Slice(resp.Headers, func(i, j int) bool {
return resp.Headers[i].Key < resp.Headers[j].Key
})
require.Equal(t, tc.expectedResponse, resp)
})

t.Run(fmt.Sprintf("http: %s", name), func(t *testing.T) {
req, err := httpgrpc.ToHTTPRequest(context.Background(), tc.request)
require.NoError(t, err)

req.Header.Add("X-Scope-OrgID", "test")
req.RequestURI = ""
req.URL.Scheme = "http"
req.URL.Host = srv.HTTPListenAddr().String()

resp, err := httpClient.Do(req)
require.NoError(t, err)
defer resp.Body.Close()

body, err := io.ReadAll(resp.Body)
require.NoError(t, err)
if len(body) == 0 {
body = nil // to simplify test
}

// Verify that body is the same as we expect through gRPC.
require.Equal(t, tc.expectedResponse.Body, body)

// Verify that expected headers are in the response.
for _, h := range tc.expectedResponse.Headers {
assert.Equal(t, h.Values, resp.Header.Values(h.Key))
}

// Verify that header that indicates grpc error for httpgrpc.Server is not in the response.
assert.Empty(t, resp.Header.Get(server.ErrorMessageHeaderKey))
})
}
}

func mustMarshalStatus(t *testing.T, code codes.Code, msg string) []byte {
bytes, err := proto.Marshal(status.New(code, msg).Proto())
require.NoError(t, err)
return bytes
}

type otlpLimitsMock struct{}

func (o otlpLimitsMock) OTelMetricSuffixesEnabled(id string) bool {
return false
}
7 changes: 6 additions & 1 deletion vendor/github.com/grafana/dskit/httpgrpc/httpgrpc.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

37 changes: 35 additions & 2 deletions vendor/github.com/grafana/dskit/httpgrpc/server/server.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading