From 6724696388cb48b6e9db3020d0978198b968d54c Mon Sep 17 00:00:00 2001 From: Zixiong Liu Date: Thu, 2 Jun 2022 19:06:28 +0800 Subject: [PATCH] externalresource(engine): Add retry for GCRunner (#5705) ref pingcap/tiflow#5699 --- .../pkg/externalresource/manager/gc_runner.go | 25 +++++-- .../manager/gc_runner_test.go | 72 ++++++++++++++++++- 2 files changed, 89 insertions(+), 8 deletions(-) diff --git a/engine/pkg/externalresource/manager/gc_runner.go b/engine/pkg/externalresource/manager/gc_runner.go index 91ae8e6909d..91e0962c294 100644 --- a/engine/pkg/externalresource/manager/gc_runner.go +++ b/engine/pkg/externalresource/manager/gc_runner.go @@ -24,11 +24,14 @@ import ( "github.com/pingcap/tiflow/engine/pkg/clock" resModel "github.com/pingcap/tiflow/engine/pkg/externalresource/resourcemeta/model" pkgOrm "github.com/pingcap/tiflow/engine/pkg/orm" + "github.com/pingcap/tiflow/pkg/retry" ) const ( - gcCheckInterval = 10 * time.Second - gcTimeout = 10 * time.Second + gcCheckInterval = 10 * time.Second + gcTimeout = 10 * time.Second + gcOnceRetryMinIntervalMs = 100 + gcOnceRetryMaxIntervalMs = 100 ) // GCHandlerFunc is the type for a function that actually removes a resource. @@ -73,7 +76,7 @@ func (r *DefaultGCRunner) Run(ctx context.Context) error { } timeoutCtx, cancel := context.WithTimeout(ctx, gcTimeout) - err := r.gcOnce(timeoutCtx) + err := r.gcOnceWithRetry(timeoutCtx) cancel() if err != nil { @@ -91,6 +94,15 @@ func (r *DefaultGCRunner) GCNotify() { } } +func (r *DefaultGCRunner) gcOnceWithRetry(ctx context.Context) error { + return retry.Do(ctx, func() error { + return r.gcOnce(ctx) + }, + retry.WithBackoffBaseDelay(gcOnceRetryMinIntervalMs), + retry.WithBackoffMaxDelay(gcOnceRetryMaxIntervalMs), + ) +} + func (r *DefaultGCRunner) gcOnce( ctx context.Context, ) error { @@ -129,9 +141,10 @@ func (r *DefaultGCRunner) gcOnce( result, err := r.client.DeleteResource(ctx, res.ID) if err != nil { - // If deletion fails, we do not need to retry for now. - log.L().Warn("Failed to delete resource after GC", zap.Any("resource", res)) - return nil + log.L().Warn("Failed to delete resource meta after GC", + zap.Any("resource", res), + zap.Error(err)) + return err } if result.RowsAffected() == 0 { log.L().Warn("Resource is deleted unexpectedly", zap.Any("resource", res)) diff --git a/engine/pkg/externalresource/manager/gc_runner_test.go b/engine/pkg/externalresource/manager/gc_runner_test.go index c5b3f948f66..5d96a2f7108 100644 --- a/engine/pkg/externalresource/manager/gc_runner_test.go +++ b/engine/pkg/externalresource/manager/gc_runner_test.go @@ -30,7 +30,7 @@ import ( type gcRunnerTestHelper struct { Runner *DefaultGCRunner - Meta pkgOrm.Client + Meta pkgOrm.ResourceClient Clock *clock.Mock wg sync.WaitGroup @@ -46,12 +46,15 @@ func newGCRunnerTestHelper() *gcRunnerTestHelper { if err != nil { panic(err) } + return newGCRunnerTestHelperWithMeta(meta) +} +func newGCRunnerTestHelperWithMeta(meta pkgOrm.ResourceClient) *gcRunnerTestHelper { reqCh := make(chan *resModel.ResourceMeta, 16) mockHandler := func(ctx context.Context, meta *resModel.ResourceMeta) error { select { case <-ctx.Done(): - return errors.Trace(err) + return errors.Trace(ctx.Err()) case reqCh <- meta: } return nil @@ -96,6 +99,46 @@ func (h *gcRunnerTestHelper) WaitGC(t *testing.T) (meta *resModel.ResourceMeta) return } +// mockMetaClientErrOnce is a temporary solution for testing +// the retry logic of gcOnce(). +// TODO make a more generic version of this struct, and +// do better error condition testing in other situations too. +type mockMetaClientErrOnce struct { + pkgOrm.ResourceClient + + methodsAllReadyErred map[string]struct{} +} + +func newMockMetaClientErrOnce() *mockMetaClientErrOnce { + inner, err := pkgOrm.NewMockClient() + if err != nil { + panic(err) + } + + return &mockMetaClientErrOnce{ + ResourceClient: inner, + methodsAllReadyErred: make(map[string]struct{}), + } +} + +func (c *mockMetaClientErrOnce) DeleteResource(ctx context.Context, resourceID string) (pkgOrm.Result, error) { + if _, erred := c.methodsAllReadyErred["DeleteResource"]; !erred { + c.methodsAllReadyErred["DeleteResource"] = struct{}{} + return nil, errors.New("injected error") + } + + return c.ResourceClient.DeleteResource(ctx, resourceID) +} + +func (c *mockMetaClientErrOnce) GetOneResourceForGC(ctx context.Context) (*resModel.ResourceMeta, error) { + if _, erred := c.methodsAllReadyErred["GetOneResourceForGC"]; !erred { + c.methodsAllReadyErred["GetOneResourceForGC"] = struct{}{} + return nil, errors.New("injected error") + } + + return c.ResourceClient.GetOneResourceForGC(ctx) +} + func TestGCRunnerNotify(t *testing.T) { helper := newGCRunnerTestHelper() @@ -213,3 +256,28 @@ loop: helper.Close() } + +func TestGCRunnerRetry(t *testing.T) { + mockMeta := newMockMetaClientErrOnce() + helper := newGCRunnerTestHelperWithMeta(mockMeta) + + err := helper.Meta.CreateResource(context.Background(), &resModel.ResourceMeta{ + ID: "/local/resource-1", + Job: "job-1", + Worker: "worker-1", + Executor: "executor-1", + GCPending: true, + }) + require.NoError(t, err) + + helper.Start() + + // Note that since we are not advancing the clock, + // GC can only be triggered by calling Notify. + helper.Runner.GCNotify() + + gcRes := helper.WaitGC(t) + require.Equal(t, "/local/resource-1", gcRes.ID) + + helper.Close() +}