Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

grpc, xds: recovery middleware to return and log error in case of panic #10895

Merged
merged 12 commits into from
Dec 7, 2021
Merged
3 changes: 3 additions & 0 deletions .changelog/10895.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:improvement
grpc, xds: improved reliability of grpc and xds servers by adding recovery-middleware to return and log error in case of panic.
```
2 changes: 1 addition & 1 deletion agent/consul/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -628,7 +628,7 @@ func newGRPCHandlerFromConfig(deps Deps, config *Config, s *Server) connHandler
s.registerEnterpriseGRPCServices(deps, srv)
}

return agentgrpc.NewHandler(config.RPCAddr, register)
return agentgrpc.NewHandler(deps.Logger, config.RPCAddr, register)
}

func (s *Server) connectCARootsMonitor(ctx context.Context) {
Expand Down
51 changes: 46 additions & 5 deletions agent/grpc/client_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package grpc

import (
"bytes"
"context"
"fmt"
"net"
Expand All @@ -11,6 +12,8 @@ import (

"github.com/hashicorp/go-hclog"
"github.com/stretchr/testify/require"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

"github.com/hashicorp/consul/agent/grpc/internal/testservice"
"github.com/hashicorp/consul/agent/grpc/resolver"
Expand Down Expand Up @@ -54,7 +57,7 @@ func TestNewDialer_IntegrationWithTLSEnabledHandler(t *testing.T) {
res := resolver.NewServerResolverBuilder(newConfig(t))
registerWithGRPC(t, res)

srv := newTestServer(t, "server-1", "dc1")
srv := newSimpleTestServer(t, "server-1", "dc1")
tlsConf, err := tlsutil.NewConfigurator(tlsutil.Config{
VerifyIncoming: true,
VerifyOutgoing: true,
Expand Down Expand Up @@ -91,7 +94,7 @@ func TestClientConnPool_IntegrationWithGRPCResolver_Failover(t *testing.T) {

for i := 0; i < count; i++ {
name := fmt.Sprintf("server-%d", i)
srv := newTestServer(t, name, "dc1")
srv := newSimpleTestServer(t, name, "dc1")
res.AddServer(srv.Metadata())
t.Cleanup(srv.shutdown)
}
Expand Down Expand Up @@ -123,7 +126,7 @@ func TestClientConnPool_ForwardToLeader_Failover(t *testing.T) {
var servers []testServer
for i := 0; i < count; i++ {
name := fmt.Sprintf("server-%d", i)
srv := newTestServer(t, name, "dc1")
srv := newSimpleTestServer(t, name, "dc1")
res.AddServer(srv.Metadata())
servers = append(servers, srv)
t.Cleanup(srv.shutdown)
Expand Down Expand Up @@ -166,7 +169,7 @@ func TestClientConnPool_IntegrationWithGRPCResolver_Rebalance(t *testing.T) {

for i := 0; i < count; i++ {
name := fmt.Sprintf("server-%d", i)
srv := newTestServer(t, name, "dc1")
srv := newSimpleTestServer(t, name, "dc1")
res.AddServer(srv.Metadata())
t.Cleanup(srv.shutdown)
}
Expand Down Expand Up @@ -215,7 +218,7 @@ func TestClientConnPool_IntegrationWithGRPCResolver_MultiDC(t *testing.T) {

for _, dc := range dcs {
name := "server-0-" + dc
srv := newTestServer(t, name, dc)
srv := newSimpleTestServer(t, name, dc)
res.AddServer(srv.Metadata())
t.Cleanup(srv.shutdown)
}
Expand All @@ -240,3 +243,41 @@ func registerWithGRPC(t *testing.T, b *resolver.ServerResolverBuilder) {
resolver.Deregister(b.Authority())
})
}

func TestRecoverMiddleware(t *testing.T) {
gmichelo marked this conversation as resolved.
Show resolved Hide resolved
// Prepare a logger with output to a buffer
// so we can check what it writes.
var buf bytes.Buffer

logger := hclog.New(&hclog.LoggerOptions{
Output: &buf,
})

res := resolver.NewServerResolverBuilder(newConfig(t))
registerWithGRPC(t, res)

srv := newPanicTestServer(t, logger, "server-1", "dc1")
res.AddServer(srv.Metadata())
t.Cleanup(srv.shutdown)

pool := NewClientConnPool(res, nil, useTLSForDcAlwaysTrue)

conn, err := pool.ClientConn("dc1")
require.NoError(t, err)
client := testservice.NewSimpleClient(conn)

ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
t.Cleanup(cancel)

resp, err := client.Something(ctx, &testservice.Req{})
expectedErr := status.Errorf(codes.Internal, "grpc: panic serving request: panic from Something")
require.Equal(t, expectedErr, err)
require.Nil(t, resp)

// Read the log
strLog := buf.String()
// Checking the entire stack trace is not possible, let's
// make sure that it contains a couple of expected strings.
require.Contains(t, strLog, `[ERROR] panic serving grpc request: panic="panic from Something`)
require.Contains(t, strLog, `github.com/hashicorp/consul/agent/grpc.(*simplePanic).Something`)
}
37 changes: 35 additions & 2 deletions agent/grpc/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,24 +4,42 @@ Package grpc provides a Handler and client for agent gRPC connections.
package grpc

