diff --git a/pkg/tm/global_transaction.go b/pkg/tm/global_transaction.go index 71bc3a795..733a63031 100644 --- a/pkg/tm/global_transaction.go +++ b/pkg/tm/global_transaction.go @@ -28,6 +28,7 @@ import ( "github.com/seata/seata-go/pkg/common/log" "github.com/seata/seata-go/pkg/protocol/message" "github.com/seata/seata-go/pkg/remoting/getty" + "github.com/seata/seata-go/pkg/util/backoff" ) type GlobalTransaction struct { @@ -96,35 +97,35 @@ func (g *GlobalTransactionManager) Commit(ctx context.Context, gtr *GlobalTransa return errors.New("Commit xid should not be empty") } - // todo: replace retry with config - var ( - err error - res interface{} - // todo retry and retryInterval should read from config - retry = 10 - retryInterval = 200 * time.Millisecond - req = message.GlobalCommitRequest{ - AbstractGlobalEndRequest: message.AbstractGlobalEndRequest{ - Xid: gtr.Xid, - }, - } - ) - for ; retry > 0; retry-- { - res, err = getty.GetGettyRemotingClient().SendSyncRequest(req) - if err != nil { - log.Errorf("GlobalCommitRequest error, xid %s, error %v", gtr.Xid, err) - time.Sleep(retryInterval) - } else { + bf := backoff.New(ctx, backoff.Config{ + MaxRetries: 10, + MinBackoff: 100 * time.Millisecond, + MaxBackoff: 200 * time.Millisecond, + }) + + var req = message.GlobalCommitRequest{ + AbstractGlobalEndRequest: message.AbstractGlobalEndRequest{Xid: gtr.Xid}, + } + var res interface{} + var err error + for bf.Ongoing() { + if res, err = getty.GetGettyRemotingClient().SendSyncRequest(req); err == nil { break } + log.Warnf("send global commit request failed, xid %s, error %v", gtr.Xid, err) + bf.Wait() } - if err != nil { - log.Infof("send global commit request failed, xid %s, error %v", gtr.Xid, err) - return err + + if bf.Err() != nil { + lastErr := errors.Wrap(err, bf.Err().Error()) + log.Warnf("send global commit request failed, xid %s, error %v", gtr.Xid, lastErr) + return lastErr } + log.Infof("send global commit request success, xid %s", gtr.Xid) gtr.Status = 
res.(message.GlobalCommitResponse).GlobalStatus UnbindXid(ctx) + return nil } @@ -138,35 +139,36 @@ func (g *GlobalTransactionManager) Rollback(ctx context.Context, gtr *GlobalTran return errors.New("Rollback xid should not be empty") } - // todo: replace retry with config - var ( - err error - res interface{} - // todo retry and retryInterval should read from config - retry = 10 - retryInterval = 200 * time.Millisecond - req = message.GlobalRollbackRequest{ - AbstractGlobalEndRequest: message.AbstractGlobalEndRequest{ - Xid: gtr.Xid, - }, - } - ) - for ; retry > 0; retry-- { - res, err = getty.GetGettyRemotingClient().SendSyncRequest(req) - if err != nil { - log.Errorf("GlobalRollbackRequest error, xid %s, error %v", gtr.Xid, err) - time.Sleep(retryInterval) - } else { + bf := backoff.New(ctx, backoff.Config{ + MaxRetries: 10, + MinBackoff: 100 * time.Millisecond, + MaxBackoff: 200 * time.Millisecond, + }) + + var res interface{} + var req = message.GlobalRollbackRequest{ + AbstractGlobalEndRequest: message.AbstractGlobalEndRequest{Xid: gtr.Xid}, + } + + var err error + for bf.Ongoing() { + if res, err = getty.GetGettyRemotingClient().SendSyncRequest(req); err == nil { break } - } - if err != nil { log.Errorf("GlobalRollbackRequest rollback failed, xid %s, error %v", gtr.Xid, err) - return err + bf.Wait() } + + if bf.Err() != nil { + lastErr := errors.Wrap(err, bf.Err().Error()) + log.Errorf("GlobalRollbackRequest rollback failed, xid %s, error %v", gtr.Xid, lastErr) + return lastErr + } + log.Infof("GlobalRollbackRequest rollback success, xid %s,", gtr.Xid) gtr.Status = res.(message.GlobalRollbackResponse).GlobalStatus UnbindXid(ctx) + return nil } diff --git a/pkg/util/backoff/backoff.go b/pkg/util/backoff/backoff.go new file mode 100644 index 000000000..9b69eb3b7 --- /dev/null +++ b/pkg/util/backoff/backoff.go @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
// Config configures a Backoff.
type Config struct {
	MinBackoff time.Duration `yaml:"min_period"`  // start backoff at this level
	MaxBackoff time.Duration `yaml:"max_period"`  // increase exponentially to this level
	MaxRetries int           `yaml:"max_retries"` // give up after this many; zero means infinite retries
}

