Skip to content

Commit

Permalink
does anything work
Browse files Browse the repository at this point in the history
Signed-off-by: Joe Elliott <[email protected]>
  • Loading branch information
joe-elliott committed Mar 7, 2024
1 parent 79629ae commit d251d7f
Showing 1 changed file with 27 additions and 15 deletions.
42 changes: 27 additions & 15 deletions cmd/tempo/app/server_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/grafana/dskit/middleware"
"github.com/grafana/dskit/server"
"github.com/grafana/dskit/services"
grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware"
"golang.org/x/net/http2"
"golang.org/x/net/http2/h2c"
"google.golang.org/grpc"
Expand Down Expand Up @@ -106,6 +107,11 @@ func (s *tempoServer) StartAndReturnService(cfg server.Config, supportGRPCOnHTTP
cfg.Router = nil
cfg.DoNotAddDefaultHTTPMiddleware = true // we don't want instrumentation on the "root" router, we want it on our mux. it will be added below.

if actualWriteTimeout > 0 {
cfg.GRPCMiddleware = append(cfg.GRPCMiddleware, unaryTimeoutInterceptor(actualWriteTimeout))
cfg.GRPCStreamMiddleware = append(cfg.GRPCStreamMiddleware, streamTimeoutInterceptor(actualWriteTimeout))
}

s.externalServer, err = server.NewWithMetrics(cfg, metrics)
if err != nil {
return nil, fmt.Errorf("failed to create server: %w", err)
Expand All @@ -132,21 +138,6 @@ func (s *tempoServer) StartAndReturnService(cfg server.Config, supportGRPCOnHTTP
// route to GRPC server if it's a GRPC request
if req.ProtoMajor == 2 && strings.Contains(req.Header.Get("Content-Type"), "application/grpc") {
// http is handled by the http.TimeoutHandler. manually write a timeout for grpc
if actualWriteTimeout > 0 {
ctx, cancel := context.WithCancel(req.Context())
defer cancel()

req = req.WithContext(ctx)

go func() {
select {
case <-req.Context().Done():
case <-time.After(actualWriteTimeout):
cancel()
}
}()
}

s.externalServer.GRPC.ServeHTTP(w, req)

return
Expand Down Expand Up @@ -217,3 +208,24 @@ func (dh ignoreSignalHandler) Loop() {
func (dh ignoreSignalHandler) Stop() {
close(dh)
}

func unaryTimeoutInterceptor(timeout time.Duration) grpc.UnaryServerInterceptor {
return func(ctx context.Context, req any, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (any, error) {
ctx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()

return handler(ctx, req)
}
}

func streamTimeoutInterceptor(timeout time.Duration) grpc.StreamServerInterceptor {
return func(srv any, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
ctx, cancel := context.WithTimeout(ss.Context(), timeout)
defer cancel()

return handler(srv, &grpc_middleware.WrappedServerStream{
ServerStream: ss,
WrappedContext: ctx,
})
}
}

0 comments on commit d251d7f

Please sign in to comment.