-
Notifications
You must be signed in to change notification settings - Fork 4.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Point-to-point health check. Client side implementation #993
Changes from 12 commits
e58450b
a417b82
93111e4
058907d
45907c2
97fb58a
901cdf6
eeb6f5b
49d2a88
b2448f6
e78a1f3
5fcb58f
0ba0eaf
c8983ba
0bdf059
336b4ea
bc3cca9
3ad7810
e328ede
f220619
25f14b7
9d23576
661dbbc
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,34 @@ | ||
package keepalive | ||
|
||
import ( | ||
"math" | ||
"time" | ||
) | ||
|
||
// Params is used to set keepalive parameters. | ||
type Params struct { | ||
// After a duration of this time the client pings the server to see if the transport is still alive. | ||
Time time.Duration | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Need to think about what happens when these are 0. |
||
// After having pinged fot keepalive check, the client waits for a duration of keepalive_timeout before closing the transport. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. s/fot/for/ There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. would be nice to wrap this. |
||
Timeout time.Duration | ||
//If true, client runs keepalive checks even with no active RPCs. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. missing space |
||
PermitWithoutStream bool | ||
} | ||
|
||
// Validate is used to validate the keepalive parameters. | ||
// Time durations initialized to 0 will be replaced with default Values. | ||
func (p *Params) Validate() { | ||
if p.Time == 0 { | ||
p.Time = Infinity | ||
} | ||
if p.Timeout == 0 { | ||
p.Time = TwentySec | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. s/Time/Timeout/ |
||
} | ||
} | ||
|
||
const ( | ||
// Infinity is the default value of keepalive time. | ||
Infinity = time.Duration(math.MaxInt64) | ||
// TwentySec is the default value of timeout. | ||
TwentySec = time.Duration(20 * time.Second) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is a really poor constant to export. There is no way to change it to 30 seconds or similar in the future. One option is to export the defaults. A better option is to not export them. Two ways to remove the export: 1) in the transport, create a zero-filled Params{} and then call Validate(), 2) move the Validate logic to transport. I have preference to 2 because it also removes the Validate export. |
||
) |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -41,6 +41,7 @@ import ( | |
"net" | ||
"strings" | ||
"sync" | ||
"sync/atomic" | ||
"time" | ||
|
||
"golang.org/x/net/context" | ||
|
@@ -49,6 +50,7 @@ import ( | |
"google.golang.org/grpc/codes" | ||
"google.golang.org/grpc/credentials" | ||
"google.golang.org/grpc/grpclog" | ||
"google.golang.org/grpc/keepalive" | ||
"google.golang.org/grpc/metadata" | ||
"google.golang.org/grpc/peer" | ||
"google.golang.org/grpc/stats" | ||
|
@@ -99,6 +101,14 @@ type http2Client struct { | |
|
||
creds []credentials.PerRPCCredentials | ||
|
||
// Counter to keep track of reading activity on transport. | ||
activity uint64 // accessed atomically. | ||
// Flag to keep track if the keepalive check was skipped because there | ||
// were no active streams and keepalive.PermitWithoutStream was false | ||
// keepaliveSkipped = 1 means skipped | ||
keepaliveSkipped uint32 // accessed atomically | ||
// keepalive parameters. | ||
kp *keepalive.Params | ||
statsHandler stats.Handler | ||
|
||
mu sync.Mutex // guard the following variables | ||
|
@@ -182,6 +192,11 @@ func newHTTP2Client(ctx context.Context, addr TargetInfo, opts ConnectOptions) ( | |
if opts.UserAgent != "" { | ||
ua = opts.UserAgent + " " + ua | ||
} | ||
kp := defaultKeepaliveParams | ||
if opts.KeepaliveParams != nil { | ||
kp = opts.KeepaliveParams | ||
kp.Validate() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This mutates an object from the user. Should there be some copy involved? |
||
} | ||
var buf bytes.Buffer | ||
t := &http2Client{ | ||
ctx: ctx, | ||
|
@@ -210,6 +225,7 @@ func newHTTP2Client(ctx context.Context, addr TargetInfo, opts ConnectOptions) ( | |
creds: opts.PerRPCCredentials, | ||
maxStreams: math.MaxInt32, | ||
streamSendQuota: defaultWindowSize, | ||
kp: kp, | ||
statsHandler: opts.StatsHandler, | ||
} | ||
if t.statsHandler != nil { | ||
|
@@ -373,6 +389,11 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea | |
s := t.newStream(ctx, callHdr) | ||
s.clientStatsCtx = userCtx | ||
t.activeStreams[s.id] = s | ||
// if the number of active streams are now equal to 1, then check if keepalive | ||
// was being skipped. If so, fire the keepalive timer | ||
if len(t.activeStreams) == 1 && atomic.LoadUint32(&t.keepaliveSkipped) == 1 { | ||
t.controlBuf.put(fireKeepaliveTimer{}) | ||
} | ||
|
||
// This stream is not counted when applySetings(...) initialize t.streamsQuota. | ||
// Reset t.streamsQuota to the right value. | ||
|
@@ -992,6 +1013,7 @@ func (t *http2Client) reader() { | |
// loop to keep reading incoming messages on this transport. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Update activity after handleSettings? |
||
for { | ||
frame, err := t.framer.readFrame() | ||
atomic.AddUint64(&t.activity, 1) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. CAS |
||
if err != nil { | ||
// Abort an active stream if the http2.Framer returns a | ||
// http2.StreamError. This can happen only if the server's response | ||
|
@@ -1068,36 +1090,95 @@ func (t *http2Client) applySettings(ss []http2.Setting) { | |
// controller running in a separate goroutine takes charge of sending control | ||
// frames (e.g., window update, reset stream, setting, etc.) to the server. | ||
func (t *http2Client) controller() { | ||
timer := time.NewTimer(t.kp.Time) | ||
if t.kp.Timeout == keepalive.Infinity { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. s/Timeout/Time/ |
||
// Prevent the timer from firing, ever. | ||
if !timer.Stop() { | ||
<-timer.C | ||
} | ||
} | ||
isPingSent := false | ||
keepalivePing := &ping{data: [8]byte{}} | ||
// select toggles between control channel and writable chanel. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: s/chanel/channel/ |
||
// We need to wait on writable channel only after having recieved | ||
// a control message that requires controller to take an action. | ||
// However, while waiting on either of these channels, the keepalive | ||
// timer channel or shutdown channel might trigger. Such toggling | ||
// take care of this case. | ||
cchan := t.controlBuf.get() | ||
var wchan chan int | ||
var controlMsg item | ||
for { | ||
select { | ||
case i := <-t.controlBuf.get(): | ||
case controlMsg = <-cchan: | ||
t.controlBuf.load() | ||
select { | ||
case <-t.writableChan: | ||
switch i := i.(type) { | ||
case *windowUpdate: | ||
t.framer.writeWindowUpdate(true, i.streamID, i.increment) | ||
case *settings: | ||
if i.ack { | ||
t.framer.writeSettingsAck(true) | ||
t.applySettings(i.ss) | ||
} else { | ||
t.framer.writeSettings(true, i.ss...) | ||
// If controlMsg is of type fireKeepaliveTimer, | ||
// then check if the keepaliveSkipped flag is still set. | ||
if _, ok := controlMsg.(fireKeepaliveTimer); ok { | ||
if atomic.LoadUint32(&t.keepaliveSkipped) == 1 { | ||
// Reset the timer to 0 so that it fires. | ||
if !timer.Stop() { | ||
<-timer.C | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Could you make a note that this is safe since the timer will always be Reset in Otherwise it could deadlock. https://play.golang.org/p/4CHuWZJFwu |
||
} | ||
case *resetStream: | ||
t.framer.writeRSTStream(true, i.streamID, i.code) | ||
case *flushIO: | ||
t.framer.flushWrite() | ||
case *ping: | ||
t.framer.writePing(true, i.ack, i.data) | ||
default: | ||
grpclog.Printf("transport: http2Client.controller got unexpected item type %v\n", i) | ||
timer.Reset(0) | ||
} | ||
t.writableChan <- 0 | ||
continue | ||
case <-t.shutdownChan: | ||
return | ||
} | ||
wchan = t.writableChan | ||
cchan = nil | ||
continue | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This |
||
case <-wchan: | ||
switch i := controlMsg.(type) { | ||
case *windowUpdate: | ||
t.framer.writeWindowUpdate(true, i.streamID, i.increment) | ||
case *settings: | ||
if i.ack { | ||
t.framer.writeSettingsAck(true) | ||
t.applySettings(i.ss) | ||
} else { | ||
t.framer.writeSettings(true, i.ss...) | ||
} | ||
case *resetStream: | ||
t.framer.writeRSTStream(true, i.streamID, i.code) | ||
case *flushIO: | ||
t.framer.flushWrite() | ||
case *ping: | ||
t.framer.writePing(true, i.ack, i.data) | ||
default: | ||
grpclog.Printf("transport: http2Client.controller got unexpected item type %v\n", i) | ||
} | ||
wchan <- 0 | ||
wchan = nil | ||
cchan = t.controlBuf.get() | ||
continue | ||
case <-timer.C: | ||
t.mu.Lock() | ||
ns := len(t.activeStreams) | ||
t.mu.Unlock() | ||
if !t.kp.PermitWithoutStream && ns < 1 { | ||
timer.Reset(t.kp.Time) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Hmm... Won't this cause the timer to continue firing on an idle channel? It also makes the various states hard to follow. Could this just be reset to Inifinity? |
||
isPingSent = false | ||
// set flag that signifyies keepalive was skipped | ||
atomic.StoreUint32(&t.keepaliveSkipped, 1) | ||
continue | ||
} | ||
// reset the keepaliveSkipped flag | ||
atomic.StoreUint32(&t.keepaliveSkipped, 0) | ||
// Get the activity counter value and reset it. | ||
a := atomic.SwapUint64(&t.activity, 0) | ||
if a > 0 { | ||
timer.Reset(t.kp.Time) | ||
isPingSent = false | ||
continue | ||
} | ||
if !isPingSent { | ||
// Send ping. | ||
t.controlBuf.put(keepalivePing) | ||
isPingSent = true | ||
timer.Reset(t.kp.Timeout) | ||
continue | ||
} | ||
t.Close() | ||
case <-t.shutdownChan: | ||
return | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why does this needs to be another package? Can't it ben inside the grpc package?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
transport package depends on this. grpc package depends on transport. Putting this in grpc will create a circular dependency.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The separate package looks weird to me as well. We may want to figure out alternatives.