// RegisterFlagsWithPrefix registers the three backoff settings on f, each
// flag name being prefix followed by ".backoff-min-period",
// ".backoff-max-period" or ".backoff-retries". The defaults are 100ms, 10s
// and 10 retries respectively.
func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
	f.DurationVar(&cfg.MinBackoff, prefix+".backoff-min-period", 100*time.Millisecond, "Minimum delay when backing off.")
	f.DurationVar(&cfg.MaxBackoff, prefix+".backoff-max-period", 10*time.Second, "Maximum delay when backing off.")
	f.IntVar(&cfg.MaxRetries, prefix+".backoff-retries", 10, "Number of times to backoff and retry before failing.")
}

// Backoff implements exponential backoff with randomized wait times.
//
// Every method reads or mutates the retry state without any locking, so the
// caller is responsible for not sharing one Backoff between goroutines.
type Backoff struct {
	cfg          Config
	ctx          context.Context
	numRetries   int           // delays handed out since creation or the last Reset
	nextDelayMin time.Duration // inclusive lower bound of the next jitter window
	nextDelayMax time.Duration // exclusive upper bound of the next jitter window
}

// New creates a Backoff object. Pass a Context that can also terminate the
// operation. The first jitter window is [MinBackoff, 2*MinBackoff), with the
// upper bound capped at MaxBackoff.
func New(ctx context.Context, cfg Config) *Backoff {
	return &Backoff{
		cfg:          cfg,
		ctx:          ctx,
		nextDelayMin: cfg.MinBackoff,
		nextDelayMax: doubleDuration(cfg.MinBackoff, cfg.MaxBackoff),
	}
}

// Reset puts the Backoff back into its initial condition: the retry count is
// cleared and the jitter window returns to the one New set up. The Context
// is left untouched.
func (b *Backoff) Reset() {
	b.numRetries = 0
	b.nextDelayMin = b.cfg.MinBackoff
	b.nextDelayMax = doubleDuration(b.cfg.MinBackoff, b.cfg.MaxBackoff)
}

// Ongoing reports whether the caller should keep going: the Context has not
// errored and, unless MaxRetries is zero (unlimited), fewer than MaxRetries
// delays have been handed out.
func (b *Backoff) Ongoing() bool {
	return b.ctx.Err() == nil && (b.cfg.MaxRetries == 0 || b.numRetries < b.cfg.MaxRetries)
}

// Err returns the reason for terminating the backoff, or nil if it didn't
// terminate. A Context error takes precedence over retry exhaustion.
func (b *Backoff) Err() error {
	if err := b.ctx.Err(); err != nil {
		return err
	}
	if b.cfg.MaxRetries != 0 && b.numRetries >= b.cfg.MaxRetries {
		return fmt.Errorf("terminated after %d retries", b.numRetries)
	}
	return nil
}

// NumRetries returns the number of retries so far.
func (b *Backoff) NumRetries() int {
	return b.numRetries
}

// Wait counts one retry and then sleeps for the delay chosen by NextDelay.
// It returns without sleeping when the Context is already done or when the
// retry just counted was the last one allowed, and it returns early if the
// Context is done while it is sleeping.
func (b *Backoff) Wait() {
	// NextDelay bumps the retry counter, so it must run before Ongoing is
	// consulted: the final permitted retry must not be followed by a sleep.
	sleepTime := b.NextDelay()
	if !b.Ongoing() {
		return
	}

	// An explicit timer (instead of time.After) can be stopped as soon as
	// the Context wins the select, rather than staying armed for the rest
	// of the backoff period.
	timer := time.NewTimer(sleepTime)
	defer timer.Stop()

	select {
	case <-b.ctx.Done():
	case <-timer.C:
	}
}

// NextDelay increments the retry count and returns the delay to apply before
// the next attempt. The delay is drawn uniformly from the current jitter
// window [nextDelayMin, nextDelayMax); both bounds are then doubled, capped
// at MaxBackoff, until the upper bound has reached MaxBackoff.
func (b *Backoff) NextDelay() time.Duration {
	b.numRetries++

	// Handle the edge case where the min and max have the same value
	// (or due to some misconfig max is < min). This also keeps the argument
	// of rand.Int63n below strictly positive.
	if b.nextDelayMin >= b.nextDelayMax {
		return b.nextDelayMin
	}

	// Add a jitter within the next exponential backoff range.
	sleepTime := b.nextDelayMin + time.Duration(rand.Int63n(int64(b.nextDelayMax-b.nextDelayMin)))

	// Apply the exponential backoff to calculate the next jitter
	// range, unless we've already reached the max.
	if b.nextDelayMax < b.cfg.MaxBackoff {
		b.nextDelayMin = doubleDuration(b.nextDelayMin, b.cfg.MaxBackoff)
		b.nextDelayMax = doubleDuration(b.nextDelayMax, b.cfg.MaxBackoff)
	}

	return sleepTime
}

