diff --git a/br/pkg/streamhelper/BUILD.bazel b/br/pkg/streamhelper/BUILD.bazel index e1eb7adfb95ba..f0b8cb6a27e6f 100644 --- a/br/pkg/streamhelper/BUILD.bazel +++ b/br/pkg/streamhelper/BUILD.bazel @@ -68,7 +68,7 @@ go_test( ], flaky = True, race = "on", - shard_count = 20, + shard_count = 21, deps = [ ":streamhelper", "//br/pkg/errors", diff --git a/br/pkg/streamhelper/advancer_env.go b/br/pkg/streamhelper/advancer_env.go index a7f835b847394..10aa14f2ee0aa 100644 --- a/br/pkg/streamhelper/advancer_env.go +++ b/br/pkg/streamhelper/advancer_env.go @@ -104,6 +104,11 @@ func (t clusterEnv) GetLogBackupClient(ctx context.Context, storeID uint64) (log return cli, nil } +// ClearCache clears the log backup client connection cache. +func (t clusterEnv) ClearCache(ctx context.Context, storeID uint64) error { + return t.clis.RemoveConn(ctx, storeID) +} + // CliEnv creates the Env for CLI usage. func CliEnv(cli *utils.StoreManager, tikvStore tikv.Storage, etcdCli *clientv3.Client) Env { return clusterEnv{ @@ -134,6 +139,8 @@ func TiDBEnv(tikvStore tikv.Storage, pdCli pd.Client, etcdCli *clientv3.Client, type LogBackupService interface { // GetLogBackupClient gets the log backup client. GetLogBackupClient(ctx context.Context, storeID uint64) (logbackup.LogBackupClient, error) + // Disable log backup client connection cache. + ClearCache(ctx context.Context, storeID uint64) error } // StreamMeta connects to the metadata service (normally PD). diff --git a/br/pkg/streamhelper/advancer_test.go b/br/pkg/streamhelper/advancer_test.go index fe34b56d08be3..037ff4949b2e2 100644 --- a/br/pkg/streamhelper/advancer_test.go +++ b/br/pkg/streamhelper/advancer_test.go @@ -251,6 +251,47 @@ func TestTaskRangesWithSplit(t *testing.T) { require.Greater(t, env.getCheckpoint(), fstCheckpoint) } +func TestClearCache(t *testing.T) { + c := createFakeCluster(t, 4, true) + ctx := context.Background() + req := require.New(t) + c.splitAndScatter("0012", "0034", "0048") + + clearedCache := make(map[uint64]bool) + c.onGetClient = func(u uint64) error { + // make store u cache cleared + clearedCache[u] = true + return nil + } + failedStoreID := uint64(0) + hasFailed := false + for _, s := range c.stores { + s.clientMu.Lock() + s.onGetRegionCheckpoint = func(glftrr *logbackup.GetLastFlushTSOfRegionRequest) error { + // mark this store cache cleared + failedStoreID = s.GetID() + if hasFailed { + hasFailed = true + return errors.New("failed to get checkpoint") + } + return nil + } + s.clientMu.Unlock() + // mark one store failed is enough + break + } + env := &testEnv{fakeCluster: c, testCtx: t} + adv := streamhelper.NewCheckpointAdvancer(env) + adv.StartTaskListener(ctx) + var err error + shouldFinishInTime(t, time.Second, "ticking", func() { + err = adv.OnTick(ctx) + }) + req.NoError(err) + req.True(failedStoreID > 0, "failed to mark the cluster: ") + req.Equal(clearedCache[failedStoreID], true) +} + func TestBlocked(t *testing.T) { log.SetLevel(zapcore.DebugLevel) c := createFakeCluster(t, 4, true) diff --git a/br/pkg/streamhelper/basic_lib_for_test.go b/br/pkg/streamhelper/basic_lib_for_test.go index 59490e32ec7b9..70fa53d6b8e23 100644 --- a/br/pkg/streamhelper/basic_lib_for_test.go +++ b/br/pkg/streamhelper/basic_lib_for_test.go @@ -171,6 +171,10 @@ func (t trivialFlushStream) RecvMsg(m interface{}) error { return nil } +func (f *fakeStore) GetID() uint64 { + return f.id +} + func (f *fakeStore) SubscribeFlushEvent(ctx context.Context, in *logbackup.SubscribeFlushEventRequest, opts ...grpc.CallOption) (logbackup.LogBackup_SubscribeFlushEventClient, error) { f.clientMu.Lock() defer f.clientMu.Unlock() @@ -315,6 +319,17 @@ func (f *fakeCluster) GetLogBackupClient(ctx context.Context, storeID uint64) (l return cli, nil } +func (f *fakeCluster) ClearCache(ctx context.Context, storeID uint64) error { + if f.onGetClient != nil { + err := f.onGetClient(storeID) + if err != nil { + return err + } + return nil + } + return nil +} + // Stores returns the store metadata from the cluster. func (f *fakeCluster) Stores(ctx context.Context) ([]streamhelper.Store, error) { r := make([]streamhelper.Store, 0, len(f.stores)) diff --git a/br/pkg/streamhelper/collector.go b/br/pkg/streamhelper/collector.go index 8f11e419678fa..bce13868b28f8 100644 --- a/br/pkg/streamhelper/collector.go +++ b/br/pkg/streamhelper/collector.go @@ -187,6 +187,8 @@ func (c *storeCollector) sendPendingRequests(ctx context.Context) error { } cps, err := cli.GetLastFlushTSOfRegion(ctx, &c.currentRequest) if err != nil { + // try disable connection cache if met error + _ = c.service.ClearCache(ctx, c.storeID) return err } metrics.GetCheckpointBatchSize.WithLabelValues("checkpoint").Observe(float64(len(c.currentRequest.GetRegions()))) diff --git a/br/pkg/streamhelper/flush_subscriber.go b/br/pkg/streamhelper/flush_subscriber.go index 7db4647057bad..423089169bec8 100644 --- a/br/pkg/streamhelper/flush_subscriber.go +++ b/br/pkg/streamhelper/flush_subscriber.go @@ -228,6 +228,7 @@ func (s *subscription) doConnect(ctx context.Context, dialer LogBackupService) e }) if err != nil { cancel() + _ = dialer.ClearCache(ctx, s.storeID) return errors.Annotate(err, "failed to subscribe events") } lcx := logutil.ContextWithField(cx, zap.Uint64("store-id", s.storeID), diff --git a/br/pkg/utils/store_manager.go b/br/pkg/utils/store_manager.go index 430d1394b0037..c588d8f56985e 100644 --- a/br/pkg/utils/store_manager.go +++ b/br/pkg/utils/store_manager.go @@ -164,6 +164,26 @@ func (mgr *StoreManager) getGrpcConnLocked(ctx context.Context, storeID uint64) return conn, nil } +func (mgr *StoreManager) RemoveConn(ctx context.Context, storeID uint64) error { + if ctx.Err() != nil { + return errors.Trace(ctx.Err()) + } + + mgr.grpcClis.mu.Lock() + defer mgr.grpcClis.mu.Unlock() + + if conn, ok := mgr.grpcClis.clis[storeID]; ok { + // Find a cached backup client. + err := conn.Close() + if err != nil { + log.Warn("close backup connection failed, ignore it", zap.Uint64("storeID", storeID)) + } + delete(mgr.grpcClis.clis, storeID) + return nil + } + return nil +} + func (mgr *StoreManager) WithConn(ctx context.Context, storeID uint64, f func(*grpc.ClientConn)) error { if ctx.Err() != nil { return errors.Trace(ctx.Err()) @@ -190,26 +210,18 @@ func (mgr *StoreManager) WithConn(ctx context.Context, storeID uint64, f func(*g // ResetBackupClient reset the connection for backup client. func (mgr *StoreManager) ResetBackupClient(ctx context.Context, storeID uint64) (backuppb.BackupClient, error) { - if ctx.Err() != nil { - return nil, errors.Trace(ctx.Err()) + var ( + conn *grpc.ClientConn + err error + ) + err = mgr.RemoveConn(ctx, storeID) + if err != nil { + return nil, errors.Trace(err) } mgr.grpcClis.mu.Lock() defer mgr.grpcClis.mu.Unlock() - if conn, ok := mgr.grpcClis.clis[storeID]; ok { - // Find a cached backup client. - log.Info("Reset backup client", zap.Uint64("storeID", storeID)) - err := conn.Close() - if err != nil { - log.Warn("close backup connection failed, ignore it", zap.Uint64("storeID", storeID)) - } - delete(mgr.grpcClis.clis, storeID) - } - var ( - conn *grpc.ClientConn - err error - ) for retry := 0; retry < resetRetryTimes; retry++ { conn, err = mgr.getGrpcConnLocked(ctx, storeID) if err != nil {