-
Notifications
You must be signed in to change notification settings - Fork 4.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Point-to-point health check. Client side implementation #993
Changes from all commits
e58450b
a417b82
93111e4
058907d
45907c2
97fb58a
901cdf6
eeb6f5b
49d2a88
b2448f6
e78a1f3
5fcb58f
0ba0eaf
c8983ba
0bdf059
336b4ea
bc3cca9
3ad7810
e328ede
f220619
25f14b7
9d23576
661dbbc
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,18 @@ | ||
package keepalive | ||
|
||
import ( | ||
"time" | ||
) | ||
|
||
// ClientParameters is used to set keepalive parameters on the client-side. | ||
// These configure how the client will actively probe to notice when a connection broken | ||
// and to cause activity so intermediaries are aware the connection is still in use. | ||
type ClientParameters struct { | ||
// After a duration of this time if the client doesn't see any activity it pings the server to see if the transport is still alive. | ||
Time time.Duration // The current default value is infinity. | ||
// After having pinged for keepalive check, the client waits for a duration of Timeout and if no activity is seen even after that | ||
// the connection is closed. | ||
Timeout time.Duration // The current default value is 20 seconds. | ||
// If true, client runs keepalive checks even with no active RPCs. | ||
PermitWithoutStream bool | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -41,6 +41,7 @@ import ( | |
"net" | ||
"strings" | ||
"sync" | ||
"sync/atomic" | ||
"time" | ||
|
||
"golang.org/x/net/context" | ||
|
@@ -49,6 +50,7 @@ import ( | |
"google.golang.org/grpc/codes" | ||
"google.golang.org/grpc/credentials" | ||
"google.golang.org/grpc/grpclog" | ||
"google.golang.org/grpc/keepalive" | ||
"google.golang.org/grpc/metadata" | ||
"google.golang.org/grpc/peer" | ||
"google.golang.org/grpc/stats" | ||
|
@@ -80,6 +82,8 @@ type http2Client struct { | |
// goAway is closed to notify the upper layer (i.e., addrConn.transportMonitor) | ||
// that the server sent GoAway on this transport. | ||
goAway chan struct{} | ||
// awakenKeepalive is used to wake up keepalive when after it has gone dormant. | ||
awakenKeepalive chan struct{} | ||
|
||
framer *framer | ||
hBuf *bytes.Buffer // the buffer for HPACK encoding | ||
|
@@ -99,6 +103,11 @@ type http2Client struct { | |
|
||
creds []credentials.PerRPCCredentials | ||
|
||
// Boolean to keep track of reading activity on transport. | ||
// 1 is true and 0 is false. | ||
activity uint32 // Accessed atomically. | ||
kp keepalive.ClientParameters | ||
|
||
statsHandler stats.Handler | ||
|
||
mu sync.Mutex // guard the following variables | ||
|
@@ -182,6 +191,14 @@ func newHTTP2Client(ctx context.Context, addr TargetInfo, opts ConnectOptions) ( | |
if opts.UserAgent != "" { | ||
ua = opts.UserAgent + " " + ua | ||
} | ||
kp := opts.KeepaliveParams | ||
// Validate keepalive parameters. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this is setting defaults, not validating. |
||
if kp.Time == 0 { | ||
kp.Time = defaultKeepaliveTime | ||
} | ||
if kp.Timeout == 0 { | ||
kp.Timeout = defaultKeepaliveTimeout | ||
} | ||
var buf bytes.Buffer | ||
t := &http2Client{ | ||
ctx: ctx, | ||
|
@@ -198,6 +215,7 @@ func newHTTP2Client(ctx context.Context, addr TargetInfo, opts ConnectOptions) ( | |
shutdownChan: make(chan struct{}), | ||
errorChan: make(chan struct{}), | ||
goAway: make(chan struct{}), | ||
awakenKeepalive: make(chan struct{}, 1), | ||
framer: newFramer(conn), | ||
hBuf: &buf, | ||
hEnc: hpack.NewEncoder(&buf), | ||
|
@@ -211,8 +229,12 @@ func newHTTP2Client(ctx context.Context, addr TargetInfo, opts ConnectOptions) ( | |
maxStreams: defaultMaxStreamsClient, | ||
streamsQuota: newQuotaPool(defaultMaxStreamsClient), | ||
streamSendQuota: defaultWindowSize, | ||
kp: kp, | ||
statsHandler: opts.StatsHandler, | ||
} | ||
// Make sure awakenKeepalive can't be written upon. | ||
// keepalive routine will make it writable, if need be. | ||
t.awakenKeepalive <- struct{}{} | ||
if t.statsHandler != nil { | ||
t.ctx = t.statsHandler.TagConn(t.ctx, &stats.ConnTagInfo{ | ||
RemoteAddr: t.remoteAddr, | ||
|
@@ -257,6 +279,9 @@ func newHTTP2Client(ctx context.Context, addr TargetInfo, opts ConnectOptions) ( | |
} | ||
} | ||
go t.controller() | ||
if t.kp.Time != infinity { | ||
go t.keepalive() | ||
} | ||
t.writableChan <- 0 | ||
return t, nil | ||
} | ||
|
@@ -369,6 +394,15 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea | |
s := t.newStream(ctx, callHdr) | ||
s.clientStatsCtx = userCtx | ||
t.activeStreams[s.id] = s | ||
// If the number of active streams change from 0 to 1, then check if keepalive | ||
// has gone dormant. If so, wake it up. | ||
if len(t.activeStreams) == 1 { | ||
select { | ||
case t.awakenKeepalive <- struct{}{}: | ||
t.framer.writePing(false, false, [8]byte{}) | ||
default: | ||
} | ||
} | ||
|
||
t.mu.Unlock() | ||
|
||
|
@@ -980,6 +1014,7 @@ func (t *http2Client) reader() { | |
t.notifyError(err) | ||
return | ||
} | ||
atomic.CompareAndSwapUint32(&t.activity, 0, 1) | ||
sf, ok := frame.(*http2.SettingsFrame) | ||
if !ok { | ||
t.notifyError(err) | ||
|
@@ -990,6 +1025,7 @@ func (t *http2Client) reader() { | |
// loop to keep reading incoming messages on this transport. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Update activity after handleSettings? |
||
for { | ||
frame, err := t.framer.readFrame() | ||
atomic.CompareAndSwapUint32(&t.activity, 0, 1) | ||
if err != nil { | ||
// Abort an active stream if the http2.Framer returns a | ||
// http2.StreamError. This can happen only if the server's response | ||
|
@@ -1102,6 +1138,61 @@ func (t *http2Client) controller() { | |
} | ||
} | ||
|
||
// keepalive running in a separate goroutune makes sure the connection is alive by sending pings. | ||
func (t *http2Client) keepalive() { | ||
p := &ping{data: [8]byte{}} | ||
timer := time.NewTimer(t.kp.Time) | ||
for { | ||
select { | ||
case <-timer.C: | ||
if atomic.CompareAndSwapUint32(&t.activity, 1, 0) { | ||
timer.Reset(t.kp.Time) | ||
continue | ||
} | ||
// Check if keepalive should go dormant. | ||
t.mu.Lock() | ||
if len(t.activeStreams) < 1 && !t.kp.PermitWithoutStream { | ||
// Make awakenKeepalive writable. | ||
<-t.awakenKeepalive | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. why does this happen under lock? seems clearer to write:
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Emptying of the awakenKeepalive channel needs to happen inside the lock so that some other thread creating a new stream sees the channel as writable if the number of streams go form 0 to 1. |
||
t.mu.Unlock() | ||
select { | ||
case <-t.awakenKeepalive: | ||
// If the control gets here a ping has been sent | ||
// need to reset the timer with keepalive.Timeout. | ||
case <-t.shutdownChan: | ||
return | ||
} | ||
} else { | ||
t.mu.Unlock() | ||
// Send ping. | ||
t.controlBuf.put(p) | ||
} | ||
|
||
// By the time control gets here a ping has been sent one way or the other. | ||
timer.Reset(t.kp.Timeout) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. why is there so much duplicated code here? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Duplication here is by choice to keep the structure of the routine simple so that new users can understand the logic behind easily. |
||
select { | ||
case <-timer.C: | ||
if atomic.CompareAndSwapUint32(&t.activity, 1, 0) { | ||
timer.Reset(t.kp.Time) | ||
continue | ||
} | ||
t.Close() | ||
return | ||
case <-t.shutdownChan: | ||
if !timer.Stop() { | ||
<-timer.C | ||
} | ||
return | ||
} | ||
case <-t.shutdownChan: | ||
if !timer.Stop() { | ||
<-timer.C | ||
} | ||
return | ||
} | ||
} | ||
} | ||
|
||
func (t *http2Client) Error() <-chan struct{} { | ||
return t.errorChan | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why does this needs to be another package? Can't it ben inside the grpc package?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
transport package depends on this. grpc package depends on transport. Putting this in grpc will create a circular dependency.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The separate package looks weird to me as well. We may want to figure out alternatives.