Skip to content

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
refactor: canretry method
Browse files Browse the repository at this point in the history
JadhavPoonam committed Feb 1, 2023

Unverified

This commit is not signed, but one or more authors requires that any commit attributed to them is signed.
1 parent 42aece2 commit 3f5cae8
Showing 3 changed files with 27 additions and 39 deletions.
12 changes: 11 additions & 1 deletion agent/consul/client.go
Original file line number Diff line number Diff line change
@@ -16,6 +16,7 @@ import (
"golang.org/x/time/rate"

"github.com/hashicorp/consul/acl"
rpcRate "github.com/hashicorp/consul/agent/consul/rate"
"github.com/hashicorp/consul/agent/pool"
"github.com/hashicorp/consul/agent/router"
"github.com/hashicorp/consul/agent/structs"
@@ -296,7 +297,16 @@ TRY:

// Use the zero value for RPCInfo if the request doesn't implement RPCInfo
info, _ := args.(structs.RPCInfo)
if retry := canRetry(info, rpcErr, firstCheck, c.config); !retry {
retryableMessages := []error{
// If we are chunking and it doesn't seem to have completed, try again.
ErrChunkingResubmit,

// These rate limit errors are returned before the handler is called, so are
// safe to retry.
rpcRate.ErrRetryElsewhere,
}

if retry := canRetry(info, rpcErr, firstCheck, c.config, retryableMessages); !retry {
c.logger.Error("RPC failed to server",
"method", method,
"server", server.Addr,
47 changes: 10 additions & 37 deletions agent/consul/rpc.go
Original file line number Diff line number Diff line change
@@ -556,25 +556,8 @@ func (c *limitedConn) Read(b []byte) (n int, err error) {
return c.lr.Read(b)
}

func hasRetryableError(err error) bool {
retryableMessages := []error{
// If we are chunking and it doesn't seem to have completed, try again.
ErrChunkingResubmit,

// These rate limit errors are returned before the handler is called, so are
// safe to retry.
rate.ErrRetryElsewhere,
}
for _, m := range retryableMessages {
if err != nil && strings.Contains(err.Error(), m.Error()) {
return true
}
}
return false
}

// canRetry returns true if the request and error indicate that a retry is safe.
func canRetry(info structs.RPCInfo, err error, start time.Time, config *Config) bool {
func canRetry(info structs.RPCInfo, err error, start time.Time, config *Config, retryableMessages []error) bool {
if info != nil {
timedOut, timeoutError := info.HasTimedOut(start, config.RPCHoldTimeout, config.MaxQueryTime, config.DefaultQueryTime)
if timeoutError != nil {
@@ -596,21 +579,10 @@ func canRetry(info structs.RPCInfo, err error, start time.Time, config *Config)
return true
}

// retryableMessages := []error{
// // If we are chunking and it doesn't seem to have completed, try again.
// ErrChunkingResubmit,

// // These rate limit errors are returned before the handler is called, so are
// // safe to retry.
// rate.ErrRetryElsewhere,
// }
// for _, m := range retryableMessages {
// if err != nil && strings.Contains(err.Error(), m.Error()) {
// return true
// }
// }
if hasRetryableError(err) {
return true
for _, m := range retryableMessages {
if err != nil && strings.Contains(err.Error(), m.Error()) {
return true
}
}

// Reads are safe to retry for stream errors, such as if a server was
@@ -765,12 +737,13 @@ CHECK_LEADER:
return true, nil
}
}
// defer to caller to retry
if !errors.Is(rpcErr, rate.ErrRetryElsewhere) {
return true, rpcErr

retryableMessages := []error{
// If we are chunking and it doesn't seem to have completed, try again.
ErrChunkingResubmit,
}

if retry := canRetry(info, rpcErr, firstCheck, s.config); retry {
if retry := canRetry(info, rpcErr, firstCheck, s.config, retryableMessages); retry {
// Gate the request until there is a leader
jitter := lib.RandomStagger(s.config.RPCHoldTimeout / structs.JitterFraction)
select {
7 changes: 6 additions & 1 deletion agent/consul/rpc_test.go
Original file line number Diff line number Diff line change
@@ -31,6 +31,7 @@ import (
"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/connect"
"github.com/hashicorp/consul/agent/consul/rate"
rpcRate "github.com/hashicorp/consul/agent/consul/rate"
"github.com/hashicorp/consul/agent/consul/state"
agent_grpc "github.com/hashicorp/consul/agent/grpc-internal"
"github.com/hashicorp/consul/agent/pool"
@@ -1287,12 +1288,16 @@ func TestCanRetry(t *testing.T) {
config := DefaultConfig()
now := time.Now()
config.RPCHoldTimeout = 7 * time.Second
retryableMessages := []error{
ErrChunkingResubmit,
rpcRate.ErrRetryElsewhere,
}
run := func(t *testing.T, tc testCase) {
timeOutValue := tc.timeout
if timeOutValue.IsZero() {
timeOutValue = now
}
require.Equal(t, tc.expected, canRetry(tc.req, tc.err, timeOutValue, config))
require.Equal(t, tc.expected, canRetry(tc.req, tc.err, timeOutValue, config, retryableMessages))
}

var testCases = []testCase{

0 comments on commit 3f5cae8

Please sign in to comment.