Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Point-to-point health check. Client side implementation #993

Merged
merged 23 commits into from
Mar 7, 2017
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions clientconn.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,18 @@ func WithUserAgent(s string) DialOption {
}
}

// WithKeepaliveParams returns a DialOption that specifies keepalive paramaters for the client transport.
func WithKeepaliveParams(k KeepaliveParameters) DialOption {
kp := &transport.KeepaliveParameters{
Time: k.Time,
Timeout: k.Timeout,
PermitWithoutStream: k.PermitWithoutStream,
}
return func(o *dialOptions) {
o.copts.KeepaliveParams = kp
}
}

// WithUnaryInterceptor returns a DialOption that specifies the interceptor for unary RPCs.
func WithUnaryInterceptor(f UnaryClientInterceptor) DialOption {
return func(o *dialOptions) {
Expand Down
15 changes: 15 additions & 0 deletions keepalive.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package grpc

import (
"time"
)

// KeepaliveParameters is used to set keepalive parameters.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about explaining a bit more.

KeepaliveParameters configures how the ClientConn will actively probe to notice when a connection is broken and to cause activity so intermediaries are aware the connection is still in use.

type KeepaliveParameters struct {
// After a duration of this time the client pings the server to see if the transport is still alive.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After a duration of this time after having receiving nothing. (I'm pointing out the issue; my text still needs rewording)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems there should be some mention of the current defaults. Here and below. Try to use language that implies the defaults may change, like "the current default is" and similar.

Time time.Duration
// After having pinged fot keepalive check, the client waits for a duration of keepalive_timeout before closing the transport.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

s/fot/for/

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

s/keepalive_timeout/Timeout/ ?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this documentation is saying what you hope. It actually says it will always close the connection after a keepalive check. You need something saying that it only matters when nothing is received by the timeout.

Timeout time.Duration
//If true, client runs keepalive checks even with no active RPCs.
PermitWithoutStream bool
}
5 changes: 5 additions & 0 deletions transport/control.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,11 @@ const (
// The following defines various control items which could flow through
// the control buffer of transport. They represent different aspects of
// control tasks, e.g., flow control, settings, streaming resetting, etc.

type resetKeepaliveTimer struct{}

func (resetKeepaliveTimer) item() {}

type windowUpdate struct {
streamID uint32
increment uint32
Expand Down
146 changes: 122 additions & 24 deletions transport/http2_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import (
"net"
"strings"
"sync"
"sync/atomic"
"time"

"golang.org/x/net/context"
Expand Down Expand Up @@ -99,6 +100,14 @@ type http2Client struct {

creds []credentials.PerRPCCredentials

// Counter to keep track of reading activity on transport.
activity uint64 // accessed atomically.
// Flag to keep track if the keepalive check was skipped because there
// were no active streams and PermitWithoutStream was false
// keepaliveSkipped = 1 means skipped
keepaliveSkipped uint32 // accessed atomically
// keepalive parameters.
kp *KeepaliveParameters
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe get rid of the pointer? It is probably better to inline it from a locality perspective, but it would also mean validate() wouldn't need as much thought. I think it is safe today, but you are mutating a structure that is used from multiple threads but without any synchronization; it would be trivial to accidentally break that in the future.

Note this also would make defaultKeepaliveParams unnecessary.

statsHandler stats.Handler

mu sync.Mutex // guard the following variables
Expand Down Expand Up @@ -182,6 +191,11 @@ func newHTTP2Client(ctx context.Context, addr TargetInfo, opts ConnectOptions) (
if opts.UserAgent != "" {
ua = opts.UserAgent + " " + ua
}
kp := defaultKeepaliveParams
if opts.KeepaliveParams != nil {
kp = opts.KeepaliveParams
kp.validate()
}
var buf bytes.Buffer
t := &http2Client{
ctx: ctx,
Expand Down Expand Up @@ -210,6 +224,7 @@ func newHTTP2Client(ctx context.Context, addr TargetInfo, opts ConnectOptions) (
creds: opts.PerRPCCredentials,
maxStreams: math.MaxInt32,
streamSendQuota: defaultWindowSize,
kp: kp,
statsHandler: opts.StatsHandler,
}
if t.statsHandler != nil {
Expand Down Expand Up @@ -373,6 +388,12 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea
s := t.newStream(ctx, callHdr)
s.clientStatsCtx = userCtx
t.activeStreams[s.id] = s
// if the number of active streams are now equal to 1, then check if keepalive
// was being skipped. If so, fire the keepalive timer
if len(t.activeStreams) == 1 && atomic.LoadUint32(&t.keepaliveSkipped) == 1 {
t.framer.writePing(true, false, [8]byte{})
t.controlBuf.put(resetKeepaliveTimer{})
}

// This stream is not counted when applySetings(...) initialize t.streamsQuota.
// Reset t.streamsQuota to the right value.
Expand Down Expand Up @@ -982,6 +1003,7 @@ func (t *http2Client) reader() {
t.notifyError(err)
return
}
atomic.AddUint64(&t.activity, 1)
sf, ok := frame.(*http2.SettingsFrame)
if !ok {
t.notifyError(err)
Expand All @@ -992,6 +1014,7 @@ func (t *http2Client) reader() {
// loop to keep reading incoming messages on this transport.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Update activity after handleSettings?

for {
frame, err := t.framer.readFrame()
atomic.AddUint64(&t.activity, 1)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

CAS

if err != nil {
// Abort an active stream if the http2.Framer returns a
// http2.StreamError. This can happen only if the server's response
Expand Down Expand Up @@ -1068,37 +1091,112 @@ func (t *http2Client) applySettings(ss []http2.Setting) {
// controller running in a separate goroutine takes charge of sending control
// frames (e.g., window update, reset stream, setting, etc.) to the server.
func (t *http2Client) controller() {
timer := time.NewTimer(t.kp.Time)
if t.kp.Time == infinity {
// Prevent the timer from firing, ever.
if !timer.Stop() {
<-timer.C
}
}
isPingSent := false
keepalivePing := &ping{data: [8]byte{}}
// select toggles between control channel and writable chanel.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: s/chanel/channel/

// We need to wait on writable channel only after having recieved
// a control message that requires controller to take an action.
// However, while waiting on either of these channels, the keepalive
// timer channel or shutdown channel might trigger. Such toggling
// take care of this case.
cchan := t.controlBuf.get()
var wchan chan int
var controlMsg item
for {
select {
case i := <-t.controlBuf.get():
case controlMsg = <-cchan:
t.controlBuf.load()
select {
case <-t.writableChan:
switch i := i.(type) {
case *windowUpdate:
t.framer.writeWindowUpdate(true, i.streamID, i.increment)
case *settings:
if i.ack {
t.framer.writeSettingsAck(true)
t.applySettings(i.ss)
} else {
t.framer.writeSettings(true, i.ss...)
}
case *resetStream:
t.framer.writeRSTStream(true, i.streamID, i.code)
case *flushIO:
t.framer.flushWrite()
case *ping:
t.framer.writePing(true, i.ack, i.data)
default:
grpclog.Printf("transport: http2Client.controller got unexpected item type %v\n", i)
// If controlMsg is of type resetKeepaliveTimer,
// then check if the keepaliveSkipped flag is still set.
if _, ok := controlMsg.(resetKeepaliveTimer); ok {
atomic.StoreUint32(&t.keepaliveSkipped, 0)
// Reset the timer to timeout.
// Note : This is safe to read, since the
// only codepath that sets the keepaliveSkipped
// flag also resets the timer to infinity.
// Thus, there'll never be a case where we are
// trying to read from an empty timer channel.
isPingSent = true
if !timer.Stop() {
<-timer.C
}
t.writableChan <- 0
timer.Reset(t.kp.Timeout)
continue
}
wchan = t.writableChan
cchan = nil
case <-wchan:
switch i := controlMsg.(type) {
case *windowUpdate:
t.framer.writeWindowUpdate(true, i.streamID, i.increment)
case *settings:
if i.ack {
t.framer.writeSettingsAck(true)
t.applySettings(i.ss)
} else {
t.framer.writeSettings(true, i.ss...)
}
case *resetStream:
t.framer.writeRSTStream(true, i.streamID, i.code)
case *flushIO:
t.framer.flushWrite()
case *ping:
t.framer.writePing(true, i.ack, i.data)
default:
grpclog.Printf("transport: http2Client.controller got unexpected item type %v\n", i)
}
wchan <- 0
wchan = nil
cchan = t.controlBuf.get()
case <-timer.C:
// All code paths in this case must reset the timer.

// Get the activity counter value and reset it.
a := atomic.SwapUint64(&t.activity, 0)
if a > 0 {
atomic.StoreUint32(&t.keepaliveSkipped, 0)
isPingSent = false
timer.Reset(t.kp.Time)
continue
case <-t.shutdownChan:
return
}
if isPingSent {
t.Close()
timer.Reset(infinity)
continue
}
t.mu.Lock()
ns := len(t.activeStreams)
if !t.kp.PermitWithoutStream && ns < 1 {
// set flag that signifyies keepalive was skipped
atomic.StoreUint32(&t.keepaliveSkipped, 1)
t.mu.Unlock()
timer.Reset(infinity)
continue
}
t.mu.Unlock()
// reset the keepaliveSkipped flag
atomic.StoreUint32(&t.keepaliveSkipped, 0)
// Send ping.
t.controlBuf.put(keepalivePing)
isPingSent = true
timer.Reset(t.kp.Timeout)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why is there so much duplicated code here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Duplication here is by choice to keep the structure of the routine simple so that new users can understand the logic behind easily.
To reuse the outer select we'll need to have some additional book-keeping about if the ping was sent or not.

case <-t.shutdownChan:
// stop the keepalive timer
if !timer.Stop() {
select {
case <-timer.C:
default:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So note this prevents this goroutine from blocking permanently, but it may not free the timer. The Stop() may not process immediately, in which case <-timer.C doesn't have a value yet, but it will in the future.

It's probably better to either get rid of the if t.kp.Time == infinity { optimization above or check a second time here (you could have a variable, say isTimerUsed).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since we exit the controller function right after, wouldn't the timer be eventually gc'd. Also, since nowhere outside the function is timer.C accessed, it should be safe even if timer.C later gets a value. In fact, maybe we can get rid of the case <- timer.c: altogether?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're forgetting there is a sender of that value. The sender could keep the timer alive and prevent GC.

Now, I happen to know that the channel is buffered, and so in this specific case the sender will be able to queue an entry and then remove the timer from its data structures and then the timer could be GCed.

// In case we stopped the timer before the for loop began.
// This happens when keepalive time provided was infinity.
}
}
return
}
}
Expand Down
34 changes: 34 additions & 0 deletions transport/keepalive.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package transport

import (
"math"
"time"
)

// KeepaliveParameters is used to set keepalive parameters.
type KeepaliveParameters struct {
// After a duration of this time the client pings the server to see if the transport is still alive.
Time time.Duration
// After having pinged fot keepalive check, the client waits for a duration of keepalive_timeout before closing the transport.
Timeout time.Duration
//If true, client runs keepalive checks even with no active RPCs.
PermitWithoutStream bool
}

// Validate is used to validate the keepalive parameters.
// Time durations initialized to 0 will be replaced with default Values.
func (p *KeepaliveParameters) validate() {
if p.Time == 0 {
p.Time = infinity
}
if p.Timeout == 0 {
p.Timeout = twentyScnd
}
}

const (
// Infinity is the default value of keepalive time.
infinity = time.Duration(math.MaxInt64)
// TwentyScnd is the default value of timeout.
twentyScnd = time.Duration(20 * time.Second)
)
8 changes: 8 additions & 0 deletions transport/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -382,10 +382,18 @@ type ConnectOptions struct {
PerRPCCredentials []credentials.PerRPCCredentials
// TransportCredentials stores the Authenticator required to setup a client connection.
TransportCredentials credentials.TransportCredentials
// KeepaliveParams stores the keepalive parameters.
KeepaliveParams *KeepaliveParameters
// StatsHandler stores the handler for stats.
StatsHandler stats.Handler
}

// default values for keepalive parameters.
var defaultKeepaliveParams = &KeepaliveParameters{
Time: infinity, // default to infinite.
Timeout: twentyScnd,
}

// TargetInfo contains the information of the target such as network address and metadata.
type TargetInfo struct {
Addr string
Expand Down
Loading