-
Notifications
You must be signed in to change notification settings - Fork 4.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Point-to-point health check. Client side implementation #993
Changes from 15 commits
e58450b
a417b82
93111e4
058907d
45907c2
97fb58a
901cdf6
eeb6f5b
49d2a88
b2448f6
e78a1f3
5fcb58f
0ba0eaf
c8983ba
0bdf059
336b4ea
bc3cca9
3ad7810
e328ede
f220619
25f14b7
9d23576
661dbbc
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,15 @@ | ||
package grpc | ||
|
||
import ( | ||
"time" | ||
) | ||
|
||
// KeepaliveParameters is used to set keepalive parameters. | ||
type KeepaliveParameters struct { | ||
// After a duration of this time in which nothing has been received on the transport, the client pings the server to see if the transport is still alive. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. After a duration of this time after having receiving nothing. (I'm pointing out the issue; my text still needs rewording) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It seems there should be some mention of the current defaults. Here and below. Try to use language that implies the defaults may change, like "the current default is" and similar. |
||
Time time.Duration | ||
// After having pinged for a keepalive check, the client waits for a duration of Timeout and closes the transport only if no activity has been seen by then. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. s/fot/for/ There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. s/keepalive_timeout/Timeout/ ? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't think this documentation is saying what you hope. It actually says it will always close the connection after a keepalive check. You need something saying that it only matters when nothing is received by the timeout. |
||
Timeout time.Duration | ||
// If true, client runs keepalive checks even with no active RPCs. | ||
PermitWithoutStream bool | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -41,6 +41,7 @@ import ( | |
"net" | ||
"strings" | ||
"sync" | ||
"sync/atomic" | ||
"time" | ||
|
||
"golang.org/x/net/context" | ||
|
@@ -99,6 +100,14 @@ type http2Client struct { | |
|
||
creds []credentials.PerRPCCredentials | ||
|
||
// Counter to keep track of reading activity on transport. | ||
activity uint64 // accessed atomically. | ||
// Flag to keep track if the keepalive check was skipped because there | ||
// were no active streams and PermitWithoutStream was false | ||
// keepaliveSkipped = 1 means skipped | ||
keepaliveSkipped uint32 // accessed atomically | ||
// keepalive parameters. | ||
kp *KeepaliveParameters | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Maybe get rid of the pointer? It is probably better to inline it from a locality perspective, but it would also mean Note this also would make |
||
statsHandler stats.Handler | ||
|
||
mu sync.Mutex // guard the following variables | ||
|
@@ -182,6 +191,11 @@ func newHTTP2Client(ctx context.Context, addr TargetInfo, opts ConnectOptions) ( | |
if opts.UserAgent != "" { | ||
ua = opts.UserAgent + " " + ua | ||
} | ||
kp := defaultKeepaliveParams | ||
if opts.KeepaliveParams != nil { | ||
kp = opts.KeepaliveParams | ||
kp.validate() | ||
} | ||
var buf bytes.Buffer | ||
t := &http2Client{ | ||
ctx: ctx, | ||
|
@@ -210,6 +224,7 @@ func newHTTP2Client(ctx context.Context, addr TargetInfo, opts ConnectOptions) ( | |
creds: opts.PerRPCCredentials, | ||
maxStreams: math.MaxInt32, | ||
streamSendQuota: defaultWindowSize, | ||
kp: kp, | ||
statsHandler: opts.StatsHandler, | ||
} | ||
if t.statsHandler != nil { | ||
|
@@ -373,6 +388,12 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea | |
s := t.newStream(ctx, callHdr) | ||
s.clientStatsCtx = userCtx | ||
t.activeStreams[s.id] = s | ||
// If the number of active streams is now equal to 1, then check if keepalive | ||
// was being skipped. If so, fire the keepalive timer. | ||
if len(t.activeStreams) == 1 && atomic.LoadUint32(&t.keepaliveSkipped) == 1 { | ||
t.framer.writePing(true, false, [8]byte{}) | ||
t.controlBuf.put(resetKeepaliveTimer{}) | ||
} | ||
|
||
// This stream is not counted when applySetings(...) initialize t.streamsQuota. | ||
// Reset t.streamsQuota to the right value. | ||
|
@@ -982,6 +1003,7 @@ func (t *http2Client) reader() { | |
t.notifyError(err) | ||
return | ||
} | ||
atomic.AddUint64(&t.activity, 1) | ||
sf, ok := frame.(*http2.SettingsFrame) | ||
if !ok { | ||
t.notifyError(err) | ||
|
@@ -992,6 +1014,7 @@ func (t *http2Client) reader() { | |
// loop to keep reading incoming messages on this transport. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Update activity after handleSettings? |
||
for { | ||
frame, err := t.framer.readFrame() | ||
atomic.AddUint64(&t.activity, 1) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. CAS |
||
if err != nil { | ||
// Abort an active stream if the http2.Framer returns a | ||
// http2.StreamError. This can happen only if the server's response | ||
|
@@ -1068,37 +1091,112 @@ func (t *http2Client) applySettings(ss []http2.Setting) { | |
// controller running in a separate goroutine takes charge of sending control | ||
// frames (e.g., window update, reset stream, setting, etc.) to the server. | ||
func (t *http2Client) controller() { | ||
timer := time.NewTimer(t.kp.Time) | ||
if t.kp.Time == infinity { | ||
// Prevent the timer from firing, ever. | ||
if !timer.Stop() { | ||
<-timer.C | ||
} | ||
} | ||
isPingSent := false | ||
keepalivePing := &ping{data: [8]byte{}} | ||
// select toggles between control channel and writable channel. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: s/chanel/channel/ |
||
// We need to wait on writable channel only after having received | ||
// a control message that requires controller to take an action. | ||
// However, while waiting on either of these channels, the keepalive | ||
// timer channel or shutdown channel might trigger. Such toggling | ||
// takes care of this case. | ||
cchan := t.controlBuf.get() | ||
var wchan chan int | ||
var controlMsg item | ||
for { | ||
select { | ||
case i := <-t.controlBuf.get(): | ||
case controlMsg = <-cchan: | ||
t.controlBuf.load() | ||
select { | ||
case <-t.writableChan: | ||
switch i := i.(type) { | ||
case *windowUpdate: | ||
t.framer.writeWindowUpdate(true, i.streamID, i.increment) | ||
case *settings: | ||
if i.ack { | ||
t.framer.writeSettingsAck(true) | ||
t.applySettings(i.ss) | ||
} else { | ||
t.framer.writeSettings(true, i.ss...) | ||
} | ||
case *resetStream: | ||
t.framer.writeRSTStream(true, i.streamID, i.code) | ||
case *flushIO: | ||
t.framer.flushWrite() | ||
case *ping: | ||
t.framer.writePing(true, i.ack, i.data) | ||
default: | ||
grpclog.Printf("transport: http2Client.controller got unexpected item type %v\n", i) | ||
// If controlMsg is of type resetKeepaliveTimer, | ||
// then clear the keepaliveSkipped flag. | ||
if _, ok := controlMsg.(resetKeepaliveTimer); ok { | ||
atomic.StoreUint32(&t.keepaliveSkipped, 0) | ||
// Reset the timer to timeout. | ||
// Note : This is safe to read, since the | ||
// only codepath that sets the keepaliveSkipped | ||
// flag also resets the timer to infinity. | ||
// Thus, there'll never be a case where we are | ||
// trying to read from an empty timer channel. | ||
isPingSent = true | ||
if !timer.Stop() { | ||
<-timer.C | ||
} | ||
t.writableChan <- 0 | ||
timer.Reset(t.kp.Timeout) | ||
continue | ||
} | ||
wchan = t.writableChan | ||
cchan = nil | ||
case <-wchan: | ||
switch i := controlMsg.(type) { | ||
case *windowUpdate: | ||
t.framer.writeWindowUpdate(true, i.streamID, i.increment) | ||
case *settings: | ||
if i.ack { | ||
t.framer.writeSettingsAck(true) | ||
t.applySettings(i.ss) | ||
} else { | ||
t.framer.writeSettings(true, i.ss...) | ||
} | ||
case *resetStream: | ||
t.framer.writeRSTStream(true, i.streamID, i.code) | ||
case *flushIO: | ||
t.framer.flushWrite() | ||
case *ping: | ||
t.framer.writePing(true, i.ack, i.data) | ||
default: | ||
grpclog.Printf("transport: http2Client.controller got unexpected item type %v\n", i) | ||
} | ||
wchan <- 0 | ||
wchan = nil | ||
cchan = t.controlBuf.get() | ||
case <-timer.C: | ||
// All code paths in this case must reset the timer. | ||
|
||
// Get the activity counter value and reset it. | ||
a := atomic.SwapUint64(&t.activity, 0) | ||
if a > 0 { | ||
atomic.StoreUint32(&t.keepaliveSkipped, 0) | ||
isPingSent = false | ||
timer.Reset(t.kp.Time) | ||
continue | ||
case <-t.shutdownChan: | ||
return | ||
} | ||
if isPingSent { | ||
t.Close() | ||
timer.Reset(infinity) | ||
continue | ||
} | ||
t.mu.Lock() | ||
ns := len(t.activeStreams) | ||
if !t.kp.PermitWithoutStream && ns < 1 { | ||
// Set the flag that signifies keepalive was skipped. | ||
atomic.StoreUint32(&t.keepaliveSkipped, 1) | ||
t.mu.Unlock() | ||
timer.Reset(infinity) | ||
continue | ||
} | ||
t.mu.Unlock() | ||
// reset the keepaliveSkipped flag | ||
atomic.StoreUint32(&t.keepaliveSkipped, 0) | ||
// Send ping. | ||
t.controlBuf.put(keepalivePing) | ||
isPingSent = true | ||
timer.Reset(t.kp.Timeout) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. why is there so much duplicated code here? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Duplication here is by choice to keep the structure of the routine simple so that new users can understand the logic behind easily. |
||
case <-t.shutdownChan: | ||
// stop the keepalive timer | ||
if !timer.Stop() { | ||
select { | ||
case <-timer.C: | ||
default: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. So note this prevents this goroutine from blocking permanently, but it may not free the timer. The Stop() may not process immediately, in which case It's probably better to either get rid of the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Since we exit the controller function right after, wouldn't the timer be eventually gc'd. Also, since nowhere outside the function is timer.C accessed, it should be safe even if timer.C later gets a value. In fact, maybe we can get rid of the case <- timer.c: altogether? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You're forgetting there is a sender of that value. The sender could keep the timer alive and prevent GC. Now, I happen to know that the channel is buffered, and so in this specific case the sender will be able to queue an entry and then remove the timer from its data structures and then the timer could be GCed. |
||
// In case we stopped the timer before the for loop began. | ||
// This happens when keepalive time provided was infinity. | ||
} | ||
} | ||
return | ||
} | ||
} | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,34 @@ | ||
package transport | ||
|
||
import ( | ||
"math" | ||
"time" | ||
) | ||
|
||
// KeepaliveParameters is used to set keepalive parameters. | ||
type KeepaliveParameters struct { | ||
// After a duration of this time in which nothing has been received on the transport, the client pings the server to see if the transport is still alive. | ||
// A zero value is replaced with infinity by validate. | ||
Time time.Duration | ||
// After having pinged for a keepalive check, the client waits for a duration of Timeout and closes the transport only if no activity has been seen by then. | ||
// A zero value is replaced with twentyScnd (20 seconds) by validate. | ||
Timeout time.Duration | ||
// If true, client runs keepalive checks even with no active RPCs. | ||
PermitWithoutStream bool | ||
} |
|
||
// validate fills in defaults for the keepalive parameters, modifying p in place. | ||
// Durations left at 0 are replaced: Time becomes infinity and Timeout becomes twentyScnd. | ||
// Non-zero values are left untouched; no other checking is performed. | ||
func (p *KeepaliveParameters) validate() { | ||
if p.Time == 0 { | ||
p.Time = infinity | ||
} | ||
if p.Timeout == 0 { | ||
p.Timeout = twentyScnd | ||
} | ||
} |
|
||
const ( | ||
// infinity is the largest representable duration; validate uses it as the default keepalive Time. | ||
infinity = time.Duration(math.MaxInt64) | ||
// twentyScnd is 20 seconds; validate uses it as the default keepalive Timeout. | ||
twentyScnd = time.Duration(20 * time.Second) | ||
) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How about explaining a bit more.