Skip to content

Commit

Permalink
Propagate request ID through gRPC context (thanos-io#7356)
Browse files Browse the repository at this point in the history
* Propagate request ID through gRPC context

The request ID only gets propagated through HTTP calls and is not available
in gRPC servers.

This commit adds intereceptors to grpc servers and clients to make sure request ID
propagation happens.

Signed-off-by: Filip Petkovski <[email protected]>

* Add license

Signed-off-by: Filip Petkovski <[email protected]>

---------

Signed-off-by: Filip Petkovski <[email protected]>
  • Loading branch information
fpetkovski authored and hczhu-db committed Aug 22, 2024
1 parent fc1ee71 commit df06df7
Show file tree
Hide file tree
Showing 4 changed files with 75 additions and 3 deletions.
3 changes: 3 additions & 0 deletions pkg/extgrpc/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/keepalive"

grpcserver "github.com/thanos-io/thanos/pkg/server/grpc"
"github.com/thanos-io/thanos/pkg/tls"
"github.com/thanos-io/thanos/pkg/tracing"
)
Expand Down Expand Up @@ -58,12 +59,14 @@ func StoreClientGRPCOpts(logger log.Logger, reg prometheus.Registerer, tracer op
grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(math.MaxInt32)),
grpc.WithUnaryInterceptor(
grpc_middleware.ChainUnaryClient(
grpcserver.NewUnaryClientRequestIDInterceptor(),
grpcMets.UnaryClientInterceptor(),
tracing.UnaryClientInterceptor(tracer),
),
),
grpc.WithStreamInterceptor(
grpc_middleware.ChainStreamClient(
grpcserver.NewStreamClientRequestIDInterceptor(),
grpcMets.StreamClientInterceptor(),
tracing.StreamClientInterceptor(tracer),
),
Expand Down
2 changes: 2 additions & 0 deletions pkg/server/grpc/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,13 +78,15 @@ func New(logger log.Logger, reg prometheus.Registerer, tracer opentracing.Tracer
grpc.MaxSendMsgSize(math.MaxInt32),
grpc.MaxRecvMsgSize(math.MaxInt32),
grpc_middleware.WithUnaryServerChain(
NewUnaryServerRequestIDInterceptor(),
grpc_recovery.UnaryServerInterceptor(grpc_recovery.WithRecoveryHandler(grpcPanicRecoveryHandler)),
met.UnaryServerInterceptor(),
tags.UnaryServerInterceptor(tagsOpts...),
tracing.UnaryServerInterceptor(tracer),
grpc_logging.UnaryServerInterceptor(kit.InterceptorLogger(logger), logOpts...),
),
grpc_middleware.WithStreamServerChain(
NewStreamServerRequestIDInterceptor(),
grpc_recovery.StreamServerInterceptor(grpc_recovery.WithRecoveryHandler(grpcPanicRecoveryHandler)),
met.StreamServerInterceptor(),
tags.StreamServerInterceptor(tagsOpts...),
Expand Down
67 changes: 67 additions & 0 deletions pkg/server/grpc/request_id.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
// Copyright (c) The Thanos Authors.
// Licensed under the Apache License 2.0.

package grpc

import (
"context"

"google.golang.org/grpc"
"google.golang.org/grpc/metadata"

"github.com/thanos-io/thanos/pkg/server/http/middleware"
)

const requestIDKey = "request-id"

func NewUnaryClientRequestIDInterceptor() grpc.UnaryClientInterceptor {
return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
reqID, ok := middleware.RequestIDFromContext(ctx)
if ok {
ctx = metadata.AppendToOutgoingContext(ctx, requestIDKey, reqID)
}
return invoker(ctx, method, req, reply, cc, opts...)
}
}

func NewUnaryServerRequestIDInterceptor() grpc.UnaryServerInterceptor {
return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
if vals := metadata.ValueFromIncomingContext(ctx, requestIDKey); len(vals) == 1 {
ctx = middleware.NewContextWithRequestID(ctx, vals[0])
}
return handler(ctx, req)
}
}

func NewStreamClientRequestIDInterceptor() grpc.StreamClientInterceptor {
return func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
reqID, ok := middleware.RequestIDFromContext(ctx)
if ok {
ctx = metadata.AppendToOutgoingContext(ctx, requestIDKey, reqID)
}
return streamer(ctx, desc, cc, method, opts...)
}
}

func NewStreamServerRequestIDInterceptor() grpc.StreamServerInterceptor {
return func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
if vals := metadata.ValueFromIncomingContext(ss.Context(), requestIDKey); len(vals) == 1 {
ctx := middleware.NewContextWithRequestID(ss.Context(), vals[0])
return handler(srv, newStreamWithContext(ctx, ss))
}
return handler(srv, ss)
}
}

type streamWithContext struct {
grpc.ServerStream
ctx context.Context
}

func newStreamWithContext(ctx context.Context, serverStream grpc.ServerStream) *streamWithContext {
return &streamWithContext{ServerStream: serverStream, ctx: ctx}
}

func (s streamWithContext) Context() context.Context {
return s.ctx
}
6 changes: 3 additions & 3 deletions pkg/server/http/middleware/request_id.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ type ctxKey int

const reqIDKey = ctxKey(0)

// newContextWithRequestID creates a context with a request id.
func newContextWithRequestID(ctx context.Context, rid string) context.Context {
// NewContextWithRequestID creates a context with a request id.
func NewContextWithRequestID(ctx context.Context, rid string) context.Context {
return context.WithValue(ctx, reqIDKey, rid)
}

Expand All @@ -36,7 +36,7 @@ func RequestID(h http.Handler) http.HandlerFunc {
reqID = ulid.MustNew(ulid.Timestamp(time.Now()), entropy).String()
r.Header.Set("X-Request-ID", reqID)
}
ctx := newContextWithRequestID(r.Context(), reqID)
ctx := NewContextWithRequestID(r.Context(), reqID)
h.ServeHTTP(w, r.WithContext(ctx))
}
}

0 comments on commit df06df7

Please sign in to comment.