Point-to-point health check. Client side implementation #993

Merged
merged 23 commits into from
Mar 7, 2017
8 changes: 8 additions & 0 deletions clientconn.go
Expand Up @@ -45,6 +45,7 @@ import (
"golang.org/x/net/trace"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/keepalive"
"google.golang.org/grpc/stats"
"google.golang.org/grpc/transport"
)
Expand Down Expand Up @@ -249,6 +250,13 @@ func WithUserAgent(s string) DialOption {
}
}

// WithKeepaliveParams returns a DialOption that specifies keepalive parameters for the client transport.
func WithKeepaliveParams(kp keepalive.ClientParameters) DialOption {
return func(o *dialOptions) {
o.copts.KeepaliveParams = kp
}
}

// WithUnaryInterceptor returns a DialOption that specifies the interceptor for unary RPCs.
func WithUnaryInterceptor(f UnaryClientInterceptor) DialOption {
return func(o *dialOptions) {
Expand Down
18 changes: 18 additions & 0 deletions keepalive/keepalive.go
@@ -0,0 +1,18 @@
package keepalive

Why does this need to be another package? Can't it be inside the grpc package?

Contributor Author

The transport package depends on this package, and the grpc package depends on transport. Putting this in grpc would create a circular dependency.

Member

The separate package looks weird to me as well. We may want to figure out alternatives.


import (
"time"
)

// ClientParameters is used to set keepalive parameters on the client-side.
// These configure how the client will actively probe to notice when a connection is broken
// and to cause activity so intermediaries are aware the connection is still in use.
type ClientParameters struct {
// After a duration of this time if the client doesn't see any activity it pings the server to see if the transport is still alive.
Time time.Duration // The current default value is infinity.
// After having pinged for keepalive check, the client waits for a duration of Timeout and if no activity is seen even after that
// the connection is closed.
Timeout time.Duration // The current default value is 20 seconds.
// If true, client runs keepalive checks even with no active RPCs.
PermitWithoutStream bool
}
5 changes: 5 additions & 0 deletions transport/control.go
Expand Up @@ -35,7 +35,9 @@ package transport

import (
"fmt"
"math"
"sync"
"time"

"golang.org/x/net/http2"
)
Expand All @@ -46,6 +48,9 @@ const (
// The initial window size for flow control.
initialWindowSize = defaultWindowSize // for an RPC
initialConnWindowSize = defaultWindowSize * 16 // for a connection
infinity = time.Duration(math.MaxInt64)
defaultKeepaliveTime = infinity
defaultKeepaliveTimeout = time.Duration(20 * time.Second)
defaultMaxStreamsClient = 100
)

Expand Down
91 changes: 91 additions & 0 deletions transport/http2_client.go
Expand Up @@ -41,6 +41,7 @@ import (
"net"
"strings"
"sync"
"sync/atomic"
"time"

"golang.org/x/net/context"
Expand All @@ -49,6 +50,7 @@ import (
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/keepalive"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/peer"
"google.golang.org/grpc/stats"
Expand Down Expand Up @@ -80,6 +82,8 @@ type http2Client struct {
// goAway is closed to notify the upper layer (i.e., addrConn.transportMonitor)
// that the server sent GoAway on this transport.
goAway chan struct{}
// awakenKeepalive is used to wake up keepalive after it has gone dormant.
awakenKeepalive chan struct{}

framer *framer
hBuf *bytes.Buffer // the buffer for HPACK encoding
Expand All @@ -99,6 +103,11 @@ type http2Client struct {

creds []credentials.PerRPCCredentials

// Boolean to keep track of reading activity on transport.
// 1 is true and 0 is false.
activity uint32 // Accessed atomically.
kp keepalive.ClientParameters

statsHandler stats.Handler

mu sync.Mutex // guard the following variables
Expand Down Expand Up @@ -182,6 +191,14 @@ func newHTTP2Client(ctx context.Context, addr TargetInfo, opts ConnectOptions) (
if opts.UserAgent != "" {
ua = opts.UserAgent + " " + ua
}
kp := opts.KeepaliveParams
// Validate keepalive parameters.
Contributor

this is setting defaults, not validating.

if kp.Time == 0 {
kp.Time = defaultKeepaliveTime
}
if kp.Timeout == 0 {
kp.Timeout = defaultKeepaliveTimeout
}
var buf bytes.Buffer
t := &http2Client{
ctx: ctx,
Expand All @@ -198,6 +215,7 @@ func newHTTP2Client(ctx context.Context, addr TargetInfo, opts ConnectOptions) (
shutdownChan: make(chan struct{}),
errorChan: make(chan struct{}),
goAway: make(chan struct{}),
awakenKeepalive: make(chan struct{}, 1),
framer: newFramer(conn),
hBuf: &buf,
hEnc: hpack.NewEncoder(&buf),
Expand All @@ -211,8 +229,12 @@ func newHTTP2Client(ctx context.Context, addr TargetInfo, opts ConnectOptions) (
maxStreams: defaultMaxStreamsClient,
streamsQuota: newQuotaPool(defaultMaxStreamsClient),
streamSendQuota: defaultWindowSize,
kp: kp,
statsHandler: opts.StatsHandler,
}
// Make sure awakenKeepalive can't be written upon.
// keepalive routine will make it writable, if need be.
t.awakenKeepalive <- struct{}{}
if t.statsHandler != nil {
t.ctx = t.statsHandler.TagConn(t.ctx, &stats.ConnTagInfo{
RemoteAddr: t.remoteAddr,
Expand Down Expand Up @@ -257,6 +279,9 @@ func newHTTP2Client(ctx context.Context, addr TargetInfo, opts ConnectOptions) (
}
}
go t.controller()
if t.kp.Time != infinity {
go t.keepalive()
}
t.writableChan <- 0
return t, nil
}
Expand Down Expand Up @@ -369,6 +394,15 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea
s := t.newStream(ctx, callHdr)
s.clientStatsCtx = userCtx
t.activeStreams[s.id] = s
// If the number of active streams changes from 0 to 1, then check if keepalive
// has gone dormant. If so, wake it up.
if len(t.activeStreams) == 1 {
select {
case t.awakenKeepalive <- struct{}{}:
t.framer.writePing(false, false, [8]byte{})
default:
}
}

t.mu.Unlock()

Expand Down Expand Up @@ -980,6 +1014,7 @@ func (t *http2Client) reader() {
t.notifyError(err)
return
}
atomic.CompareAndSwapUint32(&t.activity, 0, 1)
sf, ok := frame.(*http2.SettingsFrame)
if !ok {
t.notifyError(err)
Expand All @@ -990,6 +1025,7 @@ func (t *http2Client) reader() {
// loop to keep reading incoming messages on this transport.
Member

Update activity after handleSettings?

for {
frame, err := t.framer.readFrame()
atomic.CompareAndSwapUint32(&t.activity, 0, 1)
if err != nil {
// Abort an active stream if the http2.Framer returns a
// http2.StreamError. This can happen only if the server's response
Expand Down Expand Up @@ -1102,6 +1138,61 @@ func (t *http2Client) controller() {
}
}

// keepalive, running in a separate goroutine, makes sure the connection is alive by sending pings.
func (t *http2Client) keepalive() {
p := &ping{data: [8]byte{}}
timer := time.NewTimer(t.kp.Time)
for {
select {
case <-timer.C:
if atomic.CompareAndSwapUint32(&t.activity, 1, 0) {
timer.Reset(t.kp.Time)
continue
}
// Check if keepalive should go dormant.
t.mu.Lock()
if len(t.activeStreams) < 1 && !t.kp.PermitWithoutStream {
// Make awakenKeepalive writable.
<-t.awakenKeepalive
Contributor

why does this happen under lock? seems clearer to write:

t.mu.Lock()
dormant := len(t.activeStreams) < 1 && !t.kp.PermitWithoutStream
t.mu.Unlock()
if dormant {
   ...
} else {
  t.controlBuf.put(p)
}

Contributor Author

Emptying the awakenKeepalive channel needs to happen inside the lock so that some other thread creating a new stream sees the channel as writable when the number of streams goes from 0 to 1.
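To make the reply above concrete, here is a stripped-down sketch of the token handoff on a pre-filled channel of capacity 1. The conn type and its methods are hypothetical stand-ins for http2Client, NewStream and the keepalive routine. The refill in awaitWake is an addition of this sketch to restore the "full unless dormant" state; the diff as shown has no equivalent step.

```go
package main

import (
	"fmt"
	"sync"
)

// conn is a hypothetical, stripped-down stand-in for http2Client.
type conn struct {
	mu              sync.Mutex
	activeStreams   int
	awakenKeepalive chan struct{}
}

func newConn() *conn {
	c := &conn{awakenKeepalive: make(chan struct{}, 1)}
	// Pre-fill the channel so that sends fail until keepalive goes dormant.
	c.awakenKeepalive <- struct{}{}
	return c
}

// goDormant drains the channel under the lock when there are no streams,
// making it writable for the next 0 -> 1 stream transition.
func (c *conn) goDormant() bool {
	c.mu.Lock()
	defer c.mu.Unlock()
	if c.activeStreams > 0 {
		return false
	}
	<-c.awakenKeepalive
	return true
}

// newStream registers a stream and reports whether it woke up keepalive.
func (c *conn) newStream() bool {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.activeStreams++
	if c.activeStreams != 1 {
		return false
	}
	select {
	case c.awakenKeepalive <- struct{}{}:
		return true // keepalive was dormant; the diff also writes a ping here
	default:
		return false // channel is full, so keepalive is still running its timer
	}
}

// closeStream unregisters a stream.
func (c *conn) closeStream() {
	c.mu.Lock()
	c.activeStreams--
	c.mu.Unlock()
}

// awaitWake consumes the wake-up token, then refills the channel
// under the lock (the refill is this sketch's own addition).
func (c *conn) awaitWake() {
	<-c.awakenKeepalive
	c.mu.Lock()
	select {
	case c.awakenKeepalive <- struct{}{}:
	default:
	}
	c.mu.Unlock()
}

func main() {
	c := newConn()
	fmt.Println("woke while active:", c.newStream())
	c.closeStream()
	fmt.Println("went dormant:", c.goDormant())
	fmt.Println("woke while dormant:", c.newStream())
	c.awaitWake()
}
```

Because both the stream count check and the drain happen while holding mu, a concurrent newStream either runs before the drain (and keepalive sees a non-zero stream count) or after it (and finds the channel writable). If the drain happened outside the lock, a stream created in between could miss the wake-up.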

t.mu.Unlock()
select {
case <-t.awakenKeepalive:
// If control gets here, a ping has been sent and the timer
// needs to be reset with keepalive.Timeout.
case <-t.shutdownChan:
return
}
} else {
t.mu.Unlock()
// Send ping.
t.controlBuf.put(p)
}

// By the time control gets here a ping has been sent one way or the other.
timer.Reset(t.kp.Timeout)
Contributor

why is there so much duplicated code here?

Contributor Author

The duplication here is deliberate: it keeps the structure of the routine simple so that new users can easily follow the logic.
To reuse the outer select, we would need additional book-keeping to track whether the ping was sent.
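As an illustration of the book-keeping mentioned above, the decision taken on each timer expiry can be written as a pure function over a single "ping outstanding" flag. This is only a sketch of the alternative under discussion, not code from the PR; action, state and onTimer are hypothetical names.

```go
package main

import "fmt"

// action is what the keepalive loop does when its timer fires.
type action int

const (
	resetIdleTimer action = iota // activity was seen: rearm the timer with Time
	sendPing                     // idle: send a ping, rearm the timer with Timeout
	goDormant                    // idle with no streams: wait to be woken up
	closeTransport               // a ping went unanswered for Timeout
)

// state is the book-keeping a single-select loop would have to carry.
type state struct {
	pingOutstanding bool
}

// onTimer returns the action for one timer expiry and the updated state.
func onTimer(s state, sawActivity, hasStreams, permitWithoutStream bool) (action, state) {
	switch {
	case sawActivity:
		// Any read since the last check shows the connection is alive.
		return resetIdleTimer, state{}
	case s.pingOutstanding:
		return closeTransport, state{}
	case !hasStreams && !permitWithoutStream:
		// The stream that wakes the loop writes a ping, so one is outstanding afterwards.
		return goDormant, state{pingOutstanding: true}
	default:
		return sendPing, state{pingOutstanding: true}
	}
}

func main() {
	var s state
	a, s := onTimer(s, false, true, false)
	fmt.Println(a == sendPing, s.pingOutstanding)
	a, s = onTimer(s, false, true, false)
	fmt.Println(a == closeTransport, s.pingOutstanding)
}
```

The nested select in the diff encodes the same flag implicitly in the program counter, which is the trade-off the author describes.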

select {
case <-timer.C:
if atomic.CompareAndSwapUint32(&t.activity, 1, 0) {
timer.Reset(t.kp.Time)
continue
}
t.Close()
return
case <-t.shutdownChan:
if !timer.Stop() {
<-timer.C
}
return
}
case <-t.shutdownChan:
if !timer.Stop() {
<-timer.C
}
return
}
}
}

func (t *http2Client) Error() <-chan struct{} {
return t.errorChan
}
Expand Down
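The activity field in this file is a uint32 used as an atomically accessed boolean: the reader sets it after each frame read, and keepalive tests and clears it in one step. A minimal sketch of that idiom follows; activityFlag and its method names are hypothetical wrappers, while the two CompareAndSwapUint32 calls match the ones in the diff.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// activityFlag is a hypothetical wrapper around the uint32 used as a boolean.
type activityFlag struct {
	v uint32 // 1 means a frame was read since the last check; accessed atomically
}

// mark records activity; the reader does this after each frame read.
func (f *activityFlag) mark() {
	atomic.CompareAndSwapUint32(&f.v, 0, 1)
}

// consume reports whether activity was seen and clears the flag in one atomic step.
func (f *activityFlag) consume() bool {
	return atomic.CompareAndSwapUint32(&f.v, 1, 0)
}

func main() {
	var f activityFlag
	fmt.Println(f.consume())
	f.mark()
	f.mark()
	fmt.Println(f.consume())
	fmt.Println(f.consume())
}
```

Using compare-and-swap for the test-and-clear means a frame read that lands between keepalive's check and its reset cannot be lost: either it is observed now or it leaves the flag set for the next expiry.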
3 changes: 3 additions & 0 deletions transport/transport.go
Expand Up @@ -47,6 +47,7 @@ import (
"golang.org/x/net/context"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/keepalive"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/stats"
"google.golang.org/grpc/tap"
Expand Down Expand Up @@ -388,6 +389,8 @@ type ConnectOptions struct {
PerRPCCredentials []credentials.PerRPCCredentials
// TransportCredentials stores the Authenticator required to setup a client connection.
TransportCredentials credentials.TransportCredentials
// KeepaliveParams stores the keepalive parameters.
KeepaliveParams keepalive.ClientParameters
// StatsHandler stores the handler for stats.
StatsHandler stats.Handler
}
Expand Down