-
Notifications
You must be signed in to change notification settings - Fork 4.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Point-to-point health check. Client side implementation #993
Changes from 18 commits
e58450b
a417b82
93111e4
058907d
45907c2
97fb58a
901cdf6
eeb6f5b
49d2a88
b2448f6
e78a1f3
5fcb58f
0ba0eaf
c8983ba
0bdf059
336b4ea
bc3cca9
3ad7810
e328ede
f220619
25f14b7
9d23576
661dbbc
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,15 @@ | ||
package keepalive | ||
|
||
import ( | ||
"time" | ||
) | ||
|
||
// ClientParameters is used to set keepalive parameters for the client side. | ||
type ClientParameters struct { | ||
// After a duration of this time the client pings the server to see if the transport is still alive. | ||
Time time.Duration | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Need to think about what happens when these are 0. |
||
// After having pinged fot keepalive check, the client waits for a duration of keepalive_timeout before closing the transport. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. s/fot/for/ There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. would be nice to wrap this. |
||
Timeout time.Duration | ||
//If true, client runs keepalive checks even with no active RPCs. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. missing space |
||
PermitWithoutStream bool | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -41,6 +41,7 @@ import ( | |
"net" | ||
"strings" | ||
"sync" | ||
"sync/atomic" | ||
"time" | ||
|
||
"golang.org/x/net/context" | ||
|
@@ -49,6 +50,7 @@ import ( | |
"google.golang.org/grpc/codes" | ||
"google.golang.org/grpc/credentials" | ||
"google.golang.org/grpc/grpclog" | ||
"google.golang.org/grpc/keepalive" | ||
"google.golang.org/grpc/metadata" | ||
"google.golang.org/grpc/peer" | ||
"google.golang.org/grpc/stats" | ||
|
@@ -80,6 +82,8 @@ type http2Client struct { | |
// goAway is closed to notify the upper layer (i.e., addrConn.transportMonitor) | ||
// that the server sent GoAway on this transport. | ||
goAway chan struct{} | ||
// awakenKeepalive is used to tell keepalive goroutine to reset keepalive timer. | ||
awakenKeepalive chan int | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. why is this int instead of |
||
|
||
framer *framer | ||
hBuf *bytes.Buffer // the buffer for HPACK encoding | ||
|
@@ -99,6 +103,11 @@ type http2Client struct { | |
|
||
creds []credentials.PerRPCCredentials | ||
|
||
// Counter to keep track of reading activity on transport. | ||
activity uint64 // Accessed atomically. | ||
|
||
kp keepalive.ClientParameters | ||
|
||
statsHandler stats.Handler | ||
|
||
mu sync.Mutex // guard the following variables | ||
|
@@ -182,6 +191,14 @@ func newHTTP2Client(ctx context.Context, addr TargetInfo, opts ConnectOptions) ( | |
if opts.UserAgent != "" { | ||
ua = opts.UserAgent + " " + ua | ||
} | ||
kp := opts.KeepaliveParams | ||
// Validate keepalive parameters. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this is setting defaults, not validating. |
||
if kp.Time == 0 { | ||
kp.Time = defaultKeepaliveTime | ||
} | ||
if kp.Timeout == 0 { | ||
kp.Timeout = defaultKeepaliveTimeout | ||
} | ||
var buf bytes.Buffer | ||
t := &http2Client{ | ||
ctx: ctx, | ||
|
@@ -198,6 +215,7 @@ func newHTTP2Client(ctx context.Context, addr TargetInfo, opts ConnectOptions) ( | |
shutdownChan: make(chan struct{}), | ||
errorChan: make(chan struct{}), | ||
goAway: make(chan struct{}), | ||
awakenKeepalive: make(chan int, 1), | ||
framer: newFramer(conn), | ||
hBuf: &buf, | ||
hEnc: hpack.NewEncoder(&buf), | ||
|
@@ -210,8 +228,12 @@ func newHTTP2Client(ctx context.Context, addr TargetInfo, opts ConnectOptions) ( | |
creds: opts.PerRPCCredentials, | ||
maxStreams: math.MaxInt32, | ||
streamSendQuota: defaultWindowSize, | ||
kp: kp, | ||
statsHandler: opts.StatsHandler, | ||
} | ||
// make sure awakenKeepalive can't be written upon. | ||
// keepalive routine will make it writable, if need be. | ||
t.awakenKeepalive <- 0 | ||
if t.statsHandler != nil { | ||
t.ctx = t.statsHandler.TagConn(t.ctx, &stats.ConnTagInfo{ | ||
RemoteAddr: t.remoteAddr, | ||
|
@@ -256,6 +278,7 @@ func newHTTP2Client(ctx context.Context, addr TargetInfo, opts ConnectOptions) ( | |
} | ||
} | ||
go t.controller() | ||
go t.keepalive() | ||
t.writableChan <- 0 | ||
return t, nil | ||
} | ||
|
@@ -373,6 +396,15 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea | |
s := t.newStream(ctx, callHdr) | ||
s.clientStatsCtx = userCtx | ||
t.activeStreams[s.id] = s | ||
// If the number of active streams changes from 0 to 1, then check if keepalive | ||
// has gone dormant. If so, wake it up. | ||
if len(t.activeStreams) == 1 { | ||
select { | ||
case t.awakenKeepalive <- 0: | ||
t.framer.writePing(true, false, [8]byte{}) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We don't need to flush the ping, we will flush the headers later. |
||
default: | ||
} | ||
} | ||
|
||
// This stream is not counted when applySetings(...) initialize t.streamsQuota. | ||
// Reset t.streamsQuota to the right value. | ||
|
@@ -982,6 +1014,7 @@ func (t *http2Client) reader() { | |
t.notifyError(err) | ||
return | ||
} | ||
atomic.AddUint64(&t.activity, 1) | ||
sf, ok := frame.(*http2.SettingsFrame) | ||
if !ok { | ||
t.notifyError(err) | ||
|
@@ -992,6 +1025,7 @@ func (t *http2Client) reader() { | |
// loop to keep reading incoming messages on this transport. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Update activity after handleSettings? |
||
for { | ||
frame, err := t.framer.readFrame() | ||
atomic.AddUint64(&t.activity, 1) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. CAS |
||
if err != nil { | ||
// Abort an active stream if the http2.Framer returns a | ||
// http2.StreamError. This can happen only if the server's response | ||
|
@@ -1104,6 +1138,63 @@ func (t *http2Client) controller() { | |
} | ||
} | ||
|
||
// keepalive running in a separate goroutine makes sure the connection is alive by sending pings. | ||
func (t *http2Client) keepalive() { | ||
if t.kp.Time == time.Duration(math.MaxInt64) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this should use |
||
return | ||
} | ||
p := &ping{data: [8]byte{}} | ||
timer := time.NewTimer(t.kp.Time) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Not all exit paths of this routine have a timer running. Trying to stop and empty timer.C of an already stopped timer leads to a deadlock. |
||
for { | ||
select { | ||
case <-timer.C: | ||
if a := atomic.SwapUint64(&t.activity, 0); a > 0 { | ||
timer.Reset(t.kp.Time) | ||
continue | ||
} | ||
// Check if keepalive should go dormant. | ||
t.mu.Lock() | ||
if len(t.activeStreams) < 1 && !t.kp.PermitWithoutStream { | ||
// Make awakenKeepalive writable. | ||
<-t.awakenKeepalive | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. why does this happen under lock? seems clearer to write:
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Emptying of the awakenKeepalive channel needs to happen inside the lock so that some other thread creating a new stream sees the channel as writable if the number of streams goes from 0 to 1. |
||
t.mu.Unlock() | ||
select { | ||
case <-t.awakenKeepalive: | ||
// If the control gets here a ping has been sent | ||
// need to reset the timer with keepalive.Timeout. | ||
case <-t.shutdownChan: | ||
return | ||
} | ||
} else { | ||
t.mu.Unlock() | ||
// Send ping. | ||
t.controlBuf.put(p) | ||
} | ||
|
||
// By the time control gets here a ping has been sent one way or the other. | ||
timer.Reset(t.kp.Timeout) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. why is there so much duplicated code here? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Duplication here is by choice to keep the structure of the routine simple so that new users can understand the logic behind easily. |
||
select { | ||
case <-timer.C: | ||
if a := atomic.SwapUint64(&t.activity, 0); a > 0 { | ||
timer.Reset(t.kp.Time) | ||
continue | ||
} | ||
t.Close() | ||
case <-t.shutdownChan: | ||
if !timer.Stop() { | ||
<-timer.C | ||
} | ||
return | ||
} | ||
case <-t.shutdownChan: | ||
if !timer.Stop() { | ||
<-timer.C | ||
} | ||
return | ||
} | ||
} | ||
} | ||
|
||
func (t *http2Client) Error() <-chan struct{} { | ||
return t.errorChan | ||
} | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -41,12 +41,15 @@ import ( | |
"bytes" | ||
"fmt" | ||
"io" | ||
"math" | ||
"net" | ||
"sync" | ||
"time" | ||
|
||
"golang.org/x/net/context" | ||
"google.golang.org/grpc/codes" | ||
"google.golang.org/grpc/credentials" | ||
"google.golang.org/grpc/keepalive" | ||
"google.golang.org/grpc/metadata" | ||
"google.golang.org/grpc/stats" | ||
"google.golang.org/grpc/tap" | ||
|
@@ -382,6 +385,8 @@ type ConnectOptions struct { | |
PerRPCCredentials []credentials.PerRPCCredentials | ||
// TransportCredentials stores the Authenticator required to setup a client connection. | ||
TransportCredentials credentials.TransportCredentials | ||
// KeepaliveParams stores the keepalive parameters. | ||
KeepaliveParams keepalive.ClientParameters | ||
// StatsHandler stores the handler for stats. | ||
StatsHandler stats.Handler | ||
} | ||
|
@@ -606,3 +611,10 @@ func wait(ctx context.Context, done, goAway, closing <-chan struct{}, proceed <- | |
return i, nil | ||
} | ||
} | ||
|
||
// keepalive related constants. | ||
const ( | ||
infinity = time.Duration(math.MaxInt64) | ||
defaultKeepaliveTime = infinity | ||
defaultKeepaliveTimeout = time.Duration(20 * time.Second) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why does this need to be another package? Can't it be inside the grpc package?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
transport package depends on this. grpc package depends on transport. Putting this in grpc will create a circular dependency.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The separate package looks weird to me as well. We may want to figure out alternatives.