Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: optimize retry #284

Merged
merged 3 commits into from
Sep 22, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
90 changes: 46 additions & 44 deletions pkg/tm/global_transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/seata/seata-go/pkg/common/log"
"github.com/seata/seata-go/pkg/protocol/message"
"github.com/seata/seata-go/pkg/remoting/getty"
"github.com/seata/seata-go/pkg/util/backoff"
)

type GlobalTransaction struct {
Expand Down Expand Up @@ -96,35 +97,35 @@ func (g *GlobalTransactionManager) Commit(ctx context.Context, gtr *GlobalTransa
return errors.New("Commit xid should not be empty")
}

// todo: replace retry with config
var (
err error
res interface{}
// todo retry and retryInterval should read from config
retry = 10
retryInterval = 200 * time.Millisecond
req = message.GlobalCommitRequest{
AbstractGlobalEndRequest: message.AbstractGlobalEndRequest{
Xid: gtr.Xid,
},
}
)
for ; retry > 0; retry-- {
res, err = getty.GetGettyRemotingClient().SendSyncRequest(req)
if err != nil {
log.Errorf("GlobalCommitRequest error, xid %s, error %v", gtr.Xid, err)
time.Sleep(retryInterval)
} else {
bf := backoff.New(ctx, backoff.Config{
MaxRetries: 10,
MinBackoff: 100 * time.Millisecond,
MaxBackoff: 200 * time.Millisecond,
})

var req = message.GlobalCommitRequest{
AbstractGlobalEndRequest: message.AbstractGlobalEndRequest{Xid: gtr.Xid},
}
var res interface{}
var err error
for bf.Ongoing() {
if res, err = getty.GetGettyRemotingClient().SendSyncRequest(req); err == nil {
break
}
log.Warnf("send global commit request failed, xid %s, error %v", gtr.Xid, err)
bf.Wait()
}
if err != nil {
log.Infof("send global commit request failed, xid %s, error %v", gtr.Xid, err)
return err

if bf.Err() != nil {
lastErr := errors.Wrap(err, bf.Err().Error())
log.Warnf("send global commit request failed, xid %s, error %v", gtr.Xid, lastErr)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这个log级别是不是应该提升为 Error 级别。另外这个log和115行的log感觉是重复的,保留一个应该就可以了。

return lastErr
}

log.Infof("send global commit request success, xid %s", gtr.Xid)
gtr.Status = res.(message.GlobalCommitResponse).GlobalStatus
UnbindXid(ctx)

return nil
}

Expand All @@ -138,35 +139,36 @@ func (g *GlobalTransactionManager) Rollback(ctx context.Context, gtr *GlobalTran
return errors.New("Rollback xid should not be empty")
}

// todo: replace retry with config
var (
err error
res interface{}
// todo retry and retryInterval should read from config
retry = 10
retryInterval = 200 * time.Millisecond
req = message.GlobalRollbackRequest{
AbstractGlobalEndRequest: message.AbstractGlobalEndRequest{
Xid: gtr.Xid,
},
}
)
for ; retry > 0; retry-- {
res, err = getty.GetGettyRemotingClient().SendSyncRequest(req)
if err != nil {
log.Errorf("GlobalRollbackRequest error, xid %s, error %v", gtr.Xid, err)
time.Sleep(retryInterval)
} else {
bf := backoff.New(ctx, backoff.Config{
MaxRetries: 10,
MinBackoff: 100 * time.Millisecond,
MaxBackoff: 200 * time.Millisecond,
})

var res interface{}
var req = message.GlobalRollbackRequest{
AbstractGlobalEndRequest: message.AbstractGlobalEndRequest{Xid: gtr.Xid},
}

var err error
for bf.Ongoing() {
if res, err = getty.GetGettyRemotingClient().SendSyncRequest(req); err == nil {
break
}
}
if err != nil {
log.Errorf("GlobalRollbackRequest rollback failed, xid %s, error %v", gtr.Xid, err)
return err
bf.Wait()
}

if bf.Err() != nil {
lastErr := errors.Wrap(err, bf.Err().Error())
log.Errorf("GlobalRollbackRequest rollback failed, xid %s, error %v", gtr.Xid, lastErr)
return lastErr
}

log.Infof("GlobalRollbackRequest rollback success, xid %s,", gtr.Xid)
gtr.Status = res.(message.GlobalRollbackResponse).GlobalStatus
UnbindXid(ctx)

return nil
}

Expand Down
134 changes: 134 additions & 0 deletions pkg/util/backoff/backoff.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package backoff

import (
"context"
"flag"
"fmt"
"math/rand"
"time"
)

// Config configures a Backoff
type Config struct {
MinBackoff time.Duration `yaml:"min_period"` // start backoff at this level
MaxBackoff time.Duration `yaml:"max_period"` // increase exponentially to this level
MaxRetries int `yaml:"max_retries"` // give up after this many; zero means infinite retries
}

// RegisterFlagsWithPrefix for Config.
func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
f.DurationVar(&cfg.MinBackoff, prefix+".backoff-min-period", 100*time.Millisecond, "Minimum delay when backing off.")
f.DurationVar(&cfg.MaxBackoff, prefix+".backoff-max-period", 10*time.Second, "Maximum delay when backing off.")
f.IntVar(&cfg.MaxRetries, prefix+".backoff-retries", 10, "Number of times to backoff and retry before failing.")
}

// Backoff implements exponential backoff with randomized wait times
type Backoff struct {
cfg Config
ctx context.Context
numRetries int
nextDelayMin time.Duration
nextDelayMax time.Duration
}

// New creates a Backoff object. Pass a Context that can also terminate the operation.
func New(ctx context.Context, cfg Config) *Backoff {
return &Backoff{
cfg: cfg,
ctx: ctx,
nextDelayMin: cfg.MinBackoff,
nextDelayMax: doubleDuration(cfg.MinBackoff, cfg.MaxBackoff),
}
}

// Reset the Backoff back to its initial condition
func (b *Backoff) Reset() {
b.numRetries = 0
b.nextDelayMin = b.cfg.MinBackoff
b.nextDelayMax = doubleDuration(b.cfg.MinBackoff, b.cfg.MaxBackoff)
}

// Ongoing returns true if caller should keep going
func (b *Backoff) Ongoing() bool {
// Stop if Context has errored or max retry count is exceeded
return b.ctx.Err() == nil && (b.cfg.MaxRetries == 0 || b.numRetries < b.cfg.MaxRetries)
}

// Err returns the reason for terminating the backoff, or nil if it didn't terminate
func (b *Backoff) Err() error {
if b.ctx.Err() != nil {
return b.ctx.Err()
}
if b.cfg.MaxRetries != 0 && b.numRetries >= b.cfg.MaxRetries {
return fmt.Errorf("terminated after %d retries", b.numRetries)
}
return nil
}

// NumRetries returns the number of retries so far
func (b *Backoff) NumRetries() int {
return b.numRetries
}

// Wait sleeps for the backoff time then increases the retry count and backoff time
// Returns immediately if Context is terminated
func (b *Backoff) Wait() {
// Increase the number of retries and get the next delay
sleepTime := b.NextDelay()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This statement can be put into the if range

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

举个例子

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

举个例子

感觉是个优化项,不是问题,比如:

if b.Ongoing() {
sleepTime := b.NextDelay()
select {
case <-b.ctx.Done():
case <-time.After(sleepTime):
}
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

你这个不太好。目前这个实现已经很简洁了

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

使用起来也很简单。

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

好的, 我A了


if b.Ongoing() {
select {
case <-b.ctx.Done():
case <-time.After(sleepTime):
}
}
}

func (b *Backoff) NextDelay() time.Duration {
b.numRetries++

// Handle the edge case where the min and max have the same value
// (or due to some misconfig max is < min)
if b.nextDelayMin >= b.nextDelayMax {
return b.nextDelayMin
}

// Add a jitter within the next exponential backoff range
sleepTime := b.nextDelayMin + time.Duration(rand.Int63n(int64(b.nextDelayMax-b.nextDelayMin)))

// Apply the exponential backoff to calculate the next jitter
// range, unless we've already reached the max
if b.nextDelayMax < b.cfg.MaxBackoff {
b.nextDelayMin = doubleDuration(b.nextDelayMin, b.cfg.MaxBackoff)
b.nextDelayMax = doubleDuration(b.nextDelayMax, b.cfg.MaxBackoff)
}

return sleepTime
}

func doubleDuration(value time.Duration, max time.Duration) time.Duration {
value = value * 2

if value <= max {
return value
}

return max
}
120 changes: 120 additions & 0 deletions pkg/util/backoff/backoff_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package backoff

import (
"context"
"testing"
"time"
)

func TestBackoff_NextDelay(t *testing.T) {
t.Parallel()

tests := map[string]struct {
minBackoff time.Duration
maxBackoff time.Duration
expectedRanges [][]time.Duration
}{
"exponential backoff with jitter honoring min and max": {
minBackoff: 100 * time.Millisecond,
maxBackoff: 10 * time.Second,
expectedRanges: [][]time.Duration{
{100 * time.Millisecond, 200 * time.Millisecond},
{200 * time.Millisecond, 400 * time.Millisecond},
{400 * time.Millisecond, 800 * time.Millisecond},
{800 * time.Millisecond, 1600 * time.Millisecond},
{1600 * time.Millisecond, 3200 * time.Millisecond},
{3200 * time.Millisecond, 6400 * time.Millisecond},
{6400 * time.Millisecond, 10000 * time.Millisecond},
{6400 * time.Millisecond, 10000 * time.Millisecond},
},
},
"exponential backoff with max equal to the end of a range": {
minBackoff: 100 * time.Millisecond,
maxBackoff: 800 * time.Millisecond,
expectedRanges: [][]time.Duration{
{100 * time.Millisecond, 200 * time.Millisecond},
{200 * time.Millisecond, 400 * time.Millisecond},
{400 * time.Millisecond, 800 * time.Millisecond},
{400 * time.Millisecond, 800 * time.Millisecond},
},
},
"exponential backoff with max equal to the end of a range + 1": {
minBackoff: 100 * time.Millisecond,
maxBackoff: 801 * time.Millisecond,
expectedRanges: [][]time.Duration{
{100 * time.Millisecond, 200 * time.Millisecond},
{200 * time.Millisecond, 400 * time.Millisecond},
{400 * time.Millisecond, 800 * time.Millisecond},
{800 * time.Millisecond, 801 * time.Millisecond},
{800 * time.Millisecond, 801 * time.Millisecond},
},
},
"exponential backoff with max equal to the end of a range - 1": {
minBackoff: 100 * time.Millisecond,
maxBackoff: 799 * time.Millisecond,
expectedRanges: [][]time.Duration{
{100 * time.Millisecond, 200 * time.Millisecond},
{200 * time.Millisecond, 400 * time.Millisecond},
{400 * time.Millisecond, 799 * time.Millisecond},
{400 * time.Millisecond, 799 * time.Millisecond},
},
},
"min backoff is equal to max": {
minBackoff: 100 * time.Millisecond,
maxBackoff: 100 * time.Millisecond,
expectedRanges: [][]time.Duration{
{100 * time.Millisecond, 100 * time.Millisecond},
{100 * time.Millisecond, 100 * time.Millisecond},
{100 * time.Millisecond, 100 * time.Millisecond},
},
},
"min backoff is greater then max": {
minBackoff: 200 * time.Millisecond,
maxBackoff: 100 * time.Millisecond,
expectedRanges: [][]time.Duration{
{200 * time.Millisecond, 200 * time.Millisecond},
{200 * time.Millisecond, 200 * time.Millisecond},
{200 * time.Millisecond, 200 * time.Millisecond},
},
},
}

for testName, testData := range tests {
testData := testData

t.Run(testName, func(t *testing.T) {
t.Parallel()

b := New(context.Background(), Config{
MinBackoff: testData.minBackoff,
MaxBackoff: testData.maxBackoff,
MaxRetries: len(testData.expectedRanges),
})

for _, expectedRange := range testData.expectedRanges {
delay := b.NextDelay()

if delay < expectedRange[0] || delay > expectedRange[1] {
t.Errorf("%d expected to be within %d and %d", delay, expectedRange[0], expectedRange[1])
}
}
})
}
}