Point-to-point health check. Client side implementation #993
@@ -0,0 +1,30 @@
package keepalive
Why does this need to be another package? Can't it be inside the grpc package?
transport package depends on this. grpc package depends on transport. Putting this in grpc will create a circular dependency.
The separate package looks weird to me as well. We may want to figure out alternatives.
keepalive/keepalive.go
Outdated
// Params is used to set keepalive parameters.
type Params struct {
// After a duration of this time the client pings the server to see if the transport is still alive.
Ktime time.Duration
Why the K prefix for all the fields?
keepalive/keepalive.go
Outdated
// DefaultKParams contains default values for keepalive parameters
var DefaultKParams = Params{
Ktime: time.Duration(math.MaxInt64), // default to infinite
Why not -1 as infinite?
Setting it to -1 would fire instantly.
keepalive/keepalive.go
Outdated
var DefaultKParams = Params{
Ktime: time.Duration(math.MaxInt64), // default to infinite
Ktimeout: time.Duration(20 * 1000 * 1000 * 1000), // default to 20 seconds
KNoStream: false,
This line can be removed, false is the default.
keepalive/keepalive.go
Outdated
// DefaultKParams contains default values for keepalive parameters
var DefaultKParams = Params{
Ktime: time.Duration(math.MaxInt64), // default to infinite
Ktimeout: time.Duration(20 * 1000 * 1000 * 1000), // default to 20 seconds
Change this to 20 * time.Second, which is much clearer and won't need the comment.
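To illustrate the suggestion: `time.Second` is already a `time.Duration`, so multiplying it by an untyped constant yields a `Duration` with no conversion. The constant names below are hypothetical and exist only for the comparison.

```go
package main

import (
	"fmt"
	"time"
)

const (
	// verboseTimeout spells out 20 seconds in nanoseconds, as in the diff.
	verboseTimeout = time.Duration(20 * 1000 * 1000 * 1000)
	// clearTimeout is the reviewer's suggested spelling; its type is
	// already time.Duration.
	clearTimeout = 20 * time.Second
)

func main() {
	fmt.Println(verboseTimeout == clearTimeout) // both represent the same duration
	fmt.Println(clearTimeout)                   // Duration values print in readable form
}
```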
transport/http2_client.go
Outdated
@@ -98,6 +100,11 @@ type http2Client struct {

creds []credentials.PerRPCCredentials

// activity counter
activity *uint64
Add a comment about being atomic?
transport/http2_client.go
Outdated
@@ -98,6 +100,11 @@ type http2Client struct {

creds []credentials.PerRPCCredentials

// activity counter
activity *uint64
It's common to just use a uint64, rather than a reference. For example, see the http.Server code: https://github.com/golang/go/blob/7dc97d9e328edc800e2ce41d5b211ef4e0ef41d0/src/net/http/server.go#L2363-L2364.
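A minimal sketch of the value-field approach, assuming a hypothetical `conn` type standing in for `http2Client`. The counter is a plain `uint64` field, documented as atomic, and `sync/atomic` functions take its address at the call site.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// conn is a hypothetical stand-in for http2Client.
type conn struct {
	// activity counts frames read from the wire; accessed atomically.
	// It is the first field so it stays 64-bit aligned on 32-bit platforms.
	activity uint64
}

// recordRead notes one unit of read activity.
func (c *conn) recordRead() { atomic.AddUint64(&c.activity, 1) }

// takeActivity returns the activity seen since the last call and resets it.
func (c *conn) takeActivity() uint64 { return atomic.SwapUint64(&c.activity, 0) }

func main() {
	c := &conn{}
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for j := 0; j < 100; j++ {
				c.recordRead()
			}
		}()
	}
	wg.Wait()
	fmt.Println(c.takeActivity(), c.takeActivity())
}
```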
keepalive/keepalive.go
Outdated
}

// Mu is a mutex to protect Enabled variable
var Mu = sync.Mutex{}
Rather than exposing this lock, why not use functions for it? E.g.
func Enabled() bool {}
func Enable() {}
func Disable() {}
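The three signatures above could be filled in roughly as follows. This is a sketch of the reviewer's suggestion only; the function bodies are assumptions, and the point is that the mutex stays unexported so callers cannot forget to take it.

```go
package main

import (
	"fmt"
	"sync"
)

var (
	mu      sync.Mutex // guards enabled; deliberately unexported
	enabled bool
)

// Enabled reports whether keepalive is currently switched on.
func Enabled() bool {
	mu.Lock()
	defer mu.Unlock()
	return enabled
}

// Enable switches keepalive on.
func Enable() {
	mu.Lock()
	enabled = true
	mu.Unlock()
}

// Disable switches keepalive off.
func Disable() {
	mu.Lock()
	enabled = false
	mu.Unlock()
}

func main() {
	Enable()
	fmt.Println(Enabled())
	Disable()
	fmt.Println(Enabled())
}
```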
keepalive/keepalive.go
Outdated
// Params is used to set keepalive parameters.
type Params struct {
// After a duration of this time the client pings the server to see if the transport is still alive.
Time time.Duration
Need to think about what happens when these are 0.
keepalive/keepalive.go
Outdated
}

// DefaultParams contains default values for keepalive parameters.
var DefaultParams = Params{
Consider moving to transport.
keepalive/keepalive.go
Outdated
var enable = false

// Enabled exposes the value of enable variable.
func Enabled() bool {
This seems unnecessary. I don't know a case where WithKeepaliveParams wouldn't work.
transport/http2_client.go
Outdated
@@ -98,6 +100,11 @@ type http2Client struct {

creds []credentials.PerRPCCredentials

// Counter to keep track of activity(reading and writing on transport).
This should only be used for reading. Writing does not prevent the need for keepalive ping.
@@ -976,6 +989,7 @@ func (t *http2Client) reader() {
// loop to keep reading incoming messages on this transport.
Update activity after handleSettings?
transport/http2_client.go
Outdated
t.mu.Unlock()
// Get the activity counter value and reset it.
a := atomic.SwapUint64(&t.activity, 0)
if a > 0 || (!t.kp.PermitWithoutStream && ns < 1) {
This doesn't seem to follow this part of the spec:
Since keepalive is not occurring on HTTP/2 connections without any streams, there will be a higher chance of failure for new RPCs following a long period of inactivity. To reduce the tail latency for these RPCs, it is important to not reset the keepalive time when a connection becomes active; if a new stream is created and there has been greater than keepalive time since the last read byte, then a keepalive PING should be sent (ideally before the HEADERS frame). Doing so detects the broken connection with a latency of keepalive timeout instead of keepalive time + timeout.
transport/http2_client.go
Outdated
kp = opts.KeepaliveParams
kp.Validate()
This mutates an object from the user. Should there be some copy involved?
keepalive/keepalive.go
Outdated
// Infinity is the default value of keepalive time.
Infinity = time.Duration(math.MaxInt64)
// TwentySec is the default value of timeout.
TwentySec = time.Duration(20 * time.Second)
This is a really poor constant to export. There is no way to change it to 30 seconds or similar in the future. One option is to export the defaults. A better option is to not export them.
Two ways to remove the export: 1) in the transport, create a zero-filled Params{} and then call Validate(), 2) move the Validate logic to transport. I have preference to 2 because it also removes the Validate export.
transport/http2_client.go
Outdated
// Prevent the timer from firing, ever.
if !timer.Stop() {
<-timer.C
}
}
isPingSent := false
keepalivePing := &ping{data: [8]byte{}}
// select toggles between control channel and writable chanel.
nit: s/chanel/channel/
transport/http2_client.go
Outdated
if atomic.LoadUint32(&t.keepaliveSkipped) == 1 {
// Reset the timer to 0 so that it fires.
if !timer.Stop() {
<-timer.C
Could you make a note that this is safe since the timer will always be Reset in case <-timer.C? Otherwise it could deadlock. https://play.golang.org/p/4CHuWZJFwu
transport/http2_client.go
Outdated
}
wchan = t.writableChan
cchan = nil
continue
This continue has no effect. Ditto in next case. Remove?
keepalive/keepalive.go
Outdated
p.Time = Infinity
}
if p.Timeout == 0 {
p.Time = TwentySec
s/Time/Timeout/
transport/http2_client.go
Outdated
@@ -1070,64 +1091,94 @@ func (t *http2Client) applySettings(ss []http2.Setting) {
// frames (e.g., window update, reset stream, setting, etc.) to the server.
func (t *http2Client) controller() {
timer := time.NewTimer(t.kp.Time)
if !keepalive.Enabled() {
if t.kp.Timeout == keepalive.Infinity {
s/Timeout/Time/
transport/http2_client.go
Outdated
case <-timer.C:
t.mu.Lock()
ns := len(t.activeStreams)
t.mu.Unlock()
if !t.kp.PermitWithoutStream && ns < 1 {
timer.Reset(t.kp.Time)
Hmm... Won't this cause the timer to continue firing on an idle channel? It also makes the various states hard to follow. Could this just be reset to Infinity?
Pretty close
keepalive.go
Outdated
type KeepaliveParameters struct {
// After a duration of this time the client pings the server to see if the transport is still alive.
Time time.Duration
// After having pinged fot keepalive check, the client waits for a duration of keepalive_timeout before closing the transport.
s/fot/for/
keepalive.go
Outdated
type KeepaliveParameters struct {
// After a duration of this time the client pings the server to see if the transport is still alive.
Time time.Duration
// After having pinged fot keepalive check, the client waits for a duration of keepalive_timeout before closing the transport.
s/keepalive_timeout/Timeout/ ?
keepalive.go
Outdated
type KeepaliveParameters struct {
// After a duration of this time the client pings the server to see if the transport is still alive.
Time time.Duration
// After having pinged fot keepalive check, the client waits for a duration of keepalive_timeout before closing the transport.
I don't think this documentation is saying what you hope. It actually says it will always close the connection after a keepalive check. You need something saying that it only matters when nothing is received by the timeout.
keepalive.go
Outdated
// KeepaliveParameters is used to set keepalive parameters.
type KeepaliveParameters struct {
// After a duration of this time the client pings the server to see if the transport is still alive.
After a duration of this time after having received nothing. (I'm pointing out the issue; my text still needs rewording)
keepalive.go
Outdated
"time"
)

// KeepaliveParameters is used to set keepalive parameters.
How about explaining a bit more.
KeepaliveParameters configures how the ClientConn will actively probe to notice when a connection is broken and to cause activity so intermediaries are aware the connection is still in use.
transport/http2_client.go
Outdated
// keepaliveSkipped = 1 means skipped
keepaliveSkipped uint32 // accessed atomically
// keepalive parameters.
kp *KeepaliveParameters
Maybe get rid of the pointer? It is probably better to inline it from a locality perspective, but it would also mean validate() wouldn't need as much thought. I think it is safe today, but you are mutating a structure that is used from multiple threads without any synchronization; it would be trivial to accidentally break that in the future.
Note this also would make defaultKeepaliveParams unnecessary.
transport/http2_client.go
Outdated
if !timer.Stop() {
select {
case <-timer.C:
default:
So note this prevents this goroutine from blocking permanently, but it may not free the timer. The Stop() may not process immediately, in which case <-timer.C doesn't have a value yet, but it will in the future.
It's probably better to either get rid of the if t.kp.Time == infinity { optimization above or check a second time here (you could have a variable, say isTimerUsed).
Since we exit the controller function right after, wouldn't the timer be eventually gc'd. Also, since nowhere outside the function is timer.C accessed, it should be safe even if timer.C later gets a value. In fact, maybe we can get rid of the case <- timer.c: altogether?
You're forgetting there is a sender of that value. The sender could keep the timer alive and prevent GC.
Now, I happen to know that the channel is buffered, and so in this specific case the sender will be able to queue an entry and then remove the timer from its data structures and then the timer could be GCed.
keepalive.go
Outdated
// KeepaliveParameters is used to set keepalive parameters.
type KeepaliveParameters struct {
// After a duration of this time the client pings the server to see if the transport is still alive.
It seems there should be some mention of the current defaults. Here and below. Try to use language that implies the defaults may change, like "the current default is" and similar.
Thanks for the comments, Eric. On it!
keepalive/keepalive.go
Outdated
type ClientParameters struct {
// After a duration of this time the client pings the server to see if the transport is still alive.
Time time.Duration
// After having pinged fot keepalive check, the client waits for a duration of keepalive_timeout before closing the transport.
s/fot/for/
keepalive/keepalive.go
Outdated
Time time.Duration
// After having pinged fot keepalive check, the client waits for a duration of keepalive_timeout before closing the transport.
Timeout time.Duration
//If true, client runs keepalive checks even with no active RPCs.
missing space
keepalive/keepalive.go
Outdated
type ClientParameters struct {
// After a duration of this time the client pings the server to see if the transport is still alive.
Time time.Duration
// After having pinged fot keepalive check, the client waits for a duration of keepalive_timeout before closing the transport.
would be nice to wrap this.
@@ -182,6 +196,14 @@ func newHTTP2Client(ctx context.Context, addr TargetInfo, opts ConnectOptions) (
if opts.UserAgent != "" {
ua = opts.UserAgent + " " + ua
}
kp := opts.KeepaliveParams
// Validate keepalive parameters.
this is setting defaults, not validating.
transport/http2_client.go
Outdated
@@ -80,6 +82,8 @@ type http2Client struct {
// goAway is closed to notify the upper layer (i.e., addrConn.transportMonitor)
// that the server sent GoAway on this transport.
goAway chan struct{}
// awakenKeepalive is used to tell keepalive goroutine to reset keepalive timer.
awakenKeepalive chan int
Why is this int instead of struct{}? The value is never used.
transport/http2_client.go
Outdated
return
}
p := &ping{data: [8]byte{}}
timer := time.NewTimer(t.kp.Time)
defer timer.Stop() below this line.
Not all exit paths of this routine have a timer running. Trying to stop an already stopped timer and then drain timer.C leads to a deadlock.
Doing this in defer would mean making sure that all exit paths of the routine reset the timer (if one is not running already). Future updates to the code might miss that and get into trouble.
t.mu.Lock()
if len(t.activeStreams) < 1 && !t.kp.PermitWithoutStream {
// Make awakenKeepalive writable.
<-t.awakenKeepalive
why does this happen under lock? seems clearer to write:
t.mu.Lock()
dormant := len(t.activeStreams) < 1 && !t.kp.PermitWithoutStream
t.mu.Unlock()
if dormant {
...
} else {
t.controlBuf.put(p)
}
Emptying the awakenKeepalive channel needs to happen inside the lock, so that another thread creating a new stream sees the channel as writable when the number of streams goes from 0 to 1.
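The argument for draining under the lock can be sketched with a much-simplified model. Everything below is hypothetical: the `transport` type, its methods, and the convention that a full one-slot channel means "keepalive is awake" are assumptions for illustration, and the real goroutine also waits on the channel after draining it, which this model leaves out. Because the drain and the stream-count check share one mutex, a stream created right after the drain is guaranteed to find the channel writable.

```go
package main

import (
	"fmt"
	"sync"
)

// transport is a hypothetical, stripped-down stand-in for http2Client.
type transport struct {
	mu              sync.Mutex
	activeStreams   int
	awakenKeepalive chan struct{} // capacity 1; full means keepalive is awake
}

func newTransport() *transport {
	t := &transport{awakenKeepalive: make(chan struct{}, 1)}
	t.awakenKeepalive <- struct{}{} // start full, i.e. not writable
	return t
}

// goDormant models the keepalive goroutine finding no streams. The drain is
// done while holding mu so it cannot interleave with newStream.
func (t *transport) goDormant() bool {
	t.mu.Lock()
	defer t.mu.Unlock()
	if t.activeStreams > 0 {
		return false
	}
	select {
	case <-t.awakenKeepalive: // make the channel writable
	default: // already dormant
	}
	return true
}

// newStream registers a stream and reports whether it woke a dormant keepalive.
func (t *transport) newStream() bool {
	t.mu.Lock()
	defer t.mu.Unlock()
	t.activeStreams++
	if t.activeStreams == 1 {
		select {
		case t.awakenKeepalive <- struct{}{}:
			return true // the channel was writable: keepalive was dormant
		default:
		}
	}
	return false
}

// closeStream removes one stream.
func (t *transport) closeStream() {
	t.mu.Lock()
	t.activeStreams--
	t.mu.Unlock()
}

func main() {
	t := newTransport()
	fmt.Println(t.newStream()) // keepalive awake: nothing to wake
	t.closeStream()
	fmt.Println(t.goDormant()) // no streams: keepalive goes dormant
	fmt.Println(t.newStream()) // 0 -> 1 streams wakes it
}
```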
}

// By the time control gets here a ping has been sent one way or the other.
timer.Reset(t.kp.Timeout)
why is there so much duplicated code here?
Duplication here is by choice to keep the structure of the routine simple so that new users can understand the logic behind easily.
To reuse the outer select we'll need to have some additional book-keeping about if the ping was sent or not.
transport/transport.go
Outdated
const (
infinity = time.Duration(math.MaxInt64)
defaultKeepaliveTime = infinity
defaultKeepaliveTimeout = time.Duration(20 * time.Second)
time.Duration is not necessary here.
}
defer conn.Close()
// Sleep for keepalive to close the connection.
time.Sleep(4 * time.Second)
perhaps this should be a function of the settings above, rather than hardcoded.
there are 4 instances of this.
transport/http2_client.go
Outdated
@@ -1104,6 +1138,63 @@ func (t *http2Client) controller() {
}
}

// keepalive running in a separate goroutune makes sure the connection is alive by sending pings.
func (t *http2Client) keepalive() {
if t.kp.Time == infinity {
Move this check before goroutine is created?
transport/http2_client.go
Outdated
if len(t.activeStreams) == 1 {
select {
case t.awakenKeepalive <- struct{}{}:
t.framer.writePing(true, false, [8]byte{})
We don't need to flush the ping, we will flush the headers later.
transport/http2_client.go
Outdated
@@ -992,6 +1025,7 @@ func (t *http2Client) reader() {
// loop to keep reading incoming messages on this transport.
for {
frame, err := t.framer.readFrame()
atomic.AddUint64(&t.activity, 1)
CAS
transport/http2_client.go
Outdated
for {
select {
case <-timer.C:
if a := atomic.SwapUint32(&t.activity, 0); a == 1 {
CAS(a, 1, 0)
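Read together, the two "CAS" comments suggest treating activity as a 0/1 flag rather than a counter. The sketch below shows one way that could look; `conn`, `markRead`, and `consumeActivity` are hypothetical names, and only `atomic.CompareAndSwapUint32` is the real API.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// conn is a hypothetical stand-in for http2Client.
type conn struct {
	activity uint32 // 1 if something was read since the last check; accessed atomically
}

// markRead is what the reader loop would call after each frame. The swap only
// succeeds when the flag is still 0, so repeated reads leave it at 1.
func (c *conn) markRead() { atomic.CompareAndSwapUint32(&c.activity, 0, 1) }

// consumeActivity is what the keepalive loop would call when its timer fires.
// It clears the flag and reports whether there was any read activity.
func (c *conn) consumeActivity() bool {
	return atomic.CompareAndSwapUint32(&c.activity, 1, 0)
}

func main() {
	c := &conn{}
	fmt.Println(c.consumeActivity()) // nothing read yet
	c.markRead()
	c.markRead()
	fmt.Println(c.consumeActivity()) // activity seen, flag cleared
	fmt.Println(c.consumeActivity()) // no new activity since the last check
}
```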
LGTM
Switch from Peter's fork of grpc to my own fork, since Peter's out of office. My fork is https://github.com/grpc/grpc-go head + Peter's one commit increasing the per-stream flow-control window. The motivation of syncing to newer gRPC is to include grpc/grpc-go#993 for helping with cockroachdb/cockroach#13989 - we will move to gRPC internal heartbeats instead of using our own connection heartbeats. Other dep updates: - the Lightstep update is a single commit that seems innocuous
Switch from Peter's fork of grpc to my own fork, since Peter's out of office. My fork is https://github.com/grpc/grpc-go head + Peter's one commit increasing the per-stream flow-control window. The motivation of syncing to newer gRPC is to include grpc/grpc-go#993 for helping with cockroachdb#13989 - we will move to gRPC internal heartbeats instead of using our own connection heartbeats in a future cockroach commit. Other dep updates: - the Lightstep update is a single commit that seems innocuous