Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Point-to-point health check. Client side implementation #993

Merged
merged 23 commits into from
Mar 7, 2017

Conversation

MakMukhi
Copy link
Contributor

No description provided.

@@ -0,0 +1,30 @@
package keepalive

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why does this needs to be another package? Can't it ben inside the grpc package?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

transport package depends on this. grpc package depends on transport. Putting this in grpc will create a circular dependency.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The separate package looks weird to me as well. We may want to figure out alternatives.

// Params is used to set keepalive parameters.
type Params struct {
// After a duration of this time the client pings the server to see if the transport is still alive.
Ktime time.Duration

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why the K prefix for all the fields?


// DefaultKParams contains default values for keepalive parameters
var DefaultKParams = Params{
Ktime: time.Duration(math.MaxInt64), // default to infinite

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not -1 as infinite?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Setting it to -1 would fire instantly.

var DefaultKParams = Params{
Ktime: time.Duration(math.MaxInt64), // default to infinite
Ktimeout: time.Duration(20 * 1000 * 1000 * 1000), // default to 20 seconds
KNoStream: false,

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This line can be removed, false is the default.

// DefaultKParams contains default values for keepalive parameters
var DefaultKParams = Params{
Ktime: time.Duration(math.MaxInt64), // default to infinite
Ktimeout: time.Duration(20 * 1000 * 1000 * 1000), // default to 20 seconds

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Change this to 20 * time.Second, the is is much clearer and won't need the comment.

@@ -98,6 +100,11 @@ type http2Client struct {

creds []credentials.PerRPCCredentials

// activity counter
activity *uint64

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add a comment about being atomic?

@@ -98,6 +100,11 @@ type http2Client struct {

creds []credentials.PerRPCCredentials

// activity counter
activity *uint64

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's common to just use an uint64, rather then a reference. For example see http.Server code: https://github.com/golang/go/blob/7dc97d9e328edc800e2ce41d5b211ef4e0ef41d0/src/net/http/server.go#L2363-L2364.

}

// Mu is a mutex to protect Enabled variable
var Mu = sync.Mutex{}

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Rather then exposing this lock, why not use functions for it. e.g.

func Enabled() bool {}
func Enable() {}
func Disable() {}

// Params is used to set keepalive parameters.
type Params struct {
// After a duration of this time the client pings the server to see if the transport is still alive.
Time time.Duration
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Need to think about what happens when these are 0.

}

// DefaultParams contains default values for keepalive parameters.
var DefaultParams = Params{
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider moving to transport.

var enable = false

// Enabled exposes the value of enable variable.
func Enabled() bool {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems unnecessary. I don't know a case where WithKeepaliveParams wouldn't work.

@@ -98,6 +100,11 @@ type http2Client struct {

creds []credentials.PerRPCCredentials

// Counter to keep track of activity(reading and writing on transport).
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should only be used for reading. Writing does not prevent the need for keepalive ping.

@@ -976,6 +989,7 @@ func (t *http2Client) reader() {
// loop to keep reading incoming messages on this transport.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Update activity after handleSettings?

t.mu.Unlock()
// Get the activity counter value and reset it.
a := atomic.SwapUint64(&t.activity, 0)
if a > 0 || (!t.kp.PermitWithoutStream && ns < 1) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This doesn't seem to follow this part of the spec:

Since keepalive is not occurring on HTTP/2 connections without any streams, there will be a higher chance of failure for new RPCs following a long period of inactivity. To reduce the tail latency for these RPCs, it is important to not reset the keepalive time when a connection becomes active; if a new stream is created and there has been greater than keepalive time since the last read byte, then a keepalive PING should be sent (ideally before the HEADERS frame). Doing so detects the broken connection with a latency of keepalive timeout instead of keepalive time + timeout.

kp = opts.KeepaliveParams
kp.Validate()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This mutates an object from the user. Should there be some copy involved?

// Infinity is the default value of keepalive time.
Infinity = time.Duration(math.MaxInt64)
// TwentySec is the default value of timeout.
TwentySec = time.Duration(20 * time.Second)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a really poor constant to export. There is no way to change it to 30 seconds or similar in the future. One option is to export the defaults. A better option is to not export them.

Two ways to remove the export: 1) in the transport, create a zero-filled Params{} and then call Validate(), 2) move the Validate logic to transport. I have preference to 2 because it also removes the Validate export.

// Prevent the timer from firing, ever.
if !timer.Stop() {
<-timer.C
}
}
isPingSent := false
keepalivePing := &ping{data: [8]byte{}}
// select toggles between control channel and writable chanel.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: s/chanel/channel/

if atomic.LoadUint32(&t.keepaliveSkipped) == 1 {
// Reset the timer to 0 so that it fires.
if !timer.Stop() {
<-timer.C
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you make a note that this is safe since the timer will always be Reset in case <-timer.C?

Otherwise it could deadlock. https://play.golang.org/p/4CHuWZJFwu

}
wchan = t.writableChan
cchan = nil
continue
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This continue has no effect. Ditto in next case. Remove?

p.Time = Infinity
}
if p.Timeout == 0 {
p.Time = TwentySec
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

s/Time/Timeout/

@@ -1070,64 +1091,94 @@ func (t *http2Client) applySettings(ss []http2.Setting) {
// frames (e.g., window update, reset stream, setting, etc.) to the server.
func (t *http2Client) controller() {
timer := time.NewTimer(t.kp.Time)
if !keepalive.Enabled() {
if t.kp.Timeout == keepalive.Infinity {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

s/Timeout/Time/

case <-timer.C:
t.mu.Lock()
ns := len(t.activeStreams)
t.mu.Unlock()
if !t.kp.PermitWithoutStream && ns < 1 {
timer.Reset(t.kp.Time)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm... Won't this cause the timer to continue firing on an idle channel? It also makes the various states hard to follow. Could this just be reset to Inifinity?

Copy link
Member

@ejona86 ejona86 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pretty close

keepalive.go Outdated
type KeepaliveParameters struct {
// After a duration of this time the client pings the server to see if the transport is still alive.
Time time.Duration
// After having pinged fot keepalive check, the client waits for a duration of keepalive_timeout before closing the transport.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

s/fot/for/

keepalive.go Outdated
type KeepaliveParameters struct {
// After a duration of this time the client pings the server to see if the transport is still alive.
Time time.Duration
// After having pinged fot keepalive check, the client waits for a duration of keepalive_timeout before closing the transport.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

s/keepalive_timeout/Timeout/ ?

keepalive.go Outdated
type KeepaliveParameters struct {
// After a duration of this time the client pings the server to see if the transport is still alive.
Time time.Duration
// After having pinged fot keepalive check, the client waits for a duration of keepalive_timeout before closing the transport.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this documentation is saying what you hope. It actually says it will always close the connection after a keepalive check. You need something saying that it only matters when nothing is received by the timeout.

keepalive.go Outdated

// KeepaliveParameters is used to set keepalive parameters.
type KeepaliveParameters struct {
// After a duration of this time the client pings the server to see if the transport is still alive.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After a duration of this time after having receiving nothing. (I'm pointing out the issue; my text still needs rewording)

keepalive.go Outdated
"time"
)

// KeepaliveParameters is used to set keepalive parameters.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about explaining a bit more.

KeepaliveParameters configures how the ClientConn will actively probe to notice when a connection is broken and to cause activity so intermediaries are aware the connection is still in use.

// keepaliveSkipped = 1 means skipped
keepaliveSkipped uint32 // accessed atomically
// keepalive parameters.
kp *KeepaliveParameters
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe get rid of the pointer? It is probably better to inline it from a locality perspective, but it would also mean validate() wouldn't need as much thought. I think it is safe today, but you are mutating a structure that is used from multiple threads but without any synchronization; it would be trivial to accidentally break that in the future.

Note this also would make defaultKeepaliveParams unnecessary.

if !timer.Stop() {
select {
case <-timer.C:
default:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So note this prevents this goroutine from blocking permanently, but it may not free the timer. The Stop() may not process immediately, in which case <-timer.C doesn't have a value yet, but it will in the future.

It's probably better to either get rid of the if t.kp.Time == infinity { optimization above or check a second time here (you could have a variable, say isTimerUsed).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since we exit the controller function right after, wouldn't the timer be eventually gc'd. Also, since nowhere outside the function is timer.C accessed, it should be safe even if timer.C later gets a value. In fact, maybe we can get rid of the case <- timer.c: altogether?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're forgetting there is a sender of that value. The sender could keep the timer alive and prevent GC.

Now, I happen to know that the channel is buffered, and so in this specific case the sender will be able to queue an entry and then remove the timer from its data structures and then the timer could be GCed.

keepalive.go Outdated

// KeepaliveParameters is used to set keepalive parameters.
type KeepaliveParameters struct {
// After a duration of this time the client pings the server to see if the transport is still alive.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems there should be some mention of the current defaults. Here and below. Try to use language that implies the defaults may change, like "the current default is" and similar.

@MakMukhi
Copy link
Contributor Author

Thanks for the comments, Eric. On it!

type ClientParameters struct {
// After a duration of this time the client pings the server to see if the transport is still alive.
Time time.Duration
// After having pinged fot keepalive check, the client waits for a duration of keepalive_timeout before closing the transport.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

s/fot/for/

Time time.Duration
// After having pinged fot keepalive check, the client waits for a duration of keepalive_timeout before closing the transport.
Timeout time.Duration
//If true, client runs keepalive checks even with no active RPCs.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

missing space

type ClientParameters struct {
// After a duration of this time the client pings the server to see if the transport is still alive.
Time time.Duration
// After having pinged fot keepalive check, the client waits for a duration of keepalive_timeout before closing the transport.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

would be nice to wrap this.

@@ -182,6 +196,14 @@ func newHTTP2Client(ctx context.Context, addr TargetInfo, opts ConnectOptions) (
if opts.UserAgent != "" {
ua = opts.UserAgent + " " + ua
}
kp := opts.KeepaliveParams
// Validate keepalive parameters.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is setting defaults, not validating.

@@ -80,6 +82,8 @@ type http2Client struct {
// goAway is closed to notify the upper layer (i.e., addrConn.transportMonitor)
// that the server sent GoAway on this transport.
goAway chan struct{}
// awakenKeepalive is used to tell keepalive goroutine to reset keepalive timer.
awakenKeepalive chan int
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why is this int instead of struct{}? the value is never used.

return
}
p := &ping{data: [8]byte{}}
timer := time.NewTimer(t.kp.Time)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

defer timer.Stop() below this line.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No all exit paths of this routine have a timer running. Trying to stop and empty timer.C of an already stopped timer leads to a deadlock.
Doing this in defer would mean making sure that all exit paths of the routine reset the timer(if one is not running already). Future updates to the code might miss that and get into trouble.

t.mu.Lock()
if len(t.activeStreams) < 1 && !t.kp.PermitWithoutStream {
// Make awakenKeepalive writable.
<-t.awakenKeepalive
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why does this happen under lock? seems clearer to write:

t.mu.Lock()
dormant := len(t.activeStreams) < 1 && !t.kp.PermitWithoutStream
t.mu.Unlock()
if dormant {
   ...
} else {
  t.controlBuf.put(p)
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Emptying of the awakenKeepalive channel needs to happen inside the lock so that some other thread creating a new stream sees the channel as writable if the number of streams go form 0 to 1.

}

// By the time control gets here a ping has been sent one way or the other.
timer.Reset(t.kp.Timeout)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why is there so much duplicated code here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Duplication here is by choice to keep the structure of the routine simple so that new users can understand the logic behind easily.
To reuse the outer select we'll need to have some additional book-keeping about if the ping was sent or not.

const (
infinity = time.Duration(math.MaxInt64)
defaultKeepaliveTime = infinity
defaultKeepaliveTimeout = time.Duration(20 * time.Second)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

time.Duration is not necessary here.

}
defer conn.Close()
// Sleep for keepalive to close the connection.
time.Sleep(4 * time.Second)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

perhaps this should be a function of the settings above, rather than hardcoded.

there are 4 instances of this.

@@ -1104,6 +1138,63 @@ func (t *http2Client) controller() {
}
}

// keepalive running in a separate goroutune makes sure the connection is alive by sending pings.
func (t *http2Client) keepalive() {
if t.kp.Time == infinity {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Move this check before goroutine is created?

if len(t.activeStreams) == 1 {
select {
case t.awakenKeepalive <- struct{}{}:
t.framer.writePing(true, false, [8]byte{})
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't need to flush the ping, we will flush the headers later.

@@ -992,6 +1025,7 @@ func (t *http2Client) reader() {
// loop to keep reading incoming messages on this transport.
for {
frame, err := t.framer.readFrame()
atomic.AddUint64(&t.activity, 1)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

CAS

for {
select {
case <-timer.C:
if a := atomic.SwapUint32(&t.activity, 0); a == 1 {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

CAS(a, 1, 0)

Copy link
Contributor

@menghanl menghanl left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@MakMukhi MakMukhi merged commit 4eaacfe into grpc:master Mar 7, 2017
andreimatei added a commit to cockroachdb/vendored that referenced this pull request Mar 21, 2017
Switch from Peter's fork of grpc to my own fork, since Peter's out of
office. My fork is https://github.com/grpc/grpc-go head + Peter's one
commit increasing the per-stream flow-control window.

The motivation of syncing to newer gRPC is to include grpc/grpc-go#993
for helping with cockroachdb/cockroach#13989 -
we will move to gRPC internal heartbeats instead of using our own
connection heartbeats.

Other dep updates:
- the Lightstep update is a single commit that seems innocuous
andreimatei added a commit to andreimatei/cockroach that referenced this pull request Mar 21, 2017
Switch from Peter's fork of grpc to my own fork, since Peter's out of
office. My fork is https://github.com/grpc/grpc-go head + Peter's one
commit increasing the per-stream flow-control window.

The motivation of syncing to newer gRPC is to include grpc/grpc-go#993
for helping with cockroachdb#13989 - we will move to gRPC internal heartbeats
instead of using our own connection heartbeats in a future cockroach
commit.

Other dep updates:
- the Lightstep update is a single commit that seems innocuous
@MakMukhi MakMukhi deleted the mmukhi_keepalive_client branch May 4, 2018 02:07
@lock lock bot locked as resolved and limited conversation to collaborators Oct 31, 2018
Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants