Skip to content

Commit

Permalink
externalresource(engine): Add retry for GCRunner (#5705)
Browse files Browse the repository at this point in the history
ref #5699
  • Loading branch information
liuzix authored Jun 2, 2022
1 parent d0c1970 commit 6724696
Show file tree
Hide file tree
Showing 2 changed files with 89 additions and 8 deletions.
25 changes: 19 additions & 6 deletions engine/pkg/externalresource/manager/gc_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,14 @@ import (
"github.com/pingcap/tiflow/engine/pkg/clock"
resModel "github.com/pingcap/tiflow/engine/pkg/externalresource/resourcemeta/model"
pkgOrm "github.com/pingcap/tiflow/engine/pkg/orm"
"github.com/pingcap/tiflow/pkg/retry"
)

const (
	// gcCheckInterval is the GC check period.
	// NOTE(review): its use site is outside this view — presumably the
	// interval of the periodic GC trigger in Run; confirm.
	gcCheckInterval = 10 * time.Second
	// gcTimeout bounds a single GC pass: Run derives a timeout context
	// from it before invoking the (retried) gcOnce.
	gcTimeout = 10 * time.Second
	// gcOnceRetryMinIntervalMs is handed to retry.WithBackoffBaseDelay
	// when gcOnce is retried. The unit is milliseconds, per the name.
	gcOnceRetryMinIntervalMs = 100
	// gcOnceRetryMaxIntervalMs is handed to retry.WithBackoffMaxDelay
	// when gcOnce is retried. It equals the base delay, so the backoff
	// presumably stays at a constant 100ms — TODO confirm against the
	// retry package.
	gcOnceRetryMaxIntervalMs = 100
)

// GCHandlerFunc is the type for a function that actually removes a resource.
Expand Down Expand Up @@ -73,7 +76,7 @@ func (r *DefaultGCRunner) Run(ctx context.Context) error {
}

timeoutCtx, cancel := context.WithTimeout(ctx, gcTimeout)
err := r.gcOnce(timeoutCtx)
err := r.gcOnceWithRetry(timeoutCtx)
cancel()

if err != nil {
Expand All @@ -91,6 +94,15 @@ func (r *DefaultGCRunner) GCNotify() {
}
}

// gcOnceWithRetry runs gcOnce through retry.Do so that a failed GC pass is
// attempted again rather than abandoned. The base and maximum backoff delays
// come from gcOnceRetryMinIntervalMs and gcOnceRetryMaxIntervalMs.
//
// The same ctx is passed both to retry.Do and to every gcOnce attempt.
// It returns whatever retry.Do returns.
func (r *DefaultGCRunner) gcOnceWithRetry(ctx context.Context) error {
	attempt := func() error { return r.gcOnce(ctx) }

	return retry.Do(
		ctx,
		attempt,
		retry.WithBackoffBaseDelay(gcOnceRetryMinIntervalMs),
		retry.WithBackoffMaxDelay(gcOnceRetryMaxIntervalMs),
	)
}

func (r *DefaultGCRunner) gcOnce(
ctx context.Context,
) error {
Expand Down Expand Up @@ -129,9 +141,10 @@ func (r *DefaultGCRunner) gcOnce(

result, err := r.client.DeleteResource(ctx, res.ID)
if err != nil {
// If deletion fails, we do not need to retry for now.
log.L().Warn("Failed to delete resource after GC", zap.Any("resource", res))
return nil
log.L().Warn("Failed to delete resource meta after GC",
zap.Any("resource", res),
zap.Error(err))
return err
}
if result.RowsAffected() == 0 {
log.L().Warn("Resource is deleted unexpectedly", zap.Any("resource", res))
Expand Down
72 changes: 70 additions & 2 deletions engine/pkg/externalresource/manager/gc_runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import (

type gcRunnerTestHelper struct {
Runner *DefaultGCRunner
Meta pkgOrm.Client
Meta pkgOrm.ResourceClient
Clock *clock.Mock

wg sync.WaitGroup
Expand All @@ -46,12 +46,15 @@ func newGCRunnerTestHelper() *gcRunnerTestHelper {
if err != nil {
panic(err)
}
return newGCRunnerTestHelperWithMeta(meta)
}

func newGCRunnerTestHelperWithMeta(meta pkgOrm.ResourceClient) *gcRunnerTestHelper {
reqCh := make(chan *resModel.ResourceMeta, 16)
mockHandler := func(ctx context.Context, meta *resModel.ResourceMeta) error {
select {
case <-ctx.Done():
return errors.Trace(err)
return errors.Trace(ctx.Err())
case reqCh <- meta:
}
return nil
Expand Down Expand Up @@ -96,6 +99,46 @@ func (h *gcRunnerTestHelper) WaitGC(t *testing.T) (meta *resModel.ResourceMeta)
return
}

// mockMetaClientErrOnce is a temporary solution for testing
// the retry logic of gcOnce().
//
// It wraps a pkgOrm.ResourceClient and overrides DeleteResource and
// GetOneResourceForGC so that each of them returns an injected error
// on its first call and delegates to the wrapped client afterwards.
// All other methods come from the embedded client unchanged.
//
// TODO make a more generic version of this struct, and
// do better error condition testing in other situations too.
type mockMetaClientErrOnce struct {
// The embedded client that serves every call that is not failed on purpose.
pkgOrm.ResourceClient

// Set of method names that have already returned their injected error.
// It is read and written without any locking.
methodsAllReadyErred map[string]struct{}
}

// newMockMetaClientErrOnce builds a mockMetaClientErrOnce on top of a fresh
// client from pkgOrm.NewMockClient, with an empty record of already-failed
// methods. It panics if the mock client cannot be created.
func newMockMetaClientErrOnce() *mockMetaClientErrOnce {
	backing, err := pkgOrm.NewMockClient()
	if err != nil {
		panic(err)
	}

	client := new(mockMetaClientErrOnce)
	client.ResourceClient = backing
	client.methodsAllReadyErred = map[string]struct{}{}
	return client
}

// DeleteResource fails with an injected error the first time it is called on
// this client and records that it has done so. Every later call is forwarded
// to the embedded ResourceClient.
func (c *mockMetaClientErrOnce) DeleteResource(ctx context.Context, resourceID string) (pkgOrm.Result, error) {
	const method = "DeleteResource"

	if _, done := c.methodsAllReadyErred[method]; done {
		return c.ResourceClient.DeleteResource(ctx, resourceID)
	}

	c.methodsAllReadyErred[method] = struct{}{}
	return nil, errors.New("injected error")
}

// GetOneResourceForGC fails with an injected error the first time it is
// called on this client and records that it has done so. Every later call is
// forwarded to the embedded ResourceClient.
func (c *mockMetaClientErrOnce) GetOneResourceForGC(ctx context.Context) (*resModel.ResourceMeta, error) {
	const method = "GetOneResourceForGC"

	if _, done := c.methodsAllReadyErred[method]; done {
		return c.ResourceClient.GetOneResourceForGC(ctx)
	}

	c.methodsAllReadyErred[method] = struct{}{}
	return nil, errors.New("injected error")
}

func TestGCRunnerNotify(t *testing.T) {
helper := newGCRunnerTestHelper()

Expand Down Expand Up @@ -213,3 +256,28 @@ loop:

helper.Close()
}

func TestGCRunnerRetry(t *testing.T) {
mockMeta := newMockMetaClientErrOnce()
helper := newGCRunnerTestHelperWithMeta(mockMeta)

err := helper.Meta.CreateResource(context.Background(), &resModel.ResourceMeta{
ID: "/local/resource-1",
Job: "job-1",
Worker: "worker-1",
Executor: "executor-1",
GCPending: true,
})
require.NoError(t, err)

helper.Start()

// Note that since we are not advancing the clock,
// GC can only be triggered by calling Notify.
helper.Runner.GCNotify()

gcRes := helper.WaitGC(t)
require.Equal(t, "/local/resource-1", gcRes.ID)

helper.Close()
}

0 comments on commit 6724696

Please sign in to comment.