Skip to content

Commit

Permalink
log backup: reset store grpc connection when encounter error (#50539) (
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-chi-bot authored Feb 19, 2024
1 parent bdcd16d commit f27e4e0
Show file tree
Hide file tree
Showing 7 changed files with 94 additions and 16 deletions.
2 changes: 1 addition & 1 deletion br/pkg/streamhelper/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ go_test(
],
flaky = True,
race = "on",
shard_count = 20,
shard_count = 21,
deps = [
":streamhelper",
"//br/pkg/errors",
Expand Down
7 changes: 7 additions & 0 deletions br/pkg/streamhelper/advancer_env.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,11 @@ func (t clusterEnv) GetLogBackupClient(ctx context.Context, storeID uint64) (log
return cli, nil
}

// ClearCache clears the log backup client connection cache.
func (t clusterEnv) ClearCache(ctx context.Context, storeID uint64) error {
return t.clis.RemoveConn(ctx, storeID)
}

// CliEnv creates the Env for CLI usage.
func CliEnv(cli *utils.StoreManager, tikvStore tikv.Storage, etcdCli *clientv3.Client) Env {
return clusterEnv{
Expand Down Expand Up @@ -134,6 +139,8 @@ func TiDBEnv(tikvStore tikv.Storage, pdCli pd.Client, etcdCli *clientv3.Client,
type LogBackupService interface {
// GetLogBackupClient gets the log backup client.
GetLogBackupClient(ctx context.Context, storeID uint64) (logbackup.LogBackupClient, error)
// Disable log backup client connection cache.
ClearCache(ctx context.Context, storeID uint64) error
}

// StreamMeta connects to the metadata service (normally PD).
Expand Down
41 changes: 41 additions & 0 deletions br/pkg/streamhelper/advancer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,47 @@ func TestTaskRangesWithSplit(t *testing.T) {
require.Greater(t, env.getCheckpoint(), fstCheckpoint)
}

func TestClearCache(t *testing.T) {
c := createFakeCluster(t, 4, true)
ctx := context.Background()
req := require.New(t)
c.splitAndScatter("0012", "0034", "0048")

clearedCache := make(map[uint64]bool)
c.onGetClient = func(u uint64) error {
// make store u cache cleared
clearedCache[u] = true
return nil
}
failedStoreID := uint64(0)
hasFailed := false
for _, s := range c.stores {
s.clientMu.Lock()
s.onGetRegionCheckpoint = func(glftrr *logbackup.GetLastFlushTSOfRegionRequest) error {
// mark this store cache cleared
failedStoreID = s.GetID()
if hasFailed {
hasFailed = true
return errors.New("failed to get checkpoint")
}
return nil
}
s.clientMu.Unlock()
// mark one store failed is enough
break
}
env := &testEnv{fakeCluster: c, testCtx: t}
adv := streamhelper.NewCheckpointAdvancer(env)
adv.StartTaskListener(ctx)
var err error
shouldFinishInTime(t, time.Second, "ticking", func() {
err = adv.OnTick(ctx)
})
req.NoError(err)
req.True(failedStoreID > 0, "failed to mark the cluster: ")
req.Equal(clearedCache[failedStoreID], true)
}

func TestBlocked(t *testing.T) {
log.SetLevel(zapcore.DebugLevel)
c := createFakeCluster(t, 4, true)
Expand Down
15 changes: 15 additions & 0 deletions br/pkg/streamhelper/basic_lib_for_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,10 @@ func (t trivialFlushStream) RecvMsg(m interface{}) error {
return nil
}

func (f *fakeStore) GetID() uint64 {
return f.id
}

func (f *fakeStore) SubscribeFlushEvent(ctx context.Context, in *logbackup.SubscribeFlushEventRequest, opts ...grpc.CallOption) (logbackup.LogBackup_SubscribeFlushEventClient, error) {
f.clientMu.Lock()
defer f.clientMu.Unlock()
Expand Down Expand Up @@ -315,6 +319,17 @@ func (f *fakeCluster) GetLogBackupClient(ctx context.Context, storeID uint64) (l
return cli, nil
}

func (f *fakeCluster) ClearCache(ctx context.Context, storeID uint64) error {
if f.onGetClient != nil {
err := f.onGetClient(storeID)
if err != nil {
return err
}
return nil
}
return nil
}

// Stores returns the store metadata from the cluster.
func (f *fakeCluster) Stores(ctx context.Context) ([]streamhelper.Store, error) {
r := make([]streamhelper.Store, 0, len(f.stores))
Expand Down
2 changes: 2 additions & 0 deletions br/pkg/streamhelper/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,8 @@ func (c *storeCollector) sendPendingRequests(ctx context.Context) error {
}
cps, err := cli.GetLastFlushTSOfRegion(ctx, &c.currentRequest)
if err != nil {
// try disable connection cache if met error
_ = c.service.ClearCache(ctx, c.storeID)
return err
}
metrics.GetCheckpointBatchSize.WithLabelValues("checkpoint").Observe(float64(len(c.currentRequest.GetRegions())))
Expand Down
1 change: 1 addition & 0 deletions br/pkg/streamhelper/flush_subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,7 @@ func (s *subscription) doConnect(ctx context.Context, dialer LogBackupService) e
})
if err != nil {
cancel()
_ = dialer.ClearCache(ctx, s.storeID)
return errors.Annotate(err, "failed to subscribe events")
}
lcx := logutil.ContextWithField(cx, zap.Uint64("store-id", s.storeID),
Expand Down
42 changes: 27 additions & 15 deletions br/pkg/utils/store_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,26 @@ func (mgr *StoreManager) getGrpcConnLocked(ctx context.Context, storeID uint64)
return conn, nil
}

func (mgr *StoreManager) RemoveConn(ctx context.Context, storeID uint64) error {
if ctx.Err() != nil {
return errors.Trace(ctx.Err())
}

mgr.grpcClis.mu.Lock()
defer mgr.grpcClis.mu.Unlock()

if conn, ok := mgr.grpcClis.clis[storeID]; ok {
// Find a cached backup client.
err := conn.Close()
if err != nil {
log.Warn("close backup connection failed, ignore it", zap.Uint64("storeID", storeID))
}
delete(mgr.grpcClis.clis, storeID)
return nil
}
return nil
}

func (mgr *StoreManager) WithConn(ctx context.Context, storeID uint64, f func(*grpc.ClientConn)) error {
if ctx.Err() != nil {
return errors.Trace(ctx.Err())
Expand All @@ -190,26 +210,18 @@ func (mgr *StoreManager) WithConn(ctx context.Context, storeID uint64, f func(*g

// ResetBackupClient reset the connection for backup client.
func (mgr *StoreManager) ResetBackupClient(ctx context.Context, storeID uint64) (backuppb.BackupClient, error) {
if ctx.Err() != nil {
return nil, errors.Trace(ctx.Err())
var (
conn *grpc.ClientConn
err error
)
err = mgr.RemoveConn(ctx, storeID)
if err != nil {
return nil, errors.Trace(err)
}

mgr.grpcClis.mu.Lock()
defer mgr.grpcClis.mu.Unlock()

if conn, ok := mgr.grpcClis.clis[storeID]; ok {
// Find a cached backup client.
log.Info("Reset backup client", zap.Uint64("storeID", storeID))
err := conn.Close()
if err != nil {
log.Warn("close backup connection failed, ignore it", zap.Uint64("storeID", storeID))
}
delete(mgr.grpcClis.clis, storeID)
}
var (
conn *grpc.ClientConn
err error
)
for retry := 0; retry < resetRetryTimes; retry++ {
conn, err = mgr.getGrpcConnLocked(ctx, storeID)
if err != nil {
Expand Down

0 comments on commit f27e4e0

Please sign in to comment.