-
Notifications
You must be signed in to change notification settings - Fork 532
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Correctly cancel GRPC context beneath the HTTP server (#3443)
* cancel context Signed-off-by: Joe Elliott <[email protected]> * update dskit Signed-off-by: Joe Elliott <[email protected]> * focused timeouts Signed-off-by: Joe Elliott <[email protected]> * docs Signed-off-by: Joe Elliott <[email protected]> * lint N docs Signed-off-by: Joe Elliott <[email protected]> * more lint Signed-off-by: Joe Elliott <[email protected]> * make update-mod Signed-off-by: Joe Elliott <[email protected]> --------- Signed-off-by: Joe Elliott <[email protected]>
- Loading branch information
1 parent
ae083c3
commit eb81b92
Showing
46 changed files
with
4,313 additions
and
124 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,40 @@ | ||
package interceptor | ||
|
||
import ( | ||
"context" | ||
"strings" | ||
"time" | ||
|
||
grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware" | ||
"google.golang.org/grpc" | ||
) | ||
|
||
const streamingQuerierPrefix = "/tempopb.StreamingQuerier/" | ||
|
||
func NewFrontendAPIUnaryTimeout(timeout time.Duration) grpc.UnaryServerInterceptor { | ||
return func(ctx context.Context, req any, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (any, error) { | ||
if strings.HasPrefix(info.FullMethod, streamingQuerierPrefix) { | ||
var cancel context.CancelFunc | ||
ctx, cancel = context.WithTimeout(ctx, timeout) | ||
defer cancel() | ||
} | ||
|
||
return handler(ctx, req) | ||
} | ||
} | ||
|
||
func NewFrontendAPIStreamTimeout(timeout time.Duration) grpc.StreamServerInterceptor { | ||
return func(srv any, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error { | ||
ctx := ss.Context() | ||
if strings.HasPrefix(info.FullMethod, streamingQuerierPrefix) { | ||
var cancel context.CancelFunc | ||
ctx, cancel = context.WithTimeout(ss.Context(), timeout) | ||
defer cancel() | ||
} | ||
|
||
return handler(srv, &grpc_middleware.WrappedServerStream{ | ||
ServerStream: ss, | ||
WrappedContext: ctx, | ||
}) | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,97 @@ | ||
package interceptor | ||
|
||
import ( | ||
"context" | ||
"net" | ||
"testing" | ||
"time" | ||
|
||
"github.com/grafana/tempo/pkg/gogocodec" | ||
"github.com/grafana/tempo/pkg/tempopb" | ||
"github.com/stretchr/testify/require" | ||
"google.golang.org/grpc" | ||
"google.golang.org/grpc/credentials/insecure" | ||
"google.golang.org/grpc/encoding" | ||
) | ||
|
||
func TestInterceptorsCancelContextForStreaming(t *testing.T) { | ||
encoding.RegisterCodec(gogocodec.NewCodec()) | ||
|
||
interceptorTimeout := time.Second | ||
apiTimeout := time.Second * 5 | ||
|
||
unaryInt := NewFrontendAPIUnaryTimeout(interceptorTimeout) | ||
streamInt := NewFrontendAPIStreamTimeout(interceptorTimeout) | ||
|
||
serv := grpc.NewServer(grpc.UnaryInterceptor(unaryInt), grpc.StreamInterceptor(streamInt)) | ||
defer serv.GracefulStop() | ||
|
||
srv := &mockService{apiTimeout} | ||
tempopb.RegisterStreamingQuerierServer(serv, srv) | ||
tempopb.RegisterPusherServer(serv, srv) | ||
|
||
listener, err := net.Listen("tcp", "localhost:0") | ||
require.NoError(t, err) | ||
|
||
go func() { | ||
require.NoError(t, serv.Serve(listener)) | ||
}() | ||
|
||
conn, err := grpc.Dial(listener.Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(10e6), grpc.MaxCallSendMsgSize(10e6))) | ||
require.NoError(t, err) | ||
defer func() { | ||
require.NoError(t, conn.Close()) | ||
}() | ||
|
||
// test that a streaming client has its context cancelled after the interceptor timeout and before the api timeout | ||
c := tempopb.NewStreamingQuerierClient(conn) | ||
client, err := c.Search(context.Background(), &tempopb.SearchRequest{}) | ||
require.NoError(t, err) | ||
|
||
start := time.Now() | ||
_, err = client.Recv() | ||
require.EqualError(t, err, "rpc error: code = DeadlineExceeded desc = context deadline exceeded") | ||
require.LessOrEqual(t, time.Since(start), apiTimeout) // confirm that we didn't wait for the full api timeout | ||
|
||
// test that the pusher client does not have its context cancelled and waits for the full api timeout | ||
pc := tempopb.NewPusherClient(conn) | ||
|
||
start = time.Now() | ||
_, err = pc.PushBytesV2(context.Background(), &tempopb.PushBytesRequest{}) | ||
require.NoError(t, err) | ||
require.GreaterOrEqual(t, time.Since(start), apiTimeout) // confirm that we did wait for the full api timeout | ||
} | ||
|
||
type mockService struct { | ||
apiTimeout time.Duration | ||
} | ||
|
||
func (s *mockService) Search(_ *tempopb.SearchRequest, ss tempopb.StreamingQuerier_SearchServer) error { | ||
select { | ||
case <-time.After(s.apiTimeout): | ||
case <-ss.Context().Done(): | ||
return ss.Context().Err() | ||
} | ||
|
||
return nil | ||
} | ||
|
||
func (s *mockService) PushBytes(ctx context.Context, _ *tempopb.PushBytesRequest) (*tempopb.PushResponse, error) { | ||
select { | ||
case <-time.After(s.apiTimeout): | ||
case <-ctx.Done(): | ||
return nil, ctx.Err() | ||
} | ||
|
||
return &tempopb.PushResponse{}, nil | ||
} | ||
|
||
func (s *mockService) PushBytesV2(ctx context.Context, _ *tempopb.PushBytesRequest) (*tempopb.PushResponse, error) { | ||
select { | ||
case <-time.After(s.apiTimeout): | ||
case <-ctx.Done(): | ||
return nil, ctx.Err() | ||
} | ||
|
||
return &tempopb.PushResponse{}, nil | ||
} |
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Oops, something went wrong.
Oops, something went wrong.