import (
"context"
"fmt"
"net"
"time"

"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/keepalive"
"google.golang.org/grpc/status"

middleware "github.com/grpc-ecosystem/go-grpc-middleware/v2"
"github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/recovery"
"github.com/hashicorp/go-hclog"
)

// NewHandler returns a gRPC server that accepts connections from Handle(conn).
// The register function will be called with the grpc.Server to register
// gRPC services with the server.
func NewHandler(addr net.Addr, register func(server *grpc.Server)) *Handler {
func NewHandler(logger Logger, addr net.Addr, register func(server *grpc.Server)) *Handler {
recoveryOpts := []recovery.Option{
recovery.WithRecoveryHandlerContext(newPanicHandler(logger)),
}
metrics := defaultMetrics()
// We don't need to pass tls.Config to the server since it's multiplexed
// behind the RPC listener, which already has TLS configured.
srv := grpc.NewServer(
middleware.WithUnaryServerChain(
// Add middlware interceptors to recover in case of panics.
recovery.UnaryServerInterceptor(recoveryOpts...),
),
middleware.WithStreamServerChain(
// Add middlware interceptors to recover in case of panics.
recovery.StreamServerInterceptor(recoveryOpts...),
(&activeStreamCounter{metrics: metrics}).Intercept,
),
grpc.StatsHandler(newStatsHandler(metrics)),
grpc.StreamInterceptor((&activeStreamCounter{metrics: metrics}).Intercept),
grpc.KeepaliveEnforcementPolicy(keepalive.EnforcementPolicy{
MinTime: 15 * time.Second,
}),
Expand All @@ -32,6 +50,21 @@ func NewHandler(addr net.Addr, register func(server *grpc.Server)) *Handler {
return &Handler{srv: srv, listener: lis}
}

// newPanicHandler returns a recovery.RecoveryHandlerFuncContext closure function
// to handle panic in GRPC server's handlers.
func newPanicHandler(logger Logger) recovery.RecoveryHandlerFuncContext {
return func(ctx context.Context, p interface{}) (err error) {
// Log the panic and the stack trace of the Goroutine that caused the panic.
stacktrace := hclog.Stacktrace()
logger.Error("panic serving grpc request",
"panic", p,
"stack", stacktrace,
)

return status.Errorf(codes.Internal, "grpc: panic serving request: %v", p)
gmichelo marked this conversation as resolved.
Show resolved Hide resolved
}
}

// Handler implements a handler for the rpc server listener, and the
// agent.Component interface for managing the lifecycle of the grpc.Server.
type Handler struct {
Expand Down
35 changes: 32 additions & 3 deletions agent/grpc/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/hashicorp/consul/agent/metadata"
"github.com/hashicorp/consul/agent/pool"
"github.com/hashicorp/consul/tlsutil"
"github.com/hashicorp/go-hclog"
)

type testServer struct {
Expand All @@ -37,11 +38,22 @@ func (s testServer) Metadata() *metadata.Server {
}
}

func newTestServer(t *testing.T, name string, dc string) testServer {
addr := &net.IPAddr{IP: net.ParseIP("127.0.0.1")}
handler := NewHandler(addr, func(server *grpc.Server) {
func newSimpleTestServer(t *testing.T, name, dc string) testServer {
return newTestServer(t, hclog.Default(), name, dc, func(server *grpc.Server) {
testservice.RegisterSimpleServer(server, &simple{name: name, dc: dc})
})
}

// newPanicTestServer sets up a simple server with handlers that panic.
func newPanicTestServer(t *testing.T, logger hclog.Logger, name, dc string) testServer {
return newTestServer(t, logger, name, dc, func(server *grpc.Server) {
testservice.RegisterSimpleServer(server, &simplePanic{name: name, dc: dc})
})
}

func newTestServer(t *testing.T, logger hclog.Logger, name, dc string, register func(server *grpc.Server)) testServer {
addr := &net.IPAddr{IP: net.ParseIP("127.0.0.1")}
handler := NewHandler(logger, addr, register)

lis, err := net.Listen("tcp", "127.0.0.1:0")
require.NoError(t, err)
Expand Down Expand Up @@ -101,6 +113,23 @@ func (s *simple) Something(_ context.Context, _ *testservice.Req) (*testservice.
return &testservice.Resp{ServerName: s.name, Datacenter: s.dc}, nil
}

type simplePanic struct {
name, dc string
}

func (s *simplePanic) Flow(_ *testservice.Req, flow testservice.Simple_FlowServer) error {
for flow.Context().Err() == nil {
time.Sleep(time.Millisecond)
panic("panic from Flow")
}
return nil
}

func (s *simplePanic) Something(_ context.Context, _ *testservice.Req) (*testservice.Resp, error) {
time.Sleep(time.Millisecond)
panic("panic from Something")
}

// fakeRPCListener mimics agent/consul.Server.listen to handle the RPCType byte.
// In the future we should be able to refactor Server and extract this RPC
// handling logic so that we don't need to use a fake.
Expand Down
3 changes: 2 additions & 1 deletion agent/grpc/stats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"google.golang.org/grpc"

"github.com/hashicorp/consul/agent/grpc/internal/testservice"
"github.com/hashicorp/go-hclog"
)

func noopRegister(*grpc.Server) {}
Expand All @@ -23,7 +24,7 @@ func TestHandler_EmitsStats(t *testing.T) {
sink, reset := patchGlobalMetrics(t)

addr := &net.IPAddr{IP: net.ParseIP("127.0.0.1")}
handler := NewHandler(addr, noopRegister)
handler := NewHandler(hclog.Default(), addr, noopRegister)
reset()

testservice.RegisterSimpleServer(handler.srv, &simple{})
Expand Down
2 changes: 1 addition & 1 deletion agent/rpc/subscribe/subscribe_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -320,7 +320,7 @@ var _ Backend = (*testBackend)(nil)
func runTestServer(t *testing.T, server *Server) net.Addr {
addr := &net.IPAddr{IP: net.ParseIP("127.0.0.1")}
var grpcServer *gogrpc.Server
handler := grpc.NewHandler(addr, func(srv *gogrpc.Server) {
handler := grpc.NewHandler(hclog.New(nil), addr, func(srv *gogrpc.Server) {
grpcServer = srv
pbsubscribe.RegisterStateChangeSubscriptionServer(srv, server)
})
Expand Down
29 changes: 29 additions & 0 deletions agent/xds/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ import (

"github.com/armon/go-metrics"
"github.com/armon/go-metrics/prometheus"
middleware "github.com/grpc-ecosystem/go-grpc-middleware/v2"
"github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/recovery"
"github.com/hashicorp/go-hclog"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
Expand Down Expand Up @@ -543,11 +545,38 @@ func tokenFromContext(ctx context.Context) string {
return ""
}

// newPanicHandler returns a recovery.RecoveryHandlerFuncContext closure function
// to handle panic in GRPC server's handlers.
func newPanicHandler(logger hclog.Logger) recovery.RecoveryHandlerFuncContext {
return func(ctx context.Context, p interface{}) (err error) {
// Log the panic and the stack trace of the Goroutine that caused the panic.
stacktrace := hclog.Stacktrace()
logger.Error("panic serving grpc request",
"panic", p,
"stack", stacktrace,
)

return status.Errorf(codes.Internal, "grpc: panic serving request: %v", p)
}
}
gmichelo marked this conversation as resolved.
Show resolved Hide resolved

// NewGRPCServer creates a grpc.Server, registers the Server, and then returns
// the grpc.Server.
func NewGRPCServer(s *Server, tlsConfigurator *tlsutil.Configurator) *grpc.Server {
recoveryOpts := []recovery.Option{
recovery.WithRecoveryHandlerContext(newPanicHandler(s.Logger)),
}

opts := []grpc.ServerOption{
grpc.MaxConcurrentStreams(2048),
middleware.WithUnaryServerChain(
// Add middlware interceptors to recover in case of panics.
recovery.UnaryServerInterceptor(recoveryOpts...),
),
middleware.WithStreamServerChain(
// Add middlware interceptors to recover in case of panics.
recovery.StreamServerInterceptor(recoveryOpts...),
),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess we could also expose a function from agent/grpc that returns these two options, and use that here? The handler in agent/grpc could use the same PanicHandlerMiddleware function to add them to opts, that way its clear that both of these gRPC servers are using the same middleware.

opts :=  []grpc.ServerOption{grpc.MaxConcurrentStreams(2048)}
opts = append(opts, agentgrpc.PanicHandlerMiddleware(s.Logger)...)

What do you think?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The problem is that agent/grpc/handler.go also requires another interceptor, which is not used in xds' server:

middleware.WithStreamServerChain(
	// Add middlware interceptors to recover in case of panics.
	recovery.StreamServerInterceptor(recoveryOpts...),
	(&activeStreamCounter{metrics: metrics}).Intercept,
),

And interceptions cannot be specified twice, otherwise you get the following panic:

panic: The stream server interceptor was already set and may not be reset. [recovered]
	panic: The stream server interceptor was already set and may not be reset.

Also, I think it might be good to have the middleware in close as possible where the server is defined, so that people can clearly see what interceptors are being used and they can easily extend the server with new interceptors.

}
if tlsConfigurator != nil {
if tlsConfigurator.Cert() != nil {
Expand Down
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ require (
github.com/google/gofuzz v1.2.0
github.com/google/pprof v0.0.0-20210601050228-01bbb1931b22
github.com/google/tcpproxy v0.0.0-20180808230851-dfa16c61dad2
github.com/grpc-ecosystem/go-grpc-middleware/v2 v2.0.0-rc.2
github.com/hashicorp/consul/api v1.8.0
github.com/hashicorp/consul/sdk v0.7.0
github.com/hashicorp/go-bexpr v0.1.2
Expand Down Expand Up @@ -72,7 +73,7 @@ require (
github.com/mitchellh/reflectwalk v1.0.1
github.com/patrickmn/go-cache v2.1.0+incompatible
github.com/pierrec/lz4 v2.5.2+incompatible // indirect
github.com/pkg/errors v0.8.1
github.com/pkg/errors v0.9.1
github.com/pquerna/cachecontrol v0.0.0-20180517163645-1555304b9b35 // indirect
github.com/prometheus/client_golang v1.4.0
github.com/rboyer/safeio v0.2.1
Expand Down
6 changes: 6 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,10 @@ github.com/gophercloud/gophercloud v0.1.0 h1:P/nh25+rzXouhytV2pUHBb65fnds26Ghl8/
github.com/gophercloud/gophercloud v0.1.0/go.mod h1:vxM41WHh5uqHVBMZHzuwNOHh8XEoIEcSTewFxm1c5g8=
github.com/gorilla/websocket v1.4.0/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ=
github.com/gregjones/httpcache v0.0.0-20180305231024-9cad4c3443a7/go.mod h1:FecbI9+v66THATjSRHfNgh1IVFe/9kFxbXtjV0ctIMA=
github.com/grpc-ecosystem/go-grpc-middleware v1.0.0 h1:Iju5GlWwrvL6UBg4zJJt3btmonfrMlCDdsejg4CZE7c=
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I looking into using a newer 1.x version, but it requires updating gRPC , which I don't particularly want to do right now. I looked at the diff, and nothing important has changed since 1.0, so I think this is good.

github.com/grpc-ecosystem/go-grpc-middleware v1.0.0/go.mod h1:FiyG127CGDf3tlThmgyCl78X/SZQqEOJBCDaAfeWzPs=
github.com/grpc-ecosystem/go-grpc-middleware/v2 v2.0.0-rc.2 h1:1aeRCnE2CkKYqyzBu0+B2lgTcZPc3ea2lGpijeHbI1c=
github.com/grpc-ecosystem/go-grpc-middleware/v2 v2.0.0-rc.2/go.mod h1:GhphxcdlaRyAuBSvo6rV71BvQcvB/vuX8ugCyybuS2k=
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0/go.mod h1:8NvIoxWQoOIhqOTXgfV/d3M/q6VIi02HzZEHgUlZvzk=
github.com/grpc-ecosystem/grpc-gateway v1.9.0/go.mod h1:vNeuVxBJEsws4ogUvrchl83t/GYV9WGTSLVdBhOQFDY=
github.com/hashicorp/errwrap v1.0.0 h1:hLrqtEDnRye3+sgx6z4qVLNuviH3MR5aQ0ykNJa/UYA=
Expand Down Expand Up @@ -408,6 +411,7 @@ github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+W
github.com/onsi/ginkgo v1.8.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/gomega v0.0.0-20170829124025-dcabb60a477c/go.mod h1:C1qb7wdrVGGVU+Z6iS04AVkA3Q65CEZX59MT0QO5uiA=
github.com/onsi/gomega v1.5.0/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY=
github.com/opentracing/opentracing-go v1.1.0/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o=
github.com/packethost/packngo v0.1.1-0.20180711074735-b9cb5096f54c h1:vwpFWvAO8DeIZfFeqASzZfsxuWPno9ncAebBEP0N3uE=
github.com/packethost/packngo v0.1.1-0.20180711074735-b9cb5096f54c/go.mod h1:otzZQXgoO96RTzDB/Hycg0qZcXZsWJGJRSXbmEIJ+4M=
github.com/pascaldekloe/goe v0.0.0-20180627143212-57f6aae5913c/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc=
Expand All @@ -423,6 +427,8 @@ github.com/pierrec/lz4 v2.5.2+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v0.0.0-20151028094244-d8ed2627bdf0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
Expand Down