Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Point-to-point health check. Client side implementation #993

Merged
merged 23 commits into from
Mar 7, 2017
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions clientconn.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ import (
"golang.org/x/net/trace"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/keepalive"
"google.golang.org/grpc/transport"
)

Expand Down Expand Up @@ -230,6 +231,13 @@ func WithUserAgent(s string) DialOption {
}
}

// WithKeepaliveParams returns a DialOption that specifies keepalive paramaters for the client transport.
func WithKeepaliveParams(k keepalive.Params) DialOption {
return func(o *dialOptions) {
o.copts.KeepaliveParams = k
}
}

// WithUnaryInterceptor returns a DialOption that specifies the interceptor for unary RPCs.
func WithUnaryInterceptor(f UnaryClientInterceptor) DialOption {
return func(o *dialOptions) {
Expand Down
50 changes: 50 additions & 0 deletions keepalive/keepalive.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package keepalive

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why does this needs to be another package? Can't it ben inside the grpc package?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

transport package depends on this. grpc package depends on transport. Putting this in grpc will create a circular dependency.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The separate package looks weird to me as well. We may want to figure out alternatives.


import (
"math"
"sync"
"time"
)

// Params is used to set keepalive parameters.
type Params struct {
// After a duration of this time the client pings the server to see if the transport is still alive.
Time time.Duration
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Need to think about what happens when these are 0.

// After having pinged fot keepalive check, the client waits for a duration of keepalive_timeout before closing the transport.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

s/fot/for/

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

would be nice to wrap this.

Timeout time.Duration
//If true, client runs keepalive checks even with no active RPCs.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

missing space

PermitWithoutStream bool
}

// DefaultParams contains default values for keepalive parameters.
var DefaultParams = Params{
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider moving to transport.

Time: time.Duration(math.MaxInt64), // default to infinite.
Timeout: time.Duration(20 * time.Second),
}

// mu is a mutex to protect Enabled variable.
var mu = sync.Mutex{}

// enable is a knob used to turn keepalive on or off.
var enable = false

// Enabled exposes the value of enable variable.
func Enabled() bool {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems unnecessary. I don't know a case where WithKeepaliveParams wouldn't work.

mu.Lock()
defer mu.Unlock()
return enable
}

// Enable can be called to enable keepalives.
func Enable() {
mu.Lock()
defer mu.Unlock()
enable = true
}

// Disable can be called to disable keepalive.
func Disable() {
mu.Lock()
defer mu.Unlock()
enable = false
}
43 changes: 43 additions & 0 deletions transport/http2_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import (
"net"
"strings"
"sync"
"sync/atomic"
"time"

"golang.org/x/net/context"
Expand All @@ -49,6 +50,7 @@ import (
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/keepalive"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/peer"
"google.golang.org/grpc/stats"
Expand Down Expand Up @@ -98,6 +100,11 @@ type http2Client struct {

creds []credentials.PerRPCCredentials

// Counter to keep track of activity(reading and writing on transport).
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should only be used for reading. Writing does not prevent the need for keepalive ping.

activity uint64 // accessed atomically.
// keepalive parameters.
kp keepalive.Params

mu sync.Mutex // guard the following variables
state transportState // the state of underlying connection
activeStreams map[uint32]*Stream
Expand Down Expand Up @@ -179,6 +186,10 @@ func newHTTP2Client(ctx context.Context, addr TargetInfo, opts ConnectOptions) (
if opts.UserAgent != "" {
ua = opts.UserAgent + " " + ua
}
kp := keepalive.DefaultParams
if opts.KeepaliveParams != (keepalive.Params{}) {
kp = opts.KeepaliveParams
}
var buf bytes.Buffer
t := &http2Client{
target: addr.Addr,
Expand Down Expand Up @@ -206,6 +217,7 @@ func newHTTP2Client(ctx context.Context, addr TargetInfo, opts ConnectOptions) (
creds: opts.PerRPCCredentials,
maxStreams: math.MaxInt32,
streamSendQuota: defaultWindowSize,
kp: kp,
}
// Start the reader goroutine for incoming message. Each transport has
// a dedicated goroutine which reads HTTP2 frame from network. Then it
Expand Down Expand Up @@ -690,6 +702,7 @@ func (t *http2Client) Write(s *Stream, data []byte, opts *Options) error {
break
}
}
atomic.AddUint64(&t.activity, 1)
if !opts.Last {
return nil
}
Expand Down Expand Up @@ -976,6 +989,7 @@ func (t *http2Client) reader() {
// loop to keep reading incoming messages on this transport.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Update activity after handleSettings?

for {
frame, err := t.framer.readFrame()
atomic.AddUint64(&t.activity, 1)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

CAS

if err != nil {
// Abort an active stream if the http2.Framer returns a
// http2.StreamError. This can happen only if the server's response
Expand Down Expand Up @@ -1052,6 +1066,15 @@ func (t *http2Client) applySettings(ss []http2.Setting) {
// controller running in a separate goroutine takes charge of sending control
// frames (e.g., window update, reset stream, setting, etc.) to the server.
func (t *http2Client) controller() {
timer := time.NewTimer(t.kp.Time)
if !keepalive.Enabled() {
// Prevent the timer from firing, ever.
if !timer.Stop() {
<-timer.C
}
}
isPingSent := false
keepalivePing := &ping{data: [8]byte{}}
for {
select {
case i := <-t.controlBuf.get():
Expand Down Expand Up @@ -1082,6 +1105,26 @@ func (t *http2Client) controller() {
case <-t.shutdownChan:
return
}
case <-timer.C:
t.mu.Lock()
ns := len(t.activeStreams)
t.mu.Unlock()
// Get the activity counter value and reset it.
a := atomic.SwapUint64(&t.activity, 0)
if a > 0 || (!t.kp.PermitWithoutStream && ns < 1) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This doesn't seem to follow this part of the spec:

Since keepalive is not occurring on HTTP/2 connections without any streams, there will be a higher chance of failure for new RPCs following a long period of inactivity. To reduce the tail latency for these RPCs, it is important to not reset the keepalive time when a connection becomes active; if a new stream is created and there has been greater than keepalive time since the last read byte, then a keepalive PING should be sent (ideally before the HEADERS frame). Doing so detects the broken connection with a latency of keepalive timeout instead of keepalive time + timeout.

timer.Reset(t.kp.Time)
isPingSent = false
} else {
if !isPingSent {
// Send ping.
t.controlBuf.put(keepalivePing)
isPingSent = true
timer.Reset(t.kp.Timeout)
} else {
t.Close()
continue
}
}
case <-t.shutdownChan:
return
}
Expand Down
3 changes: 3 additions & 0 deletions transport/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ import (
"golang.org/x/net/context"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/keepalive"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/tap"
)
Expand Down Expand Up @@ -380,6 +381,8 @@ type ConnectOptions struct {
PerRPCCredentials []credentials.PerRPCCredentials
// TransportCredentials stores the Authenticator required to setup a client connection.
TransportCredentials credentials.TransportCredentials
// KeepaliveParams stores the keepalive parameters.
KeepaliveParams keepalive.Params
}

// TargetInfo contains the information of the target such as network address and metadata.
Expand Down
134 changes: 133 additions & 1 deletion transport/transport_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ import (
"golang.org/x/net/http2"
"golang.org/x/net/http2/hpack"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/keepalive"
)

type server struct {
Expand Down Expand Up @@ -251,6 +252,10 @@ func (s *server) stop() {
}

func setUp(t *testing.T, port int, maxStreams uint32, ht hType) (*server, ClientTransport) {
return setUpWithOptions(t, port, maxStreams, ht, ConnectOptions{})
}

func setUpWithOptions(t *testing.T, port int, maxStreams uint32, ht hType, copts ConnectOptions) (*server, ClientTransport) {
server := &server{startedErr: make(chan error, 1)}
go server.start(t, port, maxStreams, ht)
server.wait(t, 2*time.Second)
Expand All @@ -262,13 +267,140 @@ func setUp(t *testing.T, port int, maxStreams uint32, ht hType) (*server, Client
target := TargetInfo{
Addr: addr,
}
ct, connErr = NewClientTransport(context.Background(), target, ConnectOptions{})
ct, connErr = NewClientTransport(context.Background(), target, copts)
if connErr != nil {
t.Fatalf("failed to create transport: %v", connErr)
}
return server, ct
}

func setUpWithNoPingServer(t *testing.T, copts ConnectOptions, done chan net.Conn) ClientTransport {
lis, err := net.Listen("tcp", "localhost:0")
if err != nil {
t.Fatalf("Failed to listen: %v", err)
}
// Launch a non responsive server.
go func() {
defer lis.Close()
conn, err := lis.Accept()
if err != nil {
t.Errorf("Error at server-side while accepting: %v", err)
close(done)
return
}
done <- conn
}()
tr, err := NewClientTransport(context.Background(), TargetInfo{Addr: lis.Addr().String()}, copts)
if err != nil {
t.Fatalf("Failed to dial: %v", err)
}
return tr
}

func TestKeepaliveClientClosesIdleTransport(t *testing.T) {
keepalive.Enable()
defer keepalive.Disable()
done := make(chan net.Conn, 1)
tr := setUpWithNoPingServer(t, ConnectOptions{KeepaliveParams: keepalive.Params{
Time: 2 * time.Second, // Keepalive time = 2 sec.
Timeout: 1 * time.Second, // Keepalive timeout = 1 sec.
PermitWithoutStream: true, // Run keepalive even with no RPCs.
}}, done)
defer tr.Close()
conn, ok := <-done
if !ok {
t.Fatalf("Server didn't return connection object")
}
defer conn.Close()
// Sleep for keepalive to close the connection.
time.Sleep(4 * time.Second)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

perhaps this should be a function of the settings above, rather than hardcoded.

there are 4 instances of this.

// Assert that the connection was closed.
ct := tr.(*http2Client)
ct.mu.Lock()
defer ct.mu.Unlock()
if ct.state == reachable {
t.Fatalf("Test Failed: Expected client transport to have closed.")
}
}

func TestKeepaliveClientStaysHealthyOnIdleTransport(t *testing.T) {
keepalive.Enable()
defer keepalive.Disable()
done := make(chan net.Conn, 1)
tr := setUpWithNoPingServer(t, ConnectOptions{KeepaliveParams: keepalive.Params{
Time: 2 * time.Second, // Keepalive time = 2 sec.
Timeout: 1 * time.Second, // Keepalive timeout = 1 sec.
PermitWithoutStream: false, // Don't run keepalive even with no RPCs.
}}, done)
defer tr.Close()
conn, ok := <-done
if !ok {
t.Fatalf("server didn't reutrn connection object")
}
defer conn.Close()
// Give keepalive some time.
time.Sleep(4 * time.Second)
// Assert that connections is still healthy.
ct := tr.(*http2Client)
ct.mu.Lock()
defer ct.mu.Unlock()
if ct.state != reachable {
t.Fatalf("Test failed: Expected client transport to be healthy.")
}
}

func TestKeepaliveClientClosesWithActiveStreams(t *testing.T) {
keepalive.Enable()
defer keepalive.Disable()
done := make(chan net.Conn, 1)
tr := setUpWithNoPingServer(t, ConnectOptions{KeepaliveParams: keepalive.Params{
Time: 2 * time.Second, // Keepalive time = 2 sec.
Timeout: 1 * time.Second, // Keepalive timeout = 1 sec.
PermitWithoutStream: false, // Don't run keepalive even with no RPCs.
}}, done)
defer tr.Close()
conn, ok := <-done
if !ok {
t.Fatalf("Server didn't return connection object")
}
defer conn.Close()
// Create a stream.
_, err := tr.NewStream(context.Background(), &CallHdr{})
if err != nil {
t.Fatalf("Failed to create a new stream: %v", err)
}
// Give keepalive some time.
time.Sleep(4 * time.Second)
// Assert that transport was closed.
ct := tr.(*http2Client)
ct.mu.Lock()
defer ct.mu.Unlock()
if ct.state == reachable {
t.Fatalf("Test failed: Expected client transport to have closed.")
}
}

func TestKeepaliveClientStaysHealthyWithResponsiveServer(t *testing.T) {
keepalive.Enable()
defer keepalive.Disable()
s, tr := setUpWithOptions(t, 0, math.MaxUint32, normal, ConnectOptions{KeepaliveParams: keepalive.Params{
Time: 2 * time.Second, // Keepalive time = 2 sec.
Timeout: 1 * time.Second, // Keepalive timeout = 1 sec.
PermitWithoutStream: true, // Don't run keepalive even with no RPCs.
}})
defer s.stop()
defer tr.Close()
// Give keep alive some time.
time.Sleep(4 * time.Second)
// Assert that transport is healthy.
ct := tr.(*http2Client)
ct.mu.Lock()
defer ct.mu.Unlock()
if ct.state != reachable {
t.Fatalf("Test failed: Expected client transport to be healthy.")
}
}

func TestClientSendAndReceive(t *testing.T) {
server, ct := setUp(t, 0, math.MaxUint32, normal)
callHdr := &CallHdr{
Expand Down