log backup: reset store grpc connection when encounter error (#50539) #50703

Merged
2 changes: 1 addition & 1 deletion br/pkg/streamhelper/BUILD.bazel
@@ -68,7 +68,7 @@ go_test(
     ],
     flaky = True,
     race = "on",
-    shard_count = 20,
+    shard_count = 21,
     deps = [
         ":streamhelper",
         "//br/pkg/errors",
7 changes: 7 additions & 0 deletions br/pkg/streamhelper/advancer_env.go
@@ -104,6 +104,11 @@
 	return cli, nil
 }
 
+// ClearCache clears the log backup client connection cache.
+func (t clusterEnv) ClearCache(ctx context.Context, storeID uint64) error {
+	return t.clis.RemoveConn(ctx, storeID)
+}
+
 // CliEnv creates the Env for CLI usage.
 func CliEnv(cli *utils.StoreManager, tikvStore tikv.Storage, etcdCli *clientv3.Client) Env {
 	return clusterEnv{
@@ -134,6 +139,8 @@
 type LogBackupService interface {
 	// GetLogBackupClient gets the log backup client.
 	GetLogBackupClient(ctx context.Context, storeID uint64) (logbackup.LogBackupClient, error)
+	// ClearCache clears the log backup client connection cache.
+	ClearCache(ctx context.Context, storeID uint64) error
 }
 
 // StreamMeta connects to the metadata service (normally PD).
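Since `LogBackupService` now requires `ClearCache`, every `Env` implementation has to provide it. A minimal sketch of the contract, with a hypothetical `stubEnv` type and a compile-time assertion (not part of this PR, shown only to illustrate the interface shape):

```go
package example

import (
	"context"

	logbackup "github.com/pingcap/kvproto/pkg/logbackuppb"
	"github.com/pingcap/tidb/br/pkg/streamhelper"
)

// stubEnv is a hypothetical no-op implementation of LogBackupService.
type stubEnv struct{}

// GetLogBackupClient would normally return a cached or freshly dialed client.
func (stubEnv) GetLogBackupClient(ctx context.Context, storeID uint64) (logbackup.LogBackupClient, error) {
	return nil, nil
}

// ClearCache would normally close and evict the cached connection for storeID.
func (stubEnv) ClearCache(ctx context.Context, storeID uint64) error {
	return nil
}

// Compile-time check that stubEnv satisfies the extended interface.
var _ streamhelper.LogBackupService = stubEnv{}
```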
41 changes: 41 additions & 0 deletions br/pkg/streamhelper/advancer_test.go
@@ -251,6 +251,47 @@ func TestTaskRangesWithSplit(t *testing.T) {
 	require.Greater(t, env.getCheckpoint(), fstCheckpoint)
 }
 
+func TestClearCache(t *testing.T) {
+	c := createFakeCluster(t, 4, true)
+	ctx := context.Background()
+	req := require.New(t)
+	c.splitAndScatter("0012", "0034", "0048")
+
+	clearedCache := make(map[uint64]bool)
+	c.onGetClient = func(u uint64) error {
+		// record that the cache for store u was cleared
+		clearedCache[u] = true
+		return nil
+	}
+	failedStoreID := uint64(0)
+	hasFailed := false
+	for _, s := range c.stores {
+		s.clientMu.Lock()
+		s.onGetRegionCheckpoint = func(glftrr *logbackup.GetLastFlushTSOfRegionRequest) error {
+			// record this store as the one that failed
+			failedStoreID = s.GetID()
+			if !hasFailed {
+				hasFailed = true
+				return errors.New("failed to get checkpoint")
+			}
+			return nil
+		}
+		s.clientMu.Unlock()
+		// failing one store is enough
+		break
+	}
+	env := &testEnv{fakeCluster: c, testCtx: t}
+	adv := streamhelper.NewCheckpointAdvancer(env)
+	adv.StartTaskListener(ctx)
+	var err error
+	shouldFinishInTime(t, time.Second, "ticking", func() {
+		err = adv.OnTick(ctx)
+	})
+	req.NoError(err)
+	req.True(failedStoreID > 0, "failed to mark the cluster")
+	req.Equal(clearedCache[failedStoreID], true)
+}
+
 func TestBlocked(t *testing.T) {
 	log.SetLevel(zapcore.DebugLevel)
 	c := createFakeCluster(t, 4, true)
15 changes: 15 additions & 0 deletions br/pkg/streamhelper/basic_lib_for_test.go
@@ -171,6 +171,10 @@ func (t trivialFlushStream) RecvMsg(m interface{}) error {
 	return nil
 }
 
+func (f *fakeStore) GetID() uint64 {
+	return f.id
+}
+
 func (f *fakeStore) SubscribeFlushEvent(ctx context.Context, in *logbackup.SubscribeFlushEventRequest, opts ...grpc.CallOption) (logbackup.LogBackup_SubscribeFlushEventClient, error) {
 	f.clientMu.Lock()
 	defer f.clientMu.Unlock()
@@ -315,6 +319,17 @@ func (f *fakeCluster) GetLogBackupClient(ctx context.Context, storeID uint64) (logbackup.LogBackupClient, error) {
 	return cli, nil
 }
 
+func (f *fakeCluster) ClearCache(ctx context.Context, storeID uint64) error {
+	if f.onGetClient != nil {
+		err := f.onGetClient(storeID)
+		if err != nil {
+			return err
+		}
+		return nil
+	}
+	return nil
+}
+
 // Stores returns the store metadata from the cluster.
 func (f *fakeCluster) Stores(ctx context.Context) ([]streamhelper.Store, error) {
 	r := make([]streamhelper.Store, 0, len(f.stores))
2 changes: 2 additions & 0 deletions br/pkg/streamhelper/collector.go
@@ -187,6 +187,8 @@
 	}
 	cps, err := cli.GetLastFlushTSOfRegion(ctx, &c.currentRequest)
 	if err != nil {
+		// clear the cached connection on error so the next call re-dials
+		_ = c.service.ClearCache(ctx, c.storeID)
 		return err
 	}
 	metrics.GetCheckpointBatchSize.WithLabelValues("checkpoint").Observe(float64(len(c.currentRequest.GetRegions())))
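The effect of this two-line change is that a dead connection is not reused on the advancer's next attempt. A hedged sketch of the resulting caller-side behavior (the `getCheckpointWithRetry` helper is hypothetical, not code from this PR):

```go
package example

import (
	"context"

	logbackup "github.com/pingcap/kvproto/pkg/logbackuppb"
	"github.com/pingcap/tidb/br/pkg/streamhelper"
)

// getCheckpointWithRetry illustrates how a caller benefits from the cache
// reset: the first attempt may hit a stale connection, but after ClearCache
// the retry dials a fresh one.
func getCheckpointWithRetry(
	ctx context.Context,
	svc streamhelper.LogBackupService,
	storeID uint64,
	req *logbackup.GetLastFlushTSOfRegionRequest,
) (*logbackup.GetLastFlushTSOfRegionResponse, error) {
	var lastErr error
	for attempt := 0; attempt < 2; attempt++ {
		cli, err := svc.GetLogBackupClient(ctx, storeID)
		if err != nil {
			return nil, err
		}
		resp, err := cli.GetLastFlushTSOfRegion(ctx, req)
		if err == nil {
			return resp, nil
		}
		// Same pattern as the collector: drop the cached connection so the
		// next GetLogBackupClient call re-dials the store.
		_ = svc.ClearCache(ctx, storeID)
		lastErr = err
	}
	return nil, lastErr
}
```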
1 change: 1 addition & 0 deletions br/pkg/streamhelper/flush_subscriber.go
@@ -228,6 +228,7 @@ func (s *subscription) doConnect(ctx context.Context, dialer LogBackupService) error {
 	})
 	if err != nil {
 		cancel()
+		_ = dialer.ClearCache(ctx, s.storeID)
 		return errors.Annotate(err, "failed to subscribe events")
 	}
 	lcx := logutil.ContextWithField(cx, zap.Uint64("store-id", s.storeID),
42 changes: 27 additions & 15 deletions br/pkg/utils/store_manager.go
@@ -164,6 +164,26 @@
 	return conn, nil
 }
 
+func (mgr *StoreManager) RemoveConn(ctx context.Context, storeID uint64) error {
+	if ctx.Err() != nil {
+		return errors.Trace(ctx.Err())
+	}
+
+	mgr.grpcClis.mu.Lock()
+	defer mgr.grpcClis.mu.Unlock()
+
+	if conn, ok := mgr.grpcClis.clis[storeID]; ok {
+		// Found a cached connection; close it and drop it from the cache.
+		err := conn.Close()
+		if err != nil {
+			log.Warn("close backup connection failed, ignore it", zap.Uint64("storeID", storeID))
+		}
+		delete(mgr.grpcClis.clis, storeID)
+		return nil
+	}
+	return nil
+}
+
 func (mgr *StoreManager) WithConn(ctx context.Context, storeID uint64, f func(*grpc.ClientConn)) error {
 	if ctx.Err() != nil {
 		return errors.Trace(ctx.Err())
@@ -190,26 +210,18 @@
 
 // ResetBackupClient reset the connection for backup client.
 func (mgr *StoreManager) ResetBackupClient(ctx context.Context, storeID uint64) (backuppb.BackupClient, error) {
-	if ctx.Err() != nil {
-		return nil, errors.Trace(ctx.Err())
+	var (
+		conn *grpc.ClientConn
+		err  error
+	)
+	err = mgr.RemoveConn(ctx, storeID)
+	if err != nil {
+		return nil, errors.Trace(err)
 	}
 
 	mgr.grpcClis.mu.Lock()
 	defer mgr.grpcClis.mu.Unlock()
 
-	if conn, ok := mgr.grpcClis.clis[storeID]; ok {
-		// Find a cached backup client.
-		log.Info("Reset backup client", zap.Uint64("storeID", storeID))
-		err := conn.Close()
-		if err != nil {
-			log.Warn("close backup connection failed, ignore it", zap.Uint64("storeID", storeID))
-		}
-		delete(mgr.grpcClis.clis, storeID)
-	}
-	var (
-		conn *grpc.ClientConn
-		err  error
-	)
 	for retry := 0; retry < resetRetryTimes; retry++ {
 		conn, err = mgr.getGrpcConnLocked(ctx, storeID)
 		if err != nil {
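With the close-and-evict logic factored into `RemoveConn`, `ResetBackupClient` and the advancer's `ClearCache` now share one eviction path. A hedged sketch of the resulting usage (the `resetStoreConn` helper is hypothetical):

```go
package example

import (
	"context"

	"github.com/pingcap/tidb/br/pkg/utils"
	"google.golang.org/grpc"
)

// resetStoreConn is a hypothetical helper: evict the (possibly broken)
// cached connection, then dial a fresh one on the next use.
func resetStoreConn(ctx context.Context, mgr *utils.StoreManager, storeID uint64) error {
	// Close the cached gRPC connection, if any, and drop it from the cache.
	if err := mgr.RemoveConn(ctx, storeID); err != nil {
		return err
	}
	// WithConn now has to dial a fresh connection for this store.
	return mgr.WithConn(ctx, storeID, func(conn *grpc.ClientConn) {
		_ = conn // use the fresh connection here
	})
}
```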