From 3f5cae893976494c19ca005cb7613fd29d6b070c Mon Sep 17 00:00:00 2001 From: Poonam Jadhav Date: Wed, 1 Feb 2023 11:10:33 -0500 Subject: [PATCH] refactor: canretry method --- agent/consul/client.go | 12 +++++++++- agent/consul/rpc.go | 47 +++++++++------------------------------- agent/consul/rpc_test.go | 7 +++++- 3 files changed, 27 insertions(+), 39 deletions(-) diff --git a/agent/consul/client.go b/agent/consul/client.go index 94f806a849442..baf87bbfd9f9f 100644 --- a/agent/consul/client.go +++ b/agent/consul/client.go @@ -16,6 +16,7 @@ import ( "golang.org/x/time/rate" "github.com/hashicorp/consul/acl" + rpcRate "github.com/hashicorp/consul/agent/consul/rate" "github.com/hashicorp/consul/agent/pool" "github.com/hashicorp/consul/agent/router" "github.com/hashicorp/consul/agent/structs" @@ -296,7 +297,16 @@ TRY: // Use the zero value for RPCInfo if the request doesn't implement RPCInfo info, _ := args.(structs.RPCInfo) - if retry := canRetry(info, rpcErr, firstCheck, c.config); !retry { + retryableMessages := []error{ + // If we are chunking and it doesn't seem to have completed, try again. + ErrChunkingResubmit, + + // These rate limit errors are returned before the handler is called, so are + // safe to retry. + rpcRate.ErrRetryElsewhere, + } + + if retry := canRetry(info, rpcErr, firstCheck, c.config, retryableMessages); !retry { c.logger.Error("RPC failed to server", "method", method, "server", server.Addr, diff --git a/agent/consul/rpc.go b/agent/consul/rpc.go index 1055904c5b4ef..5e5cf7c3c6de6 100644 --- a/agent/consul/rpc.go +++ b/agent/consul/rpc.go @@ -556,25 +556,8 @@ func (c *limitedConn) Read(b []byte) (n int, err error) { return c.lr.Read(b) } -func hasRetryableError(err error) bool { - retryableMessages := []error{ - // If we are chunking and it doesn't seem to have completed, try again. - ErrChunkingResubmit, - - // These rate limit errors are returned before the handler is called, so are - // safe to retry. - rate.ErrRetryElsewhere, - } - for _, m := range retryableMessages { - if err != nil && strings.Contains(err.Error(), m.Error()) { - return true - } - } - return false -} - // canRetry returns true if the request and error indicate that a retry is safe. -func canRetry(info structs.RPCInfo, err error, start time.Time, config *Config) bool { +func canRetry(info structs.RPCInfo, err error, start time.Time, config *Config, retryableMessages []error) bool { if info != nil { timedOut, timeoutError := info.HasTimedOut(start, config.RPCHoldTimeout, config.MaxQueryTime, config.DefaultQueryTime) if timeoutError != nil { @@ -596,21 +579,10 @@ func canRetry(info structs.RPCInfo, err error, start time.Time, config *Config) return true } - // retryableMessages := []error{ - // // If we are chunking and it doesn't seem to have completed, try again. - // ErrChunkingResubmit, - - // // These rate limit errors are returned before the handler is called, so are - // // safe to retry. - // rate.ErrRetryElsewhere, - // } - // for _, m := range retryableMessages { - // if err != nil && strings.Contains(err.Error(), m.Error()) { - // return true - // } - // } - if hasRetryableError(err) { - return true + for _, m := range retryableMessages { + if err != nil && strings.Contains(err.Error(), m.Error()) { + return true + } } // Reads are safe to retry for stream errors, such as if a server was @@ -765,12 +737,13 @@ CHECK_LEADER: return true, nil } } - // defer to caller to retry - if !errors.Is(rpcErr, rate.ErrRetryElsewhere) { - return true, rpcErr + + retryableMessages := []error{ + // If we are chunking and it doesn't seem to have completed, try again. + ErrChunkingResubmit, } - if retry := canRetry(info, rpcErr, firstCheck, s.config); retry { + if retry := canRetry(info, rpcErr, firstCheck, s.config, retryableMessages); retry { // Gate the request until there is a leader jitter := lib.RandomStagger(s.config.RPCHoldTimeout / structs.JitterFraction) select { diff --git a/agent/consul/rpc_test.go b/agent/consul/rpc_test.go index 42e2ae4f738e9..18171725c536d 100644 --- a/agent/consul/rpc_test.go +++ b/agent/consul/rpc_test.go @@ -31,6 +31,7 @@ import ( "github.com/hashicorp/consul/acl" "github.com/hashicorp/consul/agent/connect" "github.com/hashicorp/consul/agent/consul/rate" + rpcRate "github.com/hashicorp/consul/agent/consul/rate" "github.com/hashicorp/consul/agent/consul/state" agent_grpc "github.com/hashicorp/consul/agent/grpc-internal" "github.com/hashicorp/consul/agent/pool" @@ -1287,12 +1288,16 @@ func TestCanRetry(t *testing.T) { config := DefaultConfig() now := time.Now() config.RPCHoldTimeout = 7 * time.Second + retryableMessages := []error{ + ErrChunkingResubmit, + rpcRate.ErrRetryElsewhere, + } run := func(t *testing.T, tc testCase) { timeOutValue := tc.timeout if timeOutValue.IsZero() { timeOutValue = now } - require.Equal(t, tc.expected, canRetry(tc.req, tc.err, timeOutValue, config)) + require.Equal(t, tc.expected, canRetry(tc.req, tc.err, timeOutValue, config, retryableMessages)) } var testCases = []testCase{