// doubleDuration returns value*2, or limit when the doubled value would
// exceed limit.
func doubleDuration(value time.Duration, limit time.Duration) time.Duration {
	if doubled := value * 2; doubled <= limit {
		return doubled
	}
	return limit
}
+ */ + +package backoff + +import ( + "context" + "testing" + "time" +) + +func TestBackoff_NextDelay(t *testing.T) { + t.Parallel() + + tests := map[string]struct { + minBackoff time.Duration + maxBackoff time.Duration + expectedRanges [][]time.Duration + }{ + "exponential backoff with jitter honoring min and max": { + minBackoff: 100 * time.Millisecond, + maxBackoff: 10 * time.Second, + expectedRanges: [][]time.Duration{ + {100 * time.Millisecond, 200 * time.Millisecond}, + {200 * time.Millisecond, 400 * time.Millisecond}, + {400 * time.Millisecond, 800 * time.Millisecond}, + {800 * time.Millisecond, 1600 * time.Millisecond}, + {1600 * time.Millisecond, 3200 * time.Millisecond}, + {3200 * time.Millisecond, 6400 * time.Millisecond}, + {6400 * time.Millisecond, 10000 * time.Millisecond}, + {6400 * time.Millisecond, 10000 * time.Millisecond}, + }, + }, + "exponential backoff with max equal to the end of a range": { + minBackoff: 100 * time.Millisecond, + maxBackoff: 800 * time.Millisecond, + expectedRanges: [][]time.Duration{ + {100 * time.Millisecond, 200 * time.Millisecond}, + {200 * time.Millisecond, 400 * time.Millisecond}, + {400 * time.Millisecond, 800 * time.Millisecond}, + {400 * time.Millisecond, 800 * time.Millisecond}, + }, + }, + "exponential backoff with max equal to the end of a range + 1": { + minBackoff: 100 * time.Millisecond, + maxBackoff: 801 * time.Millisecond, + expectedRanges: [][]time.Duration{ + {100 * time.Millisecond, 200 * time.Millisecond}, + {200 * time.Millisecond, 400 * time.Millisecond}, + {400 * time.Millisecond, 800 * time.Millisecond}, + {800 * time.Millisecond, 801 * time.Millisecond}, + {800 * time.Millisecond, 801 * time.Millisecond}, + }, + }, + "exponential backoff with max equal to the end of a range - 1": { + minBackoff: 100 * time.Millisecond, + maxBackoff: 799 * time.Millisecond, + expectedRanges: [][]time.Duration{ + {100 * time.Millisecond, 200 * time.Millisecond}, + {200 * time.Millisecond, 400 * time.Millisecond}, + 
{400 * time.Millisecond, 799 * time.Millisecond}, + {400 * time.Millisecond, 799 * time.Millisecond}, + }, + }, + "min backoff is equal to max": { + minBackoff: 100 * time.Millisecond, + maxBackoff: 100 * time.Millisecond, + expectedRanges: [][]time.Duration{ + {100 * time.Millisecond, 100 * time.Millisecond}, + {100 * time.Millisecond, 100 * time.Millisecond}, + {100 * time.Millisecond, 100 * time.Millisecond}, + }, + }, + "min backoff is greater then max": { + minBackoff: 200 * time.Millisecond, + maxBackoff: 100 * time.Millisecond, + expectedRanges: [][]time.Duration{ + {200 * time.Millisecond, 200 * time.Millisecond}, + {200 * time.Millisecond, 200 * time.Millisecond}, + {200 * time.Millisecond, 200 * time.Millisecond}, + }, + }, + } + + for testName, testData := range tests { + testData := testData + + t.Run(testName, func(t *testing.T) { + t.Parallel() + + b := New(context.Background(), Config{ + MinBackoff: testData.minBackoff, + MaxBackoff: testData.maxBackoff, + MaxRetries: len(testData.expectedRanges), + }) + + for _, expectedRange := range testData.expectedRanges { + delay := b.NextDelay() + + if delay < expectedRange[0] || delay > expectedRange[1] { + t.Errorf("%d expected to be within %d and %d", delay, expectedRange[0], expectedRange[1]) + } + } + }) + } +}