-
Notifications
You must be signed in to change notification settings - Fork 4.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add timeout to Client RPC calls #11500
Changes from 20 commits
28bb096
cb4de4d
4f54651
244eedb
4103d14
c4ad24f
d6e2cbb
2f2857e
92525c4
b80ae67
40c9b40
dc7683d
9534200
0fc54d8
db227b0
c8897b2
12f57ec
6db287f
1762173
a12c517
ddaa01e
dd3327b
4922ef0
78c042a
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -72,7 +72,7 @@ func (s *HTTPHandlers) KVSGet(resp http.ResponseWriter, req *http.Request, args | |
|
||
// Make the RPC | ||
var out structs.IndexedDirEntries | ||
if err := s.agent.RPC(method, &args, &out); err != nil { | ||
if err := s.agent.RPC(method, args, &out); err != nil { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
This seems like a really weird class of bug as the behaviour shouldn't depend on There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Just as precaution I'd like to revert this one-liner and file another issue regarding KVS in case we ever need to debug this PR in the future. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, I discovered this issue in my own testing on a production cluster, without this change I was seeing timeouts on blocking queries to these methods. I can't say I quite understand why this change fixed the issue, only that it did seem to fix it for my particular setup. Moving the fix to a separate PR sounds like a good precaution in any case. |
||
return nil, err | ||
} | ||
setMeta(resp, &out.QueryMeta) | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -31,7 +31,7 @@ type muxSession interface { | |
|
||
// streamClient is used to wrap a stream with an RPC client | ||
type StreamClient struct { | ||
stream net.Conn | ||
stream *TimeoutConn | ||
codec rpc.ClientCodec | ||
} | ||
|
||
|
@@ -56,6 +56,36 @@ type Conn struct { | |
clientLock sync.Mutex | ||
} | ||
|
||
// TimeoutConn wraps net.Conn with a read timeout. | ||
// When set, FirstReadTimeout only applies to the very next Read. | ||
// DefaultTimeout is used for any other Read. | ||
type TimeoutConn struct { | ||
net.Conn | ||
DefaultTimeout time.Duration | ||
FirstReadTimeout time.Duration | ||
} | ||
|
||
func (c *TimeoutConn) Read(b []byte) (int, error) { | ||
timeout := c.DefaultTimeout | ||
// Apply timeout to first read then zero it out | ||
if c.FirstReadTimeout > 0 { | ||
timeout = c.FirstReadTimeout | ||
c.FirstReadTimeout = 0 | ||
} | ||
var deadline time.Time | ||
if timeout > 0 { | ||
deadline = time.Now().Add(timeout) | ||
} | ||
if err := c.Conn.SetReadDeadline(deadline); err != nil { | ||
return 0, err | ||
} | ||
return c.Conn.Read(b) | ||
} | ||
|
||
func (c *TimeoutConn) Write(b []byte) (int, error) { | ||
return c.Conn.Write(b) | ||
} | ||
|
||
func (c *Conn) Close() error { | ||
return c.session.Close() | ||
} | ||
|
@@ -79,12 +109,14 @@ func (c *Conn) getClient() (*StreamClient, error) { | |
return nil, err | ||
} | ||
|
||
timeoutStream := &TimeoutConn{Conn: stream, DefaultTimeout: c.pool.Timeout} | ||
|
||
// Create the RPC client | ||
codec := msgpackrpc.NewCodecFromHandle(true, true, stream, structs.MsgpackHandle) | ||
codec := msgpackrpc.NewCodecFromHandle(true, true, timeoutStream, structs.MsgpackHandle) | ||
|
||
// Return a new stream client | ||
sc := &StreamClient{ | ||
stream: stream, | ||
stream: timeoutStream, | ||
codec: codec, | ||
} | ||
return sc, nil | ||
|
@@ -101,7 +133,7 @@ func (c *Conn) returnClient(client *StreamClient) { | |
|
||
// If this is a Yamux stream, shrink the internal buffers so that | ||
// we can GC the idle memory | ||
if ys, ok := client.stream.(*yamux.Stream); ok { | ||
if ys, ok := client.stream.Conn.(*yamux.Stream); ok { | ||
ys.Shrink() | ||
} | ||
} | ||
|
@@ -133,6 +165,13 @@ type ConnPool struct { | |
// TODO: consider refactoring to accept a full yamux.Config instead of a logger | ||
Logger *log.Logger | ||
|
||
// The default timeout for stream reads/writes | ||
Timeout time.Duration | ||
|
||
Comment on lines
+168
to
+170
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Could you help me understand the difference between I'm wondering if it's possible to achieve the same behaviour without adding an arg (which is defaulted to 0 in most cases) to an already long list of arguments to There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
The
Thanks for the suggestion. Yes, Dropping the |
||
// Used for calculating timeouts on RPC requests | ||
MaxQueryTime time.Duration | ||
DefaultQueryTime time.Duration | ||
|
||
// The maximum time to keep a connection open | ||
MaxTime time.Duration | ||
|
||
|
@@ -325,7 +364,7 @@ func (p *ConnPool) dial( | |
tlsRPCType RPCType, | ||
) (net.Conn, HalfCloser, error) { | ||
// Try to dial the conn | ||
d := &net.Dialer{LocalAddr: p.SrcAddr, Timeout: DefaultDialTimeout} | ||
d := &net.Dialer{LocalAddr: p.SrcAddr, Timeout: p.Timeout} | ||
conn, err := d.Dial("tcp", addr.String()) | ||
if err != nil { | ||
return nil, nil, err | ||
|
@@ -590,6 +629,11 @@ func (p *ConnPool) rpc(dc string, nodeName string, addr net.Addr, method string, | |
return fmt.Errorf("rpc error getting client: %w", err) | ||
} | ||
|
||
// Use the zero value if the request doesn't implement RPCInfo | ||
if info, ok := args.(structs.RPCInfo); ok { | ||
sc.stream.FirstReadTimeout = info.Timeout(p.Timeout, p.MaxQueryTime, p.DefaultQueryTime) | ||
} | ||
|
||
// Make the RPC call | ||
err = msgpackrpc.CallWithCodec(sc.codec, method, args, reply) | ||
if err != nil { | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -18,14 +18,12 @@ import ( | |
"github.com/golang/protobuf/ptypes/timestamp" | ||
|
||
"github.com/golang/protobuf/proto" | ||
ptypes "github.com/golang/protobuf/ptypes" | ||
"github.com/hashicorp/consul-net-rpc/go-msgpack/codec" | ||
"github.com/hashicorp/go-multierror" | ||
"github.com/hashicorp/serf/coordinate" | ||
"github.com/mitchellh/hashstructure" | ||
|
||
"github.com/hashicorp/consul-net-rpc/go-msgpack/codec" | ||
|
||
ptypes "github.com/golang/protobuf/ptypes" | ||
|
||
"github.com/hashicorp/consul/acl" | ||
"github.com/hashicorp/consul/agent/cache" | ||
"github.com/hashicorp/consul/api" | ||
|
@@ -217,6 +215,7 @@ type RPCInfo interface { | |
TokenSecret() string | ||
SetTokenSecret(string) | ||
HasTimedOut(since time.Time, rpcHoldTimeout, maxQueryTime, defaultQueryTime time.Duration) (bool, error) | ||
Timeout(rpcHoldTimeout, maxQueryTime, defaultQueryTime time.Duration) time.Duration | ||
} | ||
|
||
// QueryOptions is used to specify various flags for read queries | ||
|
@@ -315,18 +314,24 @@ func (q *QueryOptions) SetTokenSecret(s string) { | |
q.Token = s | ||
} | ||
|
||
func (q QueryOptions) HasTimedOut(start time.Time, rpcHoldTimeout, maxQueryTime, defaultQueryTime time.Duration) (bool, error) { | ||
func (q QueryOptions) Timeout(rpcHoldTimeout, maxQueryTime, defaultQueryTime time.Duration) time.Duration { | ||
// Match logic in Server.blockingQuery. | ||
if q.MinQueryIndex > 0 { | ||
if q.MaxQueryTime > maxQueryTime { | ||
q.MaxQueryTime = maxQueryTime | ||
} else if q.MaxQueryTime <= 0 { | ||
q.MaxQueryTime = defaultQueryTime | ||
} | ||
// Timeout after maximum jitter has elapsed. | ||
q.MaxQueryTime += lib.RandomStagger(q.MaxQueryTime / JitterFraction) | ||
|
||
return time.Since(start) > (q.MaxQueryTime + rpcHoldTimeout), nil | ||
return q.MaxQueryTime + rpcHoldTimeout | ||
} | ||
return time.Since(start) > rpcHoldTimeout, nil | ||
return rpcHoldTimeout | ||
} | ||
|
||
func (q QueryOptions) HasTimedOut(start time.Time, rpcHoldTimeout, maxQueryTime, defaultQueryTime time.Duration) (bool, error) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do we still need all these types to implement There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Good point, I'll see if I can shrink the RPCInfo interface There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I spent some time refactoring locally but think it's too big of a change for this PR. Will raise an issue separately; the whole machinery with maxQueryTime and defaultQueryTime is confusing and only relevant for blocking queries |
||
return time.Since(start) > q.Timeout(rpcHoldTimeout, maxQueryTime, defaultQueryTime), nil | ||
} | ||
|
||
type WriteRequest struct { | ||
|
@@ -353,7 +358,11 @@ func (w *WriteRequest) SetTokenSecret(s string) { | |
} | ||
|
||
func (w WriteRequest) HasTimedOut(start time.Time, rpcHoldTimeout, maxQueryTime, defaultQueryTime time.Duration) (bool, error) { | ||
return time.Since(start) > rpcHoldTimeout, nil | ||
return time.Since(start) > w.Timeout(rpcHoldTimeout, maxQueryTime, defaultQueryTime), nil | ||
} | ||
|
||
func (w WriteRequest) Timeout(rpcHoldTimeout, maxQueryTime, defaultQueryTime time.Duration) time.Duration { | ||
return rpcHoldTimeout | ||
} | ||
|
||
type QueryBackend int | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this necessary for tests to pass?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, I don't recall exactly (it's been a few months) but according to the commit message I left in cb4de4d, the 5-second
RPCHoldTimeout
was causing some timeouts when running many tests concurrently.