Point-to-point health check. Client side implementation #993

Merged
merged 23 commits into from
Mar 7, 2017
Changes from 18 commits
8 changes: 8 additions & 0 deletions clientconn.go
@@ -45,6 +45,7 @@ import (
"golang.org/x/net/trace"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/keepalive"
"google.golang.org/grpc/stats"
"google.golang.org/grpc/transport"
)
@@ -249,6 +250,13 @@ func WithUserAgent(s string) DialOption {
}
}

// WithKeepaliveParams returns a DialOption that specifies keepalive parameters for the client transport.
func WithKeepaliveParams(kp keepalive.ClientParameters) DialOption {
	return func(o *dialOptions) {
		o.copts.KeepaliveParams = kp
	}
}

// WithUnaryInterceptor returns a DialOption that specifies the interceptor for unary RPCs.
func WithUnaryInterceptor(f UnaryClientInterceptor) DialOption {
return func(o *dialOptions) {
15 changes: 15 additions & 0 deletions keepalive/keepalive.go
@@ -0,0 +1,15 @@
package keepalive

Why does this need to be another package? Can't it be inside the grpc package?

Contributor Author

transport package depends on this. grpc package depends on transport. Putting this in grpc will create a circular dependency.

Member

The separate package looks weird to me as well. We may want to figure out alternatives.


import (
"time"
)

// ClientParameters is used to set keepalive parameters for the client side.
type ClientParameters struct {
// After a duration of this time the client pings the server to see if the transport is still alive.
Time time.Duration
Member

Need to think about what happens when these are 0.

// After having pinged fot keepalive check, the client waits for a duration of keepalive_timeout before closing the transport.
Contributor

s/fot/for/

Contributor

would be nice to wrap this.

Timeout time.Duration
//If true, client runs keepalive checks even with no active RPCs.
Contributor

missing space

PermitWithoutStream bool
}
5 changes: 5 additions & 0 deletions transport/control.go
@@ -51,6 +51,11 @@ const (
// The following defines various control items which could flow through
// the control buffer of transport. They represent different aspects of
// control tasks, e.g., flow control, settings, streaming resetting, etc.

type resetKeepaliveTimer struct{}

func (resetKeepaliveTimer) item() {}

type windowUpdate struct {
streamID uint32
increment uint32
91 changes: 91 additions & 0 deletions transport/http2_client.go
@@ -41,6 +41,7 @@ import (
"net"
"strings"
"sync"
"sync/atomic"
"time"

"golang.org/x/net/context"
@@ -49,6 +50,7 @@ import (
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/keepalive"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/peer"
"google.golang.org/grpc/stats"
@@ -80,6 +82,8 @@ type http2Client struct {
// goAway is closed to notify the upper layer (i.e., addrConn.transportMonitor)
// that the server sent GoAway on this transport.
goAway chan struct{}
// awakenKeepalive is used to tell keepalive goroutine to reset keepalive timer.
awakenKeepalive chan int
Contributor

why is this int instead of struct{}? the value is never used.


framer *framer
hBuf *bytes.Buffer // the buffer for HPACK encoding
@@ -99,6 +103,11 @@ type http2Client struct {

creds []credentials.PerRPCCredentials

// Counter to keep track of reading activity on transport.
activity uint64 // Accessed atomically.

kp keepalive.ClientParameters

statsHandler stats.Handler

mu sync.Mutex // guard the following variables
@@ -182,6 +191,14 @@ func newHTTP2Client(ctx context.Context, addr TargetInfo, opts ConnectOptions) (
if opts.UserAgent != "" {
ua = opts.UserAgent + " " + ua
}
kp := opts.KeepaliveParams
// Validate keepalive parameters.
Contributor

this is setting defaults, not validating.

if kp.Time == 0 {
kp.Time = defaultKeepaliveTime
}
if kp.Timeout == 0 {
kp.Timeout = defaultKeepaliveTimeout
}
var buf bytes.Buffer
t := &http2Client{
ctx: ctx,
@@ -198,6 +215,7 @@ func newHTTP2Client(ctx context.Context, addr TargetInfo, opts ConnectOptions) (
shutdownChan: make(chan struct{}),
errorChan: make(chan struct{}),
goAway: make(chan struct{}),
awakenKeepalive: make(chan int, 1),
framer: newFramer(conn),
hBuf: &buf,
hEnc: hpack.NewEncoder(&buf),
@@ -210,8 +228,12 @@ func newHTTP2Client(ctx context.Context, addr TargetInfo, opts ConnectOptions) (
creds: opts.PerRPCCredentials,
maxStreams: math.MaxInt32,
streamSendQuota: defaultWindowSize,
kp: kp,
statsHandler: opts.StatsHandler,
}
// make sure awakenKeepalive can't be written upon.
// keepalive routine will make it writable, if need be.
t.awakenKeepalive <- 0
if t.statsHandler != nil {
t.ctx = t.statsHandler.TagConn(t.ctx, &stats.ConnTagInfo{
RemoteAddr: t.remoteAddr,
@@ -256,6 +278,7 @@ func newHTTP2Client(ctx context.Context, addr TargetInfo, opts ConnectOptions) (
}
}
go t.controller()
go t.keepalive()
t.writableChan <- 0
return t, nil
}
@@ -373,6 +396,15 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea
s := t.newStream(ctx, callHdr)
s.clientStatsCtx = userCtx
t.activeStreams[s.id] = s
// If the number of active streams changes from 0 to 1, then check if keepalive
// has gone dormant. If so, wake it up.
if len(t.activeStreams) == 1 {
select {
case t.awakenKeepalive <- 0:
t.framer.writePing(true, false, [8]byte{})
Contributor

We don't need to flush the ping, we will flush the headers later.

default:
}
}

// This stream is not counted when applySetings(...) initialize t.streamsQuota.
// Reset t.streamsQuota to the right value.
@@ -982,6 +1014,7 @@ func (t *http2Client) reader() {
t.notifyError(err)
return
}
atomic.AddUint64(&t.activity, 1)
sf, ok := frame.(*http2.SettingsFrame)
if !ok {
t.notifyError(err)
@@ -992,6 +1025,7 @@ func (t *http2Client) reader() {
// loop to keep reading incoming messages on this transport.
Member

Update activity after handleSettings?

for {
frame, err := t.framer.readFrame()
atomic.AddUint64(&t.activity, 1)
Contributor

CAS
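
The comment is terse. One reading, which is an assumption here, is that the reader only needs to record that something was read, not how many frames, so a compare-and-swap from 0 to 1 would do instead of an increment. A minimal sketch of that idea follows; activityFlag, mark, and consume are hypothetical names, not code from this PR.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// activityFlag records whether anything was read since the last check.
type activityFlag struct {
	v uint32 // 0 = idle, 1 = activity seen; accessed atomically
}

// mark is what the reader would call after each frame. The CAS only
// writes on the 0 -> 1 transition, so repeated calls leave the value at 1.
func (f *activityFlag) mark() {
	atomic.CompareAndSwapUint32(&f.v, 0, 1)
}

// consume is what keepalive would call when its timer fires. It reports
// whether activity was seen and clears the flag in one atomic step.
func (f *activityFlag) consume() bool {
	return atomic.SwapUint32(&f.v, 0) == 1
}

func main() {
	var f activityFlag
	fmt.Println(f.consume()) // nothing read yet
	f.mark()
	f.mark()
	fmt.Println(f.consume()) // activity seen
	fmt.Println(f.consume()) // cleared by the previous call
}
```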

if err != nil {
// Abort an active stream if the http2.Framer returns a
// http2.StreamError. This can happen only if the server's response
@@ -1104,6 +1138,63 @@ func (t *http2Client) controller() {
}
}

// keepalive, running in a separate goroutine, makes sure the connection is alive by sending pings.
func (t *http2Client) keepalive() {
if t.kp.Time == time.Duration(math.MaxInt64) {
Contributor

this should use infinity instead of repeating time.Duration(math.MaxInt64)

return
}
p := &ping{data: [8]byte{}}
timer := time.NewTimer(t.kp.Time)
Contributor

defer timer.Stop() below this line.

Contributor Author

Not all exit paths of this routine have a timer running. Trying to stop and empty timer.C of an already stopped timer leads to a deadlock.
Doing this in defer would mean making sure that all exit paths of the routine reset the timer (if one is not running already). Future updates to the code might miss that and get into trouble.

for {
select {
case <-timer.C:
if a := atomic.SwapUint64(&t.activity, 0); a > 0 {
timer.Reset(t.kp.Time)
continue
}
// Check if keepalive should go dormant.
t.mu.Lock()
if len(t.activeStreams) < 1 && !t.kp.PermitWithoutStream {
// Make awakenKeepalive writable.
<-t.awakenKeepalive
Contributor

why does this happen under lock? seems clearer to write:

t.mu.Lock()
dormant := len(t.activeStreams) < 1 && !t.kp.PermitWithoutStream
t.mu.Unlock()
if dormant {
   ...
} else {
  t.controlBuf.put(p)
}

Contributor Author

Emptying the awakenKeepalive channel needs to happen inside the lock so that another thread creating a new stream sees the channel as writable when the number of streams goes from 0 to 1.

t.mu.Unlock()
select {
case <-t.awakenKeepalive:
// If the control gets here a ping has been sent
// need to reset the timer with keepalive.Timeout.
case <-t.shutdownChan:
return
}
} else {
t.mu.Unlock()
// Send ping.
t.controlBuf.put(p)
}

// By the time control gets here a ping has been sent one way or the other.
timer.Reset(t.kp.Timeout)
Contributor

why is there so much duplicated code here?

Contributor Author

Duplication here is by choice, to keep the structure of the routine simple so that new users can easily understand the logic behind it.
To reuse the outer select we'll need some additional book-keeping about whether the ping was sent or not.

select {
case <-timer.C:
if a := atomic.SwapUint64(&t.activity, 0); a > 0 {
timer.Reset(t.kp.Time)
continue
}
t.Close()
case <-t.shutdownChan:
if !timer.Stop() {
<-timer.C
}
return
}
case <-t.shutdownChan:
if !timer.Stop() {
<-timer.C
}
return
}
}
}

func (t *http2Client) Error() <-chan struct{} {
return t.errorChan
}
12 changes: 12 additions & 0 deletions transport/transport.go
@@ -41,12 +41,15 @@ import (
"bytes"
"fmt"
"io"
"math"
"net"
"sync"
"time"

"golang.org/x/net/context"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/keepalive"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/stats"
"google.golang.org/grpc/tap"
@@ -382,6 +385,8 @@ type ConnectOptions struct {
PerRPCCredentials []credentials.PerRPCCredentials
// TransportCredentials stores the Authenticator required to setup a client connection.
TransportCredentials credentials.TransportCredentials
// KeepaliveParams stores the keepalive parameters.
KeepaliveParams keepalive.ClientParameters
// StatsHandler stores the handler for stats.
StatsHandler stats.Handler
}
@@ -606,3 +611,10 @@ func wait(ctx context.Context, done, goAway, closing <-chan struct{}, proceed <-
return i, nil
}
}

// keepalive related constants.
const (
infinity = time.Duration(math.MaxInt64)
defaultKeepaliveTime = infinity
defaultKeepaliveTimeout = time.Duration(20 * time.Second)
Contributor

time.Duration is not necessary here.

)