Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add timeout to Client RPC calls #11500

Merged
merged 24 commits into from
Apr 21, 2022
Merged
Show file tree
Hide file tree
Changes from 20 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions agent/consul/catalog_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1650,6 +1650,7 @@ func TestCatalog_ListServices_Stale(t *testing.T) {
c.PrimaryDatacenter = "dc1" // Enable ACLs!
c.ACLsEnabled = true
c.Bootstrap = false // Disable bootstrap
c.RPCHoldTimeout = 10 * time.Millisecond
})
defer os.RemoveAll(dir2)
defer s2.Shutdown()
Expand Down
18 changes: 12 additions & 6 deletions agent/consul/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -291,20 +291,26 @@ TRY:
}

// Move off to another server, and see if we can retry.
c.logger.Error("RPC failed to server",
"method", method,
"server", server.Addr,
"error", rpcErr,
)
metrics.IncrCounterWithLabels([]string{"client", "rpc", "failed"}, 1, []metrics.Label{{Name: "server", Value: server.Name}})
manager.NotifyFailedServer(server)

// Use the zero value for RPCInfo if the request doesn't implement RPCInfo
info, _ := args.(structs.RPCInfo)
if retry := canRetry(info, rpcErr, firstCheck, c.config); !retry {
c.logger.Error("RPC failed to server",
"method", method,
"server", server.Addr,
"error", rpcErr,
)
metrics.IncrCounterWithLabels([]string{"client", "rpc", "failed"}, 1, []metrics.Label{{Name: "server", Value: server.Name}})
return rpcErr
}

c.logger.Warn("Retrying RPC to server",
"method", method,
"server", server.Addr,
"error", rpcErr,
)

// We can wait a bit and retry!
jitter := lib.RandomStagger(c.config.RPCHoldTimeout / structs.JitterFraction)
select {
Expand Down
72 changes: 64 additions & 8 deletions agent/consul/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ func testClientConfig(t *testing.T) (string, *Config) {
config.SerfLANConfig.MemberlistConfig.ProbeTimeout = 200 * time.Millisecond
config.SerfLANConfig.MemberlistConfig.ProbeInterval = time.Second
config.SerfLANConfig.MemberlistConfig.GossipInterval = 100 * time.Millisecond
config.RPCHoldTimeout = 10 * time.Second
return dir, config
}

Expand All @@ -72,7 +73,7 @@ func testClientWithConfigWithErr(t *testing.T, cb func(c *Config)) (string, *Cli
}

// Apply config to copied fields because many tests only set the old
//values.
// values.
config.ACLResolverSettings.ACLsEnabled = config.ACLsEnabled
config.ACLResolverSettings.NodeName = config.NodeName
config.ACLResolverSettings.Datacenter = config.Datacenter
Expand Down Expand Up @@ -521,13 +522,16 @@ func newDefaultDeps(t *testing.T, c *Config) Deps {
resolver.Register(builder)

connPool := &pool.ConnPool{
Server: false,
SrcAddr: c.RPCSrcAddr,
Logger: logger.StandardLogger(&hclog.StandardLoggerOptions{InferLevels: true}),
MaxTime: 2 * time.Minute,
MaxStreams: 4,
TLSConfigurator: tls,
Datacenter: c.Datacenter,
Server: false,
SrcAddr: c.RPCSrcAddr,
Logger: logger.StandardLogger(&hclog.StandardLoggerOptions{InferLevels: true}),
MaxTime: 2 * time.Minute,
MaxStreams: 4,
TLSConfigurator: tls,
Datacenter: c.Datacenter,
Timeout: c.RPCHoldTimeout,
DefaultQueryTime: c.DefaultQueryTime,
MaxQueryTime: c.MaxQueryTime,
}

return Deps{
Expand Down Expand Up @@ -853,3 +857,55 @@ func TestClient_ShortReconnectTimeout(t *testing.T) {
50*time.Millisecond,
"The client node was not reaped within the alotted time")
}

type waiter struct {
duration time.Duration
}

func (w *waiter) Wait(struct{}, *struct{}) error {
time.Sleep(w.duration)
return nil
}

// TestClient_RPC_Timeout verifies that client RPC calls observe the
// RPCHoldTimeout deadline for plain queries, while blocking queries
// (MinQueryIndex > 0) are granted a longer timeout.
func TestClient_RPC_Timeout(t *testing.T) {
	if testing.Short() {
		t.Skip("too slow for testing.Short")
	}

	const timeout = 10 * time.Millisecond

	t.Parallel()

	_, s1 := testServerWithConfig(t)

	_, c1 := testClientWithConfig(t, func(c *Config) {
		c.Datacenter = "dc1"
		c.NodeName = uniqueNodeName(t.Name())
		c.RPCHoldTimeout = timeout
	})

	joinLAN(t, c1, s1)

	// Wait until the client can actually reach the server.
	retry.Run(t, func(r *retry.R) {
		var pong struct{}
		if err := c1.RPC("Status.Ping", struct{}{}, &pong); err != nil {
			r.Fatalf("err: %v", err)
		}
	})

	// Register an endpoint that sleeps exactly as long as the hold timeout.
	require.NoError(t, s1.RegisterEndpoint("Wait", &waiter{duration: timeout}))

	// Requests with QueryOptions have a default timeout of RPCHoldTimeout,
	// so this call should hit the read deadline.
	var reply struct{}
	err := c1.RPC("Wait.Wait", &structs.NodeSpecificRequest{}, &reply)
	require.Error(t, err)
	require.Contains(t, err.Error(), "rpc error making call: i/o deadline reached")

	// Blocking requests get a longer timeout, so the same endpoint succeeds.
	reply = struct{}{}
	require.NoError(t, c1.RPC("Wait.Wait", &structs.NodeSpecificRequest{
		QueryOptions: structs.QueryOptions{
			MinQueryIndex: 1,
		},
	}, &reply))
}
4 changes: 4 additions & 0 deletions agent/consul/rpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1374,6 +1374,10 @@ func (r isReadRequest) HasTimedOut(since time.Time, rpcHoldTimeout, maxQueryTime
return false, nil
}

// Timeout implements structs.RPCInfo for the test stub. The negative
// duration means no read deadline is ever applied for this request.
func (r isReadRequest) Timeout(rpcHoldTimeout, maxQueryTime, defaultQueryTime time.Duration) time.Duration {
	return -1
}

func TestRPC_AuthorizeRaftRPC(t *testing.T) {
caPEM, caPK, err := tlsutil.GenerateCA(tlsutil.CAOpts{Days: 5, Domain: "consul"})
require.NoError(t, err)
Expand Down
2 changes: 1 addition & 1 deletion agent/consul/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ func testServerConfig(t *testing.T) (string, *Config) {

// TODO (slackpad) - We should be able to run all tests w/o this, but it
// looks like several depend on it.
config.RPCHoldTimeout = 5 * time.Second
config.RPCHoldTimeout = 10 * time.Second
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this necessary for tests to pass?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I don't recall exactly (it's been a few months) but according to the commit message I left in cb4de4d, the 5-second RPCHoldTimeout was causing some timeouts when running many tests concurrently.


config.ConnectEnabled = true
config.CAConfig = &structs.CAConfiguration{
Expand Down
2 changes: 1 addition & 1 deletion agent/kvs_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func (s *HTTPHandlers) KVSGet(resp http.ResponseWriter, req *http.Request, args

// Make the RPC
var out structs.IndexedDirEntries
if err := s.agent.RPC(method, &args, &out); err != nil {
if err := s.agent.RPC(method, args, &out); err != nil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

2. After applying the rpcHoldTimeout to all client RPC calls, I noticed one type of blocking query (KVS.Get / KVS.List) wasn't having its timeout set properly based on the query parameters. Removing an ampersand from the args parameter fixed this issue (92525c4). This probably also affects the existing RPC retry logic for this RPC method so could possibly be extracted as a separate fix PR.

This seems like a really weird class of bug as the behaviour shouldn't depend on args being mutated during RPC. @wjordan was this encountered during your own testing?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just as precaution I'd like to revert this one-liner and file another issue regarding KVS in case we ever need to debug this PR in the future.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I discovered this issue in my own testing on a production cluster, without this change I was seeing timeouts on blocking queries to these methods. I can't say I quite understand why this change fixed the issue, only that it did seem to fix it for my particular setup.

Moving the fix to a separate PR sounds like a good precaution in any case.

return nil, err
}
setMeta(resp, &out.QueryMeta)
Expand Down
54 changes: 49 additions & 5 deletions agent/pool/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ type muxSession interface {

// streamClient is used to wrap a stream with an RPC client
type StreamClient struct {
stream net.Conn
stream *TimeoutConn
codec rpc.ClientCodec
}

Expand All @@ -56,6 +56,36 @@ type Conn struct {
clientLock sync.Mutex
}

// TimeoutConn wraps net.Conn with a read timeout.
// When set, FirstReadTimeout only applies to the very next Read.
// DefaultTimeout is used for any other Read.
type TimeoutConn struct {
net.Conn
DefaultTimeout time.Duration
FirstReadTimeout time.Duration
}

func (c *TimeoutConn) Read(b []byte) (int, error) {
timeout := c.DefaultTimeout
// Apply timeout to first read then zero it out
if c.FirstReadTimeout > 0 {
timeout = c.FirstReadTimeout
c.FirstReadTimeout = 0
}
var deadline time.Time
if timeout > 0 {
deadline = time.Now().Add(timeout)
}
if err := c.Conn.SetReadDeadline(deadline); err != nil {
return 0, err
}
return c.Conn.Read(b)
}

func (c *TimeoutConn) Write(b []byte) (int, error) {
return c.Conn.Write(b)
}

// Close tears down the underlying multiplexed session; streams carried
// on this connection are closed along with it.
func (c *Conn) Close() error {
	return c.session.Close()
}
Expand All @@ -79,12 +109,14 @@ func (c *Conn) getClient() (*StreamClient, error) {
return nil, err
}

timeoutStream := &TimeoutConn{Conn: stream, DefaultTimeout: c.pool.Timeout}

// Create the RPC client
codec := msgpackrpc.NewCodecFromHandle(true, true, stream, structs.MsgpackHandle)
codec := msgpackrpc.NewCodecFromHandle(true, true, timeoutStream, structs.MsgpackHandle)

// Return a new stream client
sc := &StreamClient{
stream: stream,
stream: timeoutStream,
codec: codec,
}
return sc, nil
Expand All @@ -101,7 +133,7 @@ func (c *Conn) returnClient(client *StreamClient) {

// If this is a Yamux stream, shrink the internal buffers so that
// we can GC the idle memory
if ys, ok := client.stream.(*yamux.Stream); ok {
if ys, ok := client.stream.Conn.(*yamux.Stream); ok {
ys.Shrink()
}
}
Expand Down Expand Up @@ -133,6 +165,13 @@ type ConnPool struct {
// TODO: consider refactoring to accept a full yamux.Config instead of a logger
Logger *log.Logger

// The default timeout for stream reads/writes
Timeout time.Duration

Comment on lines +168 to +170
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you help me understand the difference between ConnPool.Timeout vs the timeout args that have been added to all the rpc methods?

I'm wondering if it's possible to achieve the same behaviour without adding an arg (which is defaulted to 0 in most cases) to an already long list of arguments to rpc.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ConnPool.Timeout ('The default timeout for stream reads/writes') is set from RPCHoldTimeout, and represents the default timeout for any stream read/write when an explicit timeout for a specific read/write call is not provided. RPCHoldTimeout represents an existing duration the server is tuned to expect round-trips to be shorter than, so it seemed like a reasonable timeout to use for most reads/writes going through this connection pool.

The ConnPool.rpc method sets a specific timeout for the first stream read after the query is sent. This is because the expected wait time for a (potentially blocking) query response can be much longer than the default round-trip timeout configuration.

I'm wondering if it's possible to achieve the same behaviour without adding an arg (which is defaulted to 0 in most cases) to an already long list of arguments to rpc.

Thanks for the suggestion. Yes, Dropping the timeout argument by moving the RPC-timeout calculation from Client.RPC to ConnPool.rpc requires MaxQueryTime and DefaultQueryTime to be passed to ConnPool, but I think it does end up a bit cleaner that way.

// Used for calculating timeouts on RPC requests
MaxQueryTime time.Duration
DefaultQueryTime time.Duration

// The maximum time to keep a connection open
MaxTime time.Duration

Expand Down Expand Up @@ -325,7 +364,7 @@ func (p *ConnPool) dial(
tlsRPCType RPCType,
) (net.Conn, HalfCloser, error) {
// Try to dial the conn
d := &net.Dialer{LocalAddr: p.SrcAddr, Timeout: DefaultDialTimeout}
d := &net.Dialer{LocalAddr: p.SrcAddr, Timeout: p.Timeout}
conn, err := d.Dial("tcp", addr.String())
if err != nil {
return nil, nil, err
Expand Down Expand Up @@ -590,6 +629,11 @@ func (p *ConnPool) rpc(dc string, nodeName string, addr net.Addr, method string,
return fmt.Errorf("rpc error getting client: %w", err)
}

// Use the zero value if the request doesn't implement RPCInfo
if info, ok := args.(structs.RPCInfo); ok {
sc.stream.FirstReadTimeout = info.Timeout(p.Timeout, p.MaxQueryTime, p.DefaultQueryTime)
}

// Make the RPC call
err = msgpackrpc.CallWithCodec(sc.codec, method, args, reply)
if err != nil {
Expand Down
13 changes: 8 additions & 5 deletions agent/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,11 +169,14 @@ func newConnPool(config *config.RuntimeConfig, logger hclog.Logger, tls *tlsutil
}

pool := &pool.ConnPool{
Server: config.ServerMode,
SrcAddr: rpcSrcAddr,
Logger: logger.StandardLogger(&hclog.StandardLoggerOptions{InferLevels: true}),
TLSConfigurator: tls,
Datacenter: config.Datacenter,
Server: config.ServerMode,
SrcAddr: rpcSrcAddr,
Logger: logger.StandardLogger(&hclog.StandardLoggerOptions{InferLevels: true}),
TLSConfigurator: tls,
Datacenter: config.Datacenter,
Timeout: config.RPCHoldTimeout,
MaxQueryTime: config.MaxQueryTime,
DefaultQueryTime: config.DefaultQueryTime,
}
if config.ServerMode {
pool.MaxTime = 2 * time.Minute
Expand Down
25 changes: 17 additions & 8 deletions agent/structs/structs.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,12 @@ import (
"github.com/golang/protobuf/ptypes/timestamp"

"github.com/golang/protobuf/proto"
ptypes "github.com/golang/protobuf/ptypes"
"github.com/hashicorp/consul-net-rpc/go-msgpack/codec"
"github.com/hashicorp/go-multierror"
"github.com/hashicorp/serf/coordinate"
"github.com/mitchellh/hashstructure"

"github.com/hashicorp/consul-net-rpc/go-msgpack/codec"

ptypes "github.com/golang/protobuf/ptypes"

"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/cache"
"github.com/hashicorp/consul/api"
Expand Down Expand Up @@ -217,6 +215,7 @@ type RPCInfo interface {
TokenSecret() string
SetTokenSecret(string)
HasTimedOut(since time.Time, rpcHoldTimeout, maxQueryTime, defaultQueryTime time.Duration) (bool, error)
Timeout(rpcHoldTimeout, maxQueryTime, defaultQueryTime time.Duration) time.Duration
}

// QueryOptions is used to specify various flags for read queries
Expand Down Expand Up @@ -315,18 +314,24 @@ func (q *QueryOptions) SetTokenSecret(s string) {
q.Token = s
}

func (q QueryOptions) HasTimedOut(start time.Time, rpcHoldTimeout, maxQueryTime, defaultQueryTime time.Duration) (bool, error) {
func (q QueryOptions) Timeout(rpcHoldTimeout, maxQueryTime, defaultQueryTime time.Duration) time.Duration {
// Match logic in Server.blockingQuery.
if q.MinQueryIndex > 0 {
if q.MaxQueryTime > maxQueryTime {
q.MaxQueryTime = maxQueryTime
} else if q.MaxQueryTime <= 0 {
q.MaxQueryTime = defaultQueryTime
}
// Timeout after maximum jitter has elapsed.
q.MaxQueryTime += lib.RandomStagger(q.MaxQueryTime / JitterFraction)

return time.Since(start) > (q.MaxQueryTime + rpcHoldTimeout), nil
return q.MaxQueryTime + rpcHoldTimeout
}
return time.Since(start) > rpcHoldTimeout, nil
return rpcHoldTimeout
}

func (q QueryOptions) HasTimedOut(start time.Time, rpcHoldTimeout, maxQueryTime, defaultQueryTime time.Duration) (bool, error) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we still need all these types to implement HasTimedOut if it's the same everywhere? It seems like we could have one version of this (not attached to a type) that gets a structs.RPCInfo passed in and we could call Timeout on that.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point, I'll see if I can shrink the RPCInfo interface

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I spent some time refactoring locally but think it's too big of a change for this PR. Will raise an issue separately; the whole machinery with maxQueryTime and defaultQueryTime is confusing and only relevant for blocking queries

return time.Since(start) > q.Timeout(rpcHoldTimeout, maxQueryTime, defaultQueryTime), nil
}

type WriteRequest struct {
Expand All @@ -353,7 +358,11 @@ func (w *WriteRequest) SetTokenSecret(s string) {
}

// HasTimedOut reports whether the write request has been in flight
// longer than its allowed timeout. The scraped diff kept both the
// removed and added return statements (the first made the second
// unreachable); this keeps only the new, Timeout-based implementation.
func (w WriteRequest) HasTimedOut(start time.Time, rpcHoldTimeout, maxQueryTime, defaultQueryTime time.Duration) (bool, error) {
	return time.Since(start) > w.Timeout(rpcHoldTimeout, maxQueryTime, defaultQueryTime), nil
}

// Timeout returns the timeout for a write request. Writes never block,
// so only the basic RPC hold window applies; maxQueryTime and
// defaultQueryTime are ignored.
func (w WriteRequest) Timeout(rpcHoldTimeout, maxQueryTime, defaultQueryTime time.Duration) time.Duration {
	return rpcHoldTimeout
}

type QueryBackend int
Expand Down
6 changes: 5 additions & 1 deletion proto/pbautoconf/auto_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,5 +23,9 @@ func (req *AutoConfigRequest) SetTokenSecret(token string) {
}

// HasTimedOut reports whether the auto-config request has exceeded its
// allowed RPC timeout. The scraped diff kept both the removed and added
// return statements (the first made the second unreachable); this keeps
// only the new, Timeout-based implementation.
func (req *AutoConfigRequest) HasTimedOut(start time.Time, rpcHoldTimeout, maxQueryTime, defaultQueryTime time.Duration) (bool, error) {
	return time.Since(start) > req.Timeout(rpcHoldTimeout, maxQueryTime, defaultQueryTime), nil
}

// Timeout returns the timeout for an auto-config request. These are
// never blocking queries, so only rpcHoldTimeout applies; maxQueryTime
// and defaultQueryTime are ignored.
func (req *AutoConfigRequest) Timeout(rpcHoldTimeout, maxQueryTime, defaultQueryTime time.Duration) time.Duration {
	return rpcHoldTimeout
}
Loading