Skip to content

Commit

Permalink
Align gRPC server status code to span status code (#3685)
Browse files Browse the repository at this point in the history
* implement based on new spec

* fix return value of serverStatus

* fix current tests

* add internal error test

* add changelog entry

* move function to bottom

* add ok test

* add stream server test

* refactor tests as table-driven tests, and add test for StreamServerInterceptor

* Update CHANGELOG.md

fix CR note on the changelog

Co-authored-by: Robert Pająk <[email protected]>

* refactor server tests to use shared assertion methods

* fix more CR

* add all gRPC status + remove name and grpcErr from vars

---------

Co-authored-by: Chester Cheung <[email protected]>
Co-authored-by: Robert Pająk <[email protected]>
Co-authored-by: Damien Mathieu <[email protected]>
Co-authored-by: Tyler Yahn <[email protected]>
  • Loading branch information
5 people authored Apr 17, 2023
1 parent 1870062 commit f998e3f
Show file tree
Hide file tree
Showing 3 changed files with 191 additions and 27 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
- AWS SDK add `rpc.system` attribute in `go.opentelemetry.io/contrib/instrumentation/github.com/aws/aws-sdk-go-v2/otelaws`. (#3582, #3617)
- Add the new `go.opentelemetry.io/contrib/instrgen` package to provide auto-generated source code instrumentation. (#3068, #3108)

### Changed

- Update `go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc` to align gRPC server span status with the changes in the OpenTelemetry specification. (#3685)

### Fixed

- Prevent taking from reservoir in AWS XRay Remote Sampler when there is zero capacity in `go.opentelemetry.io/contrib/samplers/aws/xray`. (#3684)
Expand Down
30 changes: 27 additions & 3 deletions instrumentation/google.golang.org/grpc/otelgrpc/interceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -342,8 +342,8 @@ func UnaryServerInterceptor(opts ...Option) grpc.UnaryServerInterceptor {
resp, err := handler(ctx, req)
if err != nil {
s, _ := status.FromError(err)
statusCode = s.Code()
span.SetStatus(codes.Error, s.Message())
statusCode, msg := serverStatus(s)
span.SetStatus(statusCode, msg)
span.SetAttributes(statusCodeAttr(s.Code()))
messageSent.Event(ctx, 1, s.Proto())
} else {
Expand Down Expand Up @@ -435,7 +435,8 @@ func StreamServerInterceptor(opts ...Option) grpc.StreamServerInterceptor {
err := handler(srv, wrapServerStream(ctx, ss))
if err != nil {
s, _ := status.FromError(err)
span.SetStatus(codes.Error, s.Message())
statusCode, msg := serverStatus(s)
span.SetStatus(statusCode, msg)
span.SetAttributes(statusCodeAttr(s.Code()))
} else {
span.SetAttributes(statusCodeAttr(grpc_codes.OK))
Expand Down Expand Up @@ -499,3 +500,26 @@ func peerFromCtx(ctx context.Context) string {
func statusCodeAttr(c grpc_codes.Code) attribute.KeyValue {
return GRPCStatusCodeKey.Int64(int64(c))
}

// serverStatus returns a span status code and message for a given gRPC
// status code. It maps specific gRPC status codes to a corresponding span
// status code and message. This function is intended for use on the server
// side of a gRPC connection.
//
// If the gRPC status code is Unknown, DeadlineExceeded, Unimplemented,
// Internal, Unavailable, or DataLoss, it returns a span status code of Error
// and the message from the gRPC status. Otherwise, it returns a span status
// code of Unset and an empty message.
func serverStatus(grpcStatus *status.Status) (codes.Code, string) {
switch grpcStatus.Code() {
case grpc_codes.Unknown,
grpc_codes.DeadlineExceeded,
grpc_codes.Unimplemented,
grpc_codes.Internal,
grpc_codes.Unavailable,
grpc_codes.DataLoss:
return codes.Error, grpcStatus.Message()
default:
return codes.Unset, ""
}
}
184 changes: 160 additions & 24 deletions instrumentation/google.golang.org/grpc/otelgrpc/test/interceptor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -583,39 +583,175 @@ func TestStreamClientInterceptorWithError(t *testing.T) {
assert.Equal(t, codes.Error, span.Status().Code)
}

func TestServerInterceptorError(t *testing.T) {
sr := tracetest.NewSpanRecorder()
tp := trace.NewTracerProvider(trace.WithSpanProcessor(sr))
usi := otelgrpc.UnaryServerInterceptor(otelgrpc.WithTracerProvider(tp))
deniedErr := status.Error(grpc_codes.PermissionDenied, "PERMISSION_DENIED_TEXT")
handler := func(_ context.Context, _ interface{}) (interface{}, error) {
return nil, deniedErr
}
_, err := usi(context.Background(), &grpc_testing.SimpleRequest{}, &grpc.UnaryServerInfo{}, handler)
require.Error(t, err)
assert.Equal(t, err, deniedErr)
var serverChecks = []struct {
grpcCode grpc_codes.Code
wantSpanCode codes.Code
wantSpanStatusDescription string
}{
{
grpcCode: grpc_codes.OK,
wantSpanCode: codes.Unset,
wantSpanStatusDescription: "",
},
{
grpcCode: grpc_codes.Canceled,
wantSpanCode: codes.Unset,
wantSpanStatusDescription: "",
},
{
grpcCode: grpc_codes.Unknown,
wantSpanCode: codes.Error,
wantSpanStatusDescription: grpc_codes.Unknown.String(),
},
{
grpcCode: grpc_codes.InvalidArgument,
wantSpanCode: codes.Unset,
wantSpanStatusDescription: "",
},
{
grpcCode: grpc_codes.DeadlineExceeded,
wantSpanCode: codes.Error,
wantSpanStatusDescription: grpc_codes.DeadlineExceeded.String(),
},
{
grpcCode: grpc_codes.NotFound,
wantSpanCode: codes.Unset,
wantSpanStatusDescription: "",
},
{
grpcCode: grpc_codes.AlreadyExists,
wantSpanCode: codes.Unset,
wantSpanStatusDescription: "",
},
{
grpcCode: grpc_codes.PermissionDenied,
wantSpanCode: codes.Unset,
wantSpanStatusDescription: "",
},
{
grpcCode: grpc_codes.ResourceExhausted,
wantSpanCode: codes.Unset,
wantSpanStatusDescription: "",
},
{
grpcCode: grpc_codes.FailedPrecondition,
wantSpanCode: codes.Unset,
wantSpanStatusDescription: "",
},
{
grpcCode: grpc_codes.Aborted,
wantSpanCode: codes.Unset,
wantSpanStatusDescription: "",
},
{
grpcCode: grpc_codes.OutOfRange,
wantSpanCode: codes.Unset,
wantSpanStatusDescription: "",
},
{
grpcCode: grpc_codes.Unimplemented,
wantSpanCode: codes.Error,
wantSpanStatusDescription: grpc_codes.Unimplemented.String(),
},
{
grpcCode: grpc_codes.Internal,
wantSpanCode: codes.Error,
wantSpanStatusDescription: grpc_codes.Internal.String(),
},
{
grpcCode: grpc_codes.Unavailable,
wantSpanCode: codes.Error,
wantSpanStatusDescription: grpc_codes.Unavailable.String(),
},
{
grpcCode: grpc_codes.DataLoss,
wantSpanCode: codes.Error,
wantSpanStatusDescription: grpc_codes.DataLoss.String(),
},
{
grpcCode: grpc_codes.Unauthenticated,
wantSpanCode: codes.Unset,
wantSpanStatusDescription: "",
},
}

span, ok := getSpanFromRecorder(sr, "")
if !ok {
t.Fatalf("failed to export error span")
}
assert.Equal(t, codes.Error, span.Status().Code)
assert.Contains(t, deniedErr.Error(), span.Status().Description)
func assertServerSpan(t *testing.T, wantSpanCode codes.Code, wantSpanStatusDescription string, wantGrpcCode grpc_codes.Code, span trace.ReadOnlySpan) {
// validate span status
assert.Equal(t, wantSpanCode, span.Status().Code)
assert.Equal(t, wantSpanStatusDescription, span.Status().Description)

// validate grpc code span attribute
var codeAttr attribute.KeyValue
for _, a := range span.Attributes() {
if a.Key == otelgrpc.GRPCStatusCodeKey {
codeAttr = a
break
}
}
if assert.True(t, codeAttr.Valid(), "attributes contain gRPC status code") {
assert.Equal(t, attribute.Int64Value(int64(grpc_codes.PermissionDenied)), codeAttr.Value)

require.True(t, codeAttr.Valid(), "attributes contain gRPC status code")
assert.Equal(t, attribute.Int64Value(int64(wantGrpcCode)), codeAttr.Value)
}

// TestUnaryServerInterceptor tests the server interceptor for unary RPCs.
func TestUnaryServerInterceptor(t *testing.T) {
sr := tracetest.NewSpanRecorder()
tp := trace.NewTracerProvider(trace.WithSpanProcessor(sr))
usi := otelgrpc.UnaryServerInterceptor(otelgrpc.WithTracerProvider(tp))
for _, check := range serverChecks {
name := check.grpcCode.String()
t.Run(name, func(t *testing.T) {
// call the unary interceptor
grpcErr := status.Error(check.grpcCode, check.grpcCode.String())
handler := func(_ context.Context, _ interface{}) (interface{}, error) {
return nil, grpcErr
}
_, err := usi(context.Background(), &grpc_testing.SimpleRequest{}, &grpc.UnaryServerInfo{FullMethod: name}, handler)
assert.Equal(t, grpcErr, err)

// validate span
span, ok := getSpanFromRecorder(sr, name)
require.True(t, ok, "missing span %s", name)
assertServerSpan(t, check.wantSpanCode, check.wantSpanStatusDescription, check.grpcCode, span)

// validate events and their attributes
assert.Len(t, span.Events(), 2)
assert.ElementsMatch(t, []attribute.KeyValue{
attribute.Key("message.type").String("SENT"),
attribute.Key("message.id").Int(1),
}, span.Events()[1].Attributes)
})
}
}

type mockServerStream struct {
grpc.ServerStream
}

func (m *mockServerStream) Context() context.Context { return context.Background() }

// TestStreamServerInterceptor tests the server interceptor for streaming RPCs.
func TestStreamServerInterceptor(t *testing.T) {
sr := tracetest.NewSpanRecorder()
tp := trace.NewTracerProvider(trace.WithSpanProcessor(sr))
usi := otelgrpc.StreamServerInterceptor(otelgrpc.WithTracerProvider(tp))
for _, check := range serverChecks {
name := check.grpcCode.String()
t.Run(name, func(t *testing.T) {
// call the stream interceptor
grpcErr := status.Error(check.grpcCode, check.grpcCode.String())
handler := func(_ interface{}, _ grpc.ServerStream) error {
return grpcErr
}
err := usi(&grpc_testing.SimpleRequest{}, &mockServerStream{}, &grpc.StreamServerInfo{FullMethod: name}, handler)
assert.Equal(t, grpcErr, err)

// validate span
span, ok := getSpanFromRecorder(sr, name)
require.True(t, ok, "missing span %s", name)
assertServerSpan(t, check.wantSpanCode, check.wantSpanStatusDescription, check.grpcCode, span)
})
}
assert.Len(t, span.Events(), 2)
assert.ElementsMatch(t, []attribute.KeyValue{
attribute.Key("message.type").String("SENT"),
attribute.Key("message.id").Int(1),
}, span.Events()[1].Attributes)
}

func TestParseFullMethod(t *testing.T) {
Expand Down

0 comments on commit f998e3f

Please sign in to comment.