Skip to content

Commit

Permalink
Add a ServeHTTP method to *grpc.Server
Browse files Browse the repository at this point in the history
This adds new http.Handler-based ServerTransport in the process,
reusing the HTTP/2 server code in x/net/http2 or Go 1.6+.

All end2end tests pass with this new ServerTransport.

Fixes grpc#75

Also:
Updates grpc#495 (lets user fix it with middleware in front)
Updates grpc#468 (x/net/http2 validates)
Updates grpc#147 (possible with x/net/http2)
Updates grpc#104 (x/net/http2 does this)
  • Loading branch information
bradfitz committed Feb 4, 2016
1 parent 66b94b9 commit e836862
Show file tree
Hide file tree
Showing 6 changed files with 579 additions and 36 deletions.
2 changes: 1 addition & 1 deletion rpc_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,7 @@ func checkRecvPayload(pf payloadFormat, recvCompress string, dc Decompressor) er
case compressionNone:
case compressionMade:
if recvCompress == "" {
return transport.StreamErrorf(codes.InvalidArgument, "grpc: received unexpected payload format %d", pf)
return transport.StreamErrorf(codes.InvalidArgument, "grpc: invalid grpc-encoding %q with compression enabled", recvCompress)
}
if dc == nil || recvCompress != dc.Type() {
return transport.StreamErrorf(codes.InvalidArgument, "grpc: Decompressor is not installed for grpc-encoding %q", recvCompress)
Expand Down
208 changes: 189 additions & 19 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,13 +39,15 @@ import (
"fmt"
"io"
"net"
"net/http"
"reflect"
"runtime"
"strings"
"sync"
"time"

"golang.org/x/net/context"
"golang.org/x/net/http2"
"golang.org/x/net/trace"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
Expand Down Expand Up @@ -82,10 +84,12 @@ type service struct {

// Server is a gRPC server to serve RPC requests.
type Server struct {
opts options
mu sync.Mutex
opts options

mu sync.Mutex // guards following
lis map[net.Listener]bool
conns map[transport.ServerTransport]bool
hconns map[net.Conn]bool // handler conns; used only for http.Handler-based transprot
m map[string]*service // service name -> service info
events trace.EventLog
}
Expand All @@ -96,6 +100,7 @@ type options struct {
cp Compressor
dc Decompressor
maxConcurrentStreams uint32
useHandlerImpl bool // use http.Handler-based server
}

// A ServerOption sets options.
Expand Down Expand Up @@ -216,9 +221,17 @@ var (
ErrServerStopped = errors.New("grpc: the server has been stopped")
)

func (s *Server) useTransportAuthenticator(rawConn net.Conn) (net.Conn, credentials.AuthInfo, error) {
creds, ok := s.opts.creds.(credentials.TransportAuthenticator)
if !ok {
return rawConn, nil, nil
}
return creds.ServerHandshake(rawConn)
}

// Serve accepts incoming connections on the listener lis, creating a new
// ServerTransport and service goroutine for each. The service goroutines
// read gRPC request and then call the registered handlers to reply to them.
// read gRPC requests and then call the registered handlers to reply to them.
// Service returns when lis.Accept fails.
func (s *Server) Serve(lis net.Listener) error {
s.mu.Lock()
Expand All @@ -235,39 +248,47 @@ func (s *Server) Serve(lis net.Listener) error {
delete(s.lis, lis)
s.mu.Unlock()
}()
if s.opts.useHandlerImpl {
return s.serveUsingHandler(lis)
}
for {
c, err := lis.Accept()
rawConn, err := lis.Accept()
if err != nil {
s.mu.Lock()
s.printf("done serving; Accept = %v", err)
s.mu.Unlock()
return err
}
var authInfo credentials.AuthInfo
if creds, ok := s.opts.creds.(credentials.TransportAuthenticator); ok {
var conn net.Conn
conn, authInfo, err = creds.ServerHandshake(c)
if err != nil {
s.mu.Lock()
s.errorf("ServerHandshake(%q) failed: %v", c.RemoteAddr(), err)
s.mu.Unlock()
grpclog.Println("grpc: Server.Serve failed to complete security handshake.")
continue
}
c = conn
conn, authInfo, err := s.useTransportAuthenticator(rawConn)
if err != nil {
s.handleFailedConnAuthentication(rawConn, err)
continue
}
s.mu.Lock()
if s.conns == nil {
s.mu.Unlock()
c.Close()
conn.Close()
return nil
}
s.mu.Unlock()

go s.serveNewHTTP2Transport(c, authInfo)
go s.serveNewHTTP2Transport(conn, authInfo)
}
}

func (s *Server) handleFailedConnAuthentication(rawConn net.Conn, err error) {
s.mu.Lock()
s.errorf("ServerHandshake(%q) failed: %v", rawConn.RemoteAddr(), err)
s.mu.Unlock()
grpclog.Println("grpc: Server.Serve failed to complete security handshake.")
rawConn.Close()
}

// serveNewHTTP2Transport sets up a new http/2 transport (using the
// gRPC http2 server transport in transport/http2_server.go) and
// serves streams on it.
// This is run in its own goroutine (it does network I/O in
// transport.NewServerTransport).
func (s *Server) serveNewHTTP2Transport(c net.Conn, authInfo credentials.AuthInfo) {
st, err := transport.NewServerTransport("http2", c, s.opts.maxConcurrentStreams, authInfo)
if err != nil {
Expand Down Expand Up @@ -299,6 +320,142 @@ func (s *Server) serveStreams(st transport.ServerTransport) {
wg.Wait()
}

var _ http.Handler = (*Server)(nil)

// serveUsingHandler is the implementation of Serve(net.Listener) when
// TestingUseHandlerImpl has been configured. This lets the end2end
// tests exercise the ServeHTTP method as one of the environment types.
func (s *Server) serveUsingHandler(lis net.Listener) error {
hs := &http.Server{
Handler: s,
ConnState: s.onConnStateChange,
}
if err := http2.ConfigureServer(hs, &http2.Server{
MaxConcurrentStreams: s.opts.maxConcurrentStreams,
}); err != nil {
return err
}
hlis := &handlerListener{
s: s,
Listener: lis,
acceptc: make(chan interface{}, 1),
closedc: make(chan struct{}),
}
go hlis.acceptLoop()
return hs.Serve(hlis)
}

// onConnStateChange is the net/http.Server.ConnState state change
// hook used by the http.Handler-based transport, to track which
// inbound TCP or TLS connections are live. Note that these are not
// ServerTransports. Each received HTTP request (each ServeHTTP call)
// is a ServerTransport for exactly 1 stream. This on the other hand
// tracks the underlying connections.
func (s *Server) onConnStateChange(c net.Conn, state http.ConnState) {
if state != http.StateNew && state != http.StateClosed {
// Ignore transitions between idle and active.
return
}
s.mu.Lock()
defer s.mu.Unlock()
if s.hconns == nil {
s.hconns = make(map[net.Conn]bool)
}
if state == http.StateNew {
s.hconns[c] = true
} else {
delete(s.hconns, c)
}
}

// handlerListener is the net.Listener used by Server.serveUsingHandler.
//
// It takes care to do blocking network operations off the goroutine
// doing the Accept loop.
//
// To use its, acceptLoop must be running in its own goroutine
// for Accept to work.
type handlerListener struct {
s *Server
net.Listener // embedded to get the Addr method
acceptc chan interface{} // chan of net.Conn or error

closedc chan struct{} // closed on close
closeOnce sync.Once
closeErr error
}

func (hl *handlerListener) Close() error {
hl.closeOnce.Do(hl.closeOnceFunc)
return hl.closeErr
}

func (hl *handlerListener) closeOnceFunc() {
hl.closeErr = hl.Listener.Close()
close(hl.closedc)
}

func (hl *handlerListener) Accept() (net.Conn, error) {
select {
case v := <-hl.acceptc:
if c, ok := v.(net.Conn); ok {
return c, nil
}
return nil, v.(error)
case <-hl.closedc:
return nil, errors.New("listener closed")
}
}

// acceptLoop runs in its own goroutine and accepts conns and sets up
// TLS, feeding successful connections to Accept.
func (hl *handlerListener) acceptLoop() {
for {
rawConn, err := hl.Listener.Accept()
if err != nil {
select {
case hl.acceptc <- err:
case <-hl.closedc:
return
}
continue
}
go hl.authenticateConn(rawConn)
}
}

// authenticateConn runs in a goroutine separate from the handlerListener's accept loop
// and sets up TLS (or whatever the TransportAuthenticator does) and sends successfully upgraded
// Conns along the channel for Accept to return.
func (hl *handlerListener) authenticateConn(rawConn net.Conn) {
// Discarding authInfo because it's just the *tls.Conn's
// ConnectionState, which we can recover later in ServeHTTP.
conn, _, err := hl.s.useTransportAuthenticator(rawConn)
if err != nil {
hl.s.handleFailedConnAuthentication(rawConn, err)
return
}
select {
case hl.acceptc <- conn:
case <-hl.closedc:
conn.Close()
}
}

func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
st, err := transport.NewServerHandlerTransport(w, r)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
if !s.addConn(st) {
st.Close()
return
}
defer s.removeConn(st)
s.serveStreams(st)
}

// traceInfo returns a traceInfo and associates it with stream, if tracing is enabled.
// If tracing is not enabled, it returns nil.
func (s *Server) traceInfo(st transport.ServerTransport, stream *transport.Stream) (trInfo *traceInfo) {
Expand Down Expand Up @@ -602,13 +759,18 @@ func (s *Server) Stop() {
s.lis = nil
cs := s.conns
s.conns = nil
for c := range s.hconns {
go c.Close()
}
s.mu.Unlock()

for lis := range listeners {
lis.Close()
}
for c := range cs {
c.Close()
}

s.mu.Lock()
if s.events != nil {
s.events.Finish()
Expand All @@ -618,7 +780,8 @@ func (s *Server) Stop() {
}

// TestingCloseConns closes all exiting transports but keeps s.lis accepting new
// connections. This is for test only now.
// connections.
// This is only for tests and is subject to removal.
func (s *Server) TestingCloseConns() {
s.mu.Lock()
for c := range s.conns {
Expand All @@ -628,6 +791,13 @@ func (s *Server) TestingCloseConns() {
s.mu.Unlock()
}

// TestingUseHandlerImpl enables the http.Handler-based server implementation.
// It must be called before Serve and requires TLS credentials.
// This is only for tests and is subject to removal.
func (s *Server) TestingUseHandlerImpl() {
s.opts.useHandlerImpl = true
}

// SendHeader sends header metadata. It may be called at most once from a unary
// RPC handler. The ctx is the RPC handler's Context or one derived from it.
func SendHeader(ctx context.Context, md metadata.MD) error {
Expand Down
Loading

0 comments on commit e836862

Please sign in to comment.