Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Point-to-point health check. Client side implementation #993

Merged
merged 23 commits into from
Mar 7, 2017
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions clientconn.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ import (
"golang.org/x/net/trace"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/keepalive"
"google.golang.org/grpc/stats"
"google.golang.org/grpc/transport"
)
Expand Down Expand Up @@ -249,6 +250,13 @@ func WithUserAgent(s string) DialOption {
}
}

// WithKeepaliveParams returns a DialOption that specifies keepalive paramaters for the client transport.
func WithKeepaliveParams(k *keepalive.Params) DialOption {
return func(o *dialOptions) {
o.copts.KeepaliveParams = k
}
}

// WithUnaryInterceptor returns a DialOption that specifies the interceptor for unary RPCs.
func WithUnaryInterceptor(f UnaryClientInterceptor) DialOption {
return func(o *dialOptions) {
Expand Down
34 changes: 34 additions & 0 deletions keepalive/keepalive.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package keepalive

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why does this needs to be another package? Can't it ben inside the grpc package?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

transport package depends on this. grpc package depends on transport. Putting this in grpc will create a circular dependency.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The separate package looks weird to me as well. We may want to figure out alternatives.


import (
"math"
"time"
)

// Params is used to set keepalive parameters.
type Params struct {
// After a duration of this time the client pings the server to see if the transport is still alive.
Time time.Duration
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Need to think about what happens when these are 0.

// After having pinged fot keepalive check, the client waits for a duration of keepalive_timeout before closing the transport.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

s/fot/for/

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

would be nice to wrap this.

Timeout time.Duration
//If true, client runs keepalive checks even with no active RPCs.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

missing space

PermitWithoutStream bool
}

// Validate is used to validate the keepalive parameters.
// Time durations initialized to 0 will be replaced with default Values.
func (p *Params) Validate() {
if p.Time == 0 {
p.Time = Infinity
}
if p.Timeout == 0 {
p.Time = TwentySec
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

s/Time/Timeout/

}
}

const (
// Infinity is the default value of keepalive time.
Infinity = time.Duration(math.MaxInt64)
// TwentySec is the default value of timeout.
TwentySec = time.Duration(20 * time.Second)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a really poor constant to export. There is no way to change it to 30 seconds or similar in the future. One option is to export the defaults. A better option is to not export them.

Two ways to remove the export: 1) in the transport, create a zero-filled Params{} and then call Validate(), 2) move the Validate logic to transport. I have preference to 2 because it also removes the Validate export.

)
5 changes: 5 additions & 0 deletions transport/control.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,11 @@ const (
// The following defines various control items which could flow through
// the control buffer of transport. They represent different aspects of
// control tasks, e.g., flow control, settings, streaming resetting, etc.

type fireKeepaliveTimer struct{}

func (fireKeepaliveTimer) item() {}

type windowUpdate struct {
streamID uint32
increment uint32
Expand Down
127 changes: 104 additions & 23 deletions transport/http2_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import (
"net"
"strings"
"sync"
"sync/atomic"
"time"

"golang.org/x/net/context"
Expand All @@ -49,6 +50,7 @@ import (
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/keepalive"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/peer"
"google.golang.org/grpc/stats"
Expand Down Expand Up @@ -99,6 +101,14 @@ type http2Client struct {

creds []credentials.PerRPCCredentials

// Counter to keep track of reading activity on transport.
activity uint64 // accessed atomically.
// Flag to keep track if the keepalive check was skipped because there
// were no active streams and keepalive.PermitWithoutStream was false
// keepaliveSkipped = 1 means skipped
keepaliveSkipped uint32 // accessed atomically
// keepalive parameters.
kp *keepalive.Params
statsHandler stats.Handler

mu sync.Mutex // guard the following variables
Expand Down Expand Up @@ -182,6 +192,11 @@ func newHTTP2Client(ctx context.Context, addr TargetInfo, opts ConnectOptions) (
if opts.UserAgent != "" {
ua = opts.UserAgent + " " + ua
}
kp := defaultKeepaliveParams
if opts.KeepaliveParams != nil {
kp = opts.KeepaliveParams
kp.Validate()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This mutates an object from the user. Should there be some copy involved?

}
var buf bytes.Buffer
t := &http2Client{
ctx: ctx,
Expand Down Expand Up @@ -210,6 +225,7 @@ func newHTTP2Client(ctx context.Context, addr TargetInfo, opts ConnectOptions) (
creds: opts.PerRPCCredentials,
maxStreams: math.MaxInt32,
streamSendQuota: defaultWindowSize,
kp: kp,
statsHandler: opts.StatsHandler,
}
if t.statsHandler != nil {
Expand Down Expand Up @@ -373,6 +389,11 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea
s := t.newStream(ctx, callHdr)
s.clientStatsCtx = userCtx
t.activeStreams[s.id] = s
// if the number of active streams are now equal to 1, then check if keepalive
// was being skipped. If so, fire the keepalive timer
if len(t.activeStreams) == 1 && atomic.LoadUint32(&t.keepaliveSkipped) == 1 {
t.controlBuf.put(fireKeepaliveTimer{})
}

// This stream is not counted when applySetings(...) initialize t.streamsQuota.
// Reset t.streamsQuota to the right value.
Expand Down Expand Up @@ -992,6 +1013,7 @@ func (t *http2Client) reader() {
// loop to keep reading incoming messages on this transport.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Update activity after handleSettings?

for {
frame, err := t.framer.readFrame()
atomic.AddUint64(&t.activity, 1)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

CAS

if err != nil {
// Abort an active stream if the http2.Framer returns a
// http2.StreamError. This can happen only if the server's response
Expand Down Expand Up @@ -1068,36 +1090,95 @@ func (t *http2Client) applySettings(ss []http2.Setting) {
// controller running in a separate goroutine takes charge of sending control
// frames (e.g., window update, reset stream, setting, etc.) to the server.
func (t *http2Client) controller() {
timer := time.NewTimer(t.kp.Time)
if t.kp.Timeout == keepalive.Infinity {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

s/Timeout/Time/

// Prevent the timer from firing, ever.
if !timer.Stop() {
<-timer.C
}
}
isPingSent := false
keepalivePing := &ping{data: [8]byte{}}
// select toggles between control channel and writable chanel.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: s/chanel/channel/

// We need to wait on writable channel only after having recieved
// a control message that requires controller to take an action.
// However, while waiting on either of these channels, the keepalive
// timer channel or shutdown channel might trigger. Such toggling
// take care of this case.
cchan := t.controlBuf.get()
var wchan chan int
var controlMsg item
for {
select {
case i := <-t.controlBuf.get():
case controlMsg = <-cchan:
t.controlBuf.load()
select {
case <-t.writableChan:
switch i := i.(type) {
case *windowUpdate:
t.framer.writeWindowUpdate(true, i.streamID, i.increment)
case *settings:
if i.ack {
t.framer.writeSettingsAck(true)
t.applySettings(i.ss)
} else {
t.framer.writeSettings(true, i.ss...)
// If controlMsg is of type fireKeepaliveTimer,
// then check if the keepaliveSkipped flag is still set.
if _, ok := controlMsg.(fireKeepaliveTimer); ok {
if atomic.LoadUint32(&t.keepaliveSkipped) == 1 {
// Reset the timer to 0 so that it fires.
if !timer.Stop() {
<-timer.C
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you make a note that this is safe since the timer will always be Reset in case <-timer.C?

Otherwise it could deadlock. https://play.golang.org/p/4CHuWZJFwu

}
case *resetStream:
t.framer.writeRSTStream(true, i.streamID, i.code)
case *flushIO:
t.framer.flushWrite()
case *ping:
t.framer.writePing(true, i.ack, i.data)
default:
grpclog.Printf("transport: http2Client.controller got unexpected item type %v\n", i)
timer.Reset(0)
}
t.writableChan <- 0
continue
case <-t.shutdownChan:
return
}
wchan = t.writableChan
cchan = nil
continue
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This continue has no effect. Ditto in next case. Remove?

case <-wchan:
switch i := controlMsg.(type) {
case *windowUpdate:
t.framer.writeWindowUpdate(true, i.streamID, i.increment)
case *settings:
if i.ack {
t.framer.writeSettingsAck(true)
t.applySettings(i.ss)
} else {
t.framer.writeSettings(true, i.ss...)
}
case *resetStream:
t.framer.writeRSTStream(true, i.streamID, i.code)
case *flushIO:
t.framer.flushWrite()
case *ping:
t.framer.writePing(true, i.ack, i.data)
default:
grpclog.Printf("transport: http2Client.controller got unexpected item type %v\n", i)
}
wchan <- 0
wchan = nil
cchan = t.controlBuf.get()
continue
case <-timer.C:
t.mu.Lock()
ns := len(t.activeStreams)
t.mu.Unlock()
if !t.kp.PermitWithoutStream && ns < 1 {
timer.Reset(t.kp.Time)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm... Won't this cause the timer to continue firing on an idle channel? It also makes the various states hard to follow. Could this just be reset to Inifinity?

isPingSent = false
// set flag that signifyies keepalive was skipped
atomic.StoreUint32(&t.keepaliveSkipped, 1)
continue
}
// reset the keepaliveSkipped flag
atomic.StoreUint32(&t.keepaliveSkipped, 0)
// Get the activity counter value and reset it.
a := atomic.SwapUint64(&t.activity, 0)
if a > 0 {
timer.Reset(t.kp.Time)
isPingSent = false
continue
}
if !isPingSent {
// Send ping.
t.controlBuf.put(keepalivePing)
isPingSent = true
timer.Reset(t.kp.Timeout)
continue
}
t.Close()
case <-t.shutdownChan:
return
}
Expand Down
9 changes: 9 additions & 0 deletions transport/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ import (
"golang.org/x/net/context"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/keepalive"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/stats"
"google.golang.org/grpc/tap"
Expand Down Expand Up @@ -382,10 +383,18 @@ type ConnectOptions struct {
PerRPCCredentials []credentials.PerRPCCredentials
// TransportCredentials stores the Authenticator required to setup a client connection.
TransportCredentials credentials.TransportCredentials
// KeepaliveParams stores the keepalive parameters.
KeepaliveParams *keepalive.Params
// StatsHandler stores the handler for stats.
StatsHandler stats.Handler
}

// default values for keepalive parameters.
var defaultKeepaliveParams = &keepalive.Params{
Time: keepalive.Infinity, // default to infinite.
Timeout: keepalive.TwentySec,
}

// TargetInfo contains the information of the target such as network address and metadata.
type TargetInfo struct {
Addr string
Expand Down
Loading