-
Notifications
You must be signed in to change notification settings - Fork 4.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Point-to-point health check. Client side implementation #993
Changes from 9 commits
e58450b
a417b82
93111e4
058907d
45907c2
97fb58a
901cdf6
eeb6f5b
49d2a88
b2448f6
e78a1f3
5fcb58f
0ba0eaf
c8983ba
0bdf059
336b4ea
bc3cca9
3ad7810
e328ede
f220619
25f14b7
9d23576
661dbbc
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,50 @@ | ||
package keepalive | ||
|
||
import ( | ||
"math" | ||
"sync" | ||
"time" | ||
) | ||
|
||
// Params is used to set keepalive parameters. | ||
type Params struct { | ||
// After a duration of this time the client pings the server to see if the transport is still alive. | ||
Time time.Duration | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Need to think about what happens when these are 0. |
||
// After having pinged for keepalive check, the client waits for a duration of Timeout before closing the transport. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. s/fot/for/ There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. would be nice to wrap this. |
||
Timeout time.Duration | ||
// If true, client runs keepalive checks even with no active RPCs. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. missing space |
||
PermitWithoutStream bool | ||
} | ||
|
||
// DefaultParams contains default values for keepalive parameters. | ||
var DefaultParams = Params{ | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Consider moving to transport. |
||
Time: time.Duration(math.MaxInt64), // default to infinite. | ||
Timeout: time.Duration(20 * time.Second), | ||
} | ||
|
||
// mu is a mutex to protect the enable variable. | ||
var mu = sync.Mutex{} | ||
|
||
// enable is a knob used to turn keepalive on or off. | ||
var enable = false | ||
|
||
// Enabled exposes the value of enable variable. | ||
func Enabled() bool { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This seems unnecessary. I don't know a case where WithKeepaliveParams wouldn't work. |
||
mu.Lock() | ||
defer mu.Unlock() | ||
return enable | ||
} | ||
|
||
// Enable can be called to enable keepalives. | ||
func Enable() { | ||
mu.Lock() | ||
defer mu.Unlock() | ||
enable = true | ||
} | ||
|
||
// Disable can be called to disable keepalive. | ||
func Disable() { | ||
mu.Lock() | ||
defer mu.Unlock() | ||
enable = false | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -41,6 +41,7 @@ import ( | |
"net" | ||
"strings" | ||
"sync" | ||
"sync/atomic" | ||
"time" | ||
|
||
"golang.org/x/net/context" | ||
|
@@ -49,6 +50,7 @@ import ( | |
"google.golang.org/grpc/codes" | ||
"google.golang.org/grpc/credentials" | ||
"google.golang.org/grpc/grpclog" | ||
"google.golang.org/grpc/keepalive" | ||
"google.golang.org/grpc/metadata" | ||
"google.golang.org/grpc/peer" | ||
"google.golang.org/grpc/stats" | ||
|
@@ -98,6 +100,11 @@ type http2Client struct { | |
|
||
creds []credentials.PerRPCCredentials | ||
|
||
// Counter to keep track of activity (reading and writing on transport). | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This should only be used for reading. Writing does not prevent the need for keepalive ping. |
||
activity uint64 // accessed atomically. | ||
// keepalive parameters. | ||
kp keepalive.Params | ||
|
||
mu sync.Mutex // guard the following variables | ||
state transportState // the state of underlying connection | ||
activeStreams map[uint32]*Stream | ||
|
@@ -179,6 +186,10 @@ func newHTTP2Client(ctx context.Context, addr TargetInfo, opts ConnectOptions) ( | |
if opts.UserAgent != "" { | ||
ua = opts.UserAgent + " " + ua | ||
} | ||
kp := keepalive.DefaultParams | ||
if opts.KeepaliveParams != (keepalive.Params{}) { | ||
kp = opts.KeepaliveParams | ||
} | ||
var buf bytes.Buffer | ||
t := &http2Client{ | ||
target: addr.Addr, | ||
|
@@ -206,6 +217,7 @@ func newHTTP2Client(ctx context.Context, addr TargetInfo, opts ConnectOptions) ( | |
creds: opts.PerRPCCredentials, | ||
maxStreams: math.MaxInt32, | ||
streamSendQuota: defaultWindowSize, | ||
kp: kp, | ||
} | ||
// Start the reader goroutine for incoming message. Each transport has | ||
// a dedicated goroutine which reads HTTP2 frame from network. Then it | ||
|
@@ -690,6 +702,7 @@ func (t *http2Client) Write(s *Stream, data []byte, opts *Options) error { | |
break | ||
} | ||
} | ||
atomic.AddUint64(&t.activity, 1) | ||
if !opts.Last { | ||
return nil | ||
} | ||
|
@@ -976,6 +989,7 @@ func (t *http2Client) reader() { | |
// loop to keep reading incoming messages on this transport. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Update activity after handleSettings? |
||
for { | ||
frame, err := t.framer.readFrame() | ||
atomic.AddUint64(&t.activity, 1) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. CAS |
||
if err != nil { | ||
// Abort an active stream if the http2.Framer returns a | ||
// http2.StreamError. This can happen only if the server's response | ||
|
@@ -1052,6 +1066,15 @@ func (t *http2Client) applySettings(ss []http2.Setting) { | |
// controller running in a separate goroutine takes charge of sending control | ||
// frames (e.g., window update, reset stream, setting, etc.) to the server. | ||
func (t *http2Client) controller() { | ||
timer := time.NewTimer(t.kp.Time) | ||
if !keepalive.Enabled() { | ||
// Prevent the timer from firing, ever. | ||
if !timer.Stop() { | ||
<-timer.C | ||
} | ||
} | ||
isPingSent := false | ||
keepalivePing := &ping{data: [8]byte{}} | ||
for { | ||
select { | ||
case i := <-t.controlBuf.get(): | ||
|
@@ -1082,6 +1105,26 @@ func (t *http2Client) controller() { | |
case <-t.shutdownChan: | ||
return | ||
} | ||
case <-timer.C: | ||
t.mu.Lock() | ||
ns := len(t.activeStreams) | ||
t.mu.Unlock() | ||
// Get the activity counter value and reset it. | ||
a := atomic.SwapUint64(&t.activity, 0) | ||
if a > 0 || (!t.kp.PermitWithoutStream && ns < 1) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This doesn't seem to follow this part of the spec:
|
||
timer.Reset(t.kp.Time) | ||
isPingSent = false | ||
} else { | ||
if !isPingSent { | ||
// Send ping. | ||
t.controlBuf.put(keepalivePing) | ||
isPingSent = true | ||
timer.Reset(t.kp.Timeout) | ||
} else { | ||
t.Close() | ||
continue | ||
} | ||
} | ||
case <-t.shutdownChan: | ||
return | ||
} | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -49,6 +49,7 @@ import ( | |
"golang.org/x/net/http2" | ||
"golang.org/x/net/http2/hpack" | ||
"google.golang.org/grpc/codes" | ||
"google.golang.org/grpc/keepalive" | ||
) | ||
|
||
type server struct { | ||
|
@@ -251,6 +252,10 @@ func (s *server) stop() { | |
} | ||
|
||
func setUp(t *testing.T, port int, maxStreams uint32, ht hType) (*server, ClientTransport) { | ||
return setUpWithOptions(t, port, maxStreams, ht, ConnectOptions{}) | ||
} | ||
|
||
func setUpWithOptions(t *testing.T, port int, maxStreams uint32, ht hType, copts ConnectOptions) (*server, ClientTransport) { | ||
server := &server{startedErr: make(chan error, 1)} | ||
go server.start(t, port, maxStreams, ht) | ||
server.wait(t, 2*time.Second) | ||
|
@@ -262,13 +267,140 @@ func setUp(t *testing.T, port int, maxStreams uint32, ht hType) (*server, Client | |
target := TargetInfo{ | ||
Addr: addr, | ||
} | ||
ct, connErr = NewClientTransport(context.Background(), target, ConnectOptions{}) | ||
ct, connErr = NewClientTransport(context.Background(), target, copts) | ||
if connErr != nil { | ||
t.Fatalf("failed to create transport: %v", connErr) | ||
} | ||
return server, ct | ||
} | ||
|
||
func setUpWithNoPingServer(t *testing.T, copts ConnectOptions, done chan net.Conn) ClientTransport { | ||
lis, err := net.Listen("tcp", "localhost:0") | ||
if err != nil { | ||
t.Fatalf("Failed to listen: %v", err) | ||
} | ||
// Launch a non responsive server. | ||
go func() { | ||
defer lis.Close() | ||
conn, err := lis.Accept() | ||
if err != nil { | ||
t.Errorf("Error at server-side while accepting: %v", err) | ||
close(done) | ||
return | ||
} | ||
done <- conn | ||
}() | ||
tr, err := NewClientTransport(context.Background(), TargetInfo{Addr: lis.Addr().String()}, copts) | ||
if err != nil { | ||
t.Fatalf("Failed to dial: %v", err) | ||
} | ||
return tr | ||
} | ||
|
||
func TestKeepaliveClientClosesIdleTransport(t *testing.T) { | ||
keepalive.Enable() | ||
defer keepalive.Disable() | ||
done := make(chan net.Conn, 1) | ||
tr := setUpWithNoPingServer(t, ConnectOptions{KeepaliveParams: keepalive.Params{ | ||
Time: 2 * time.Second, // Keepalive time = 2 sec. | ||
Timeout: 1 * time.Second, // Keepalive timeout = 1 sec. | ||
PermitWithoutStream: true, // Run keepalive even with no RPCs. | ||
}}, done) | ||
defer tr.Close() | ||
conn, ok := <-done | ||
if !ok { | ||
t.Fatalf("Server didn't return connection object") | ||
} | ||
defer conn.Close() | ||
// Sleep for keepalive to close the connection. | ||
time.Sleep(4 * time.Second) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. perhaps this should be a function of the settings above, rather than hardcoded. there are 4 instances of this. |
||
// Assert that the connection was closed. | ||
ct := tr.(*http2Client) | ||
ct.mu.Lock() | ||
defer ct.mu.Unlock() | ||
if ct.state == reachable { | ||
t.Fatalf("Test Failed: Expected client transport to have closed.") | ||
} | ||
} | ||
|
||
func TestKeepaliveClientStaysHealthyOnIdleTransport(t *testing.T) { | ||
keepalive.Enable() | ||
defer keepalive.Disable() | ||
done := make(chan net.Conn, 1) | ||
tr := setUpWithNoPingServer(t, ConnectOptions{KeepaliveParams: keepalive.Params{ | ||
Time: 2 * time.Second, // Keepalive time = 2 sec. | ||
Timeout: 1 * time.Second, // Keepalive timeout = 1 sec. | ||
PermitWithoutStream: false, // Don't run keepalive even with no RPCs. | ||
}}, done) | ||
defer tr.Close() | ||
conn, ok := <-done | ||
if !ok { | ||
t.Fatalf("server didn't reutrn connection object") | ||
} | ||
defer conn.Close() | ||
// Give keepalive some time. | ||
time.Sleep(4 * time.Second) | ||
// Assert that connections is still healthy. | ||
ct := tr.(*http2Client) | ||
ct.mu.Lock() | ||
defer ct.mu.Unlock() | ||
if ct.state != reachable { | ||
t.Fatalf("Test failed: Expected client transport to be healthy.") | ||
} | ||
} | ||
|
||
func TestKeepaliveClientClosesWithActiveStreams(t *testing.T) { | ||
keepalive.Enable() | ||
defer keepalive.Disable() | ||
done := make(chan net.Conn, 1) | ||
tr := setUpWithNoPingServer(t, ConnectOptions{KeepaliveParams: keepalive.Params{ | ||
Time: 2 * time.Second, // Keepalive time = 2 sec. | ||
Timeout: 1 * time.Second, // Keepalive timeout = 1 sec. | ||
PermitWithoutStream: false, // Don't run keepalive even with no RPCs. | ||
}}, done) | ||
defer tr.Close() | ||
conn, ok := <-done | ||
if !ok { | ||
t.Fatalf("Server didn't return connection object") | ||
} | ||
defer conn.Close() | ||
// Create a stream. | ||
_, err := tr.NewStream(context.Background(), &CallHdr{}) | ||
if err != nil { | ||
t.Fatalf("Failed to create a new stream: %v", err) | ||
} | ||
// Give keepalive some time. | ||
time.Sleep(4 * time.Second) | ||
// Assert that transport was closed. | ||
ct := tr.(*http2Client) | ||
ct.mu.Lock() | ||
defer ct.mu.Unlock() | ||
if ct.state == reachable { | ||
t.Fatalf("Test failed: Expected client transport to have closed.") | ||
} | ||
} | ||
|
||
func TestKeepaliveClientStaysHealthyWithResponsiveServer(t *testing.T) { | ||
keepalive.Enable() | ||
defer keepalive.Disable() | ||
s, tr := setUpWithOptions(t, 0, math.MaxUint32, normal, ConnectOptions{KeepaliveParams: keepalive.Params{ | ||
Time: 2 * time.Second, // Keepalive time = 2 sec. | ||
Timeout: 1 * time.Second, // Keepalive timeout = 1 sec. | ||
PermitWithoutStream: true, // Don't run keepalive even with no RPCs. | ||
}}) | ||
defer s.stop() | ||
defer tr.Close() | ||
// Give keep alive some time. | ||
time.Sleep(4 * time.Second) | ||
// Assert that transport is healthy. | ||
ct := tr.(*http2Client) | ||
ct.mu.Lock() | ||
defer ct.mu.Unlock() | ||
if ct.state != reachable { | ||
t.Fatalf("Test failed: Expected client transport to be healthy.") | ||
} | ||
} | ||
|
||
func TestClientSendAndReceive(t *testing.T) { | ||
server, ct := setUp(t, 0, math.MaxUint32, normal) | ||
callHdr := &CallHdr{ | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why does this need to be another package? Can't it be inside the grpc package?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
transport package depends on this. grpc package depends on transport. Putting this in grpc will create a circular dependency.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The separate package looks weird to me as well. We may want to figure out alternatives.