diff --git a/br/pkg/streamhelper/BUILD.bazel b/br/pkg/streamhelper/BUILD.bazel index 2d42a61ff369b..0a58ed8eb5b40 100644 --- a/br/pkg/streamhelper/BUILD.bazel +++ b/br/pkg/streamhelper/BUILD.bazel @@ -68,7 +68,7 @@ go_test( ], flaky = True, race = "on", - shard_count = 25, + shard_count = 27, deps = [ ":streamhelper", "//br/pkg/errors", diff --git a/br/pkg/streamhelper/advancer.go b/br/pkg/streamhelper/advancer.go index f28c5655eed9a..d2f823e4aa80d 100644 --- a/br/pkg/streamhelper/advancer.go +++ b/br/pkg/streamhelper/advancer.go @@ -712,3 +712,12 @@ func (c *CheckpointAdvancer) asyncResolveLocksForRanges(ctx context.Context, tar c.inResolvingLock.Store(false) }() } + +func (c *CheckpointAdvancer) TEST_registerCallbackForSubscriptions(f func()) int { + cnt := 0 + for _, sub := range c.subscriber.subscriptions { + sub.onDaemonExit = f + cnt += 1 + } + return cnt +} diff --git a/br/pkg/streamhelper/advancer_test.go b/br/pkg/streamhelper/advancer_test.go index 7b7e31f24ae07..c4410b76ee234 100644 --- a/br/pkg/streamhelper/advancer_test.go +++ b/br/pkg/streamhelper/advancer_test.go @@ -541,3 +541,55 @@ func TestCheckPointResume(t *testing.T) { c.advanceClusterTimeBy(2 * time.Minute) require.ErrorContains(t, adv.OnTick(ctx), "lagged too large") } + +func TestOwnershipLost(t *testing.T) { + c := createFakeCluster(t, 4, false) + c.splitAndScatter(manyRegions(0, 10240)...) 
+ installSubscribeSupport(c) + ctx, cancel := context.WithCancel(context.Background()) + env := &testEnv{fakeCluster: c, testCtx: t} + adv := streamhelper.NewCheckpointAdvancer(env) + adv.OnStart(ctx) + adv.OnBecomeOwner(ctx) + require.NoError(t, adv.OnTick(ctx)) + c.advanceCheckpoints() + c.flushAll() + failpoint.Enable("github.com/pingcap/tidb/br/pkg/streamhelper/subscription.listenOver.aboutToSend", "pause") + failpoint.Enable("github.com/pingcap/tidb/br/pkg/streamhelper/FlushSubscriber.Clear.timeoutMs", "return(500)") + wg := new(sync.WaitGroup) + wg.Add(adv.TEST_registerCallbackForSubscriptions(wg.Done)) + cancel() + failpoint.Disable("github.com/pingcap/tidb/br/pkg/streamhelper/subscription.listenOver.aboutToSend") + wg.Wait() +} + +func TestSubscriptionPanic(t *testing.T) { + c := createFakeCluster(t, 4, false) + c.splitAndScatter(manyRegions(0, 20)...) + installSubscribeSupport(c) + ctx, cancel := context.WithCancel(context.Background()) + env := &testEnv{fakeCluster: c, testCtx: t} + adv := streamhelper.NewCheckpointAdvancer(env) + adv.OnStart(ctx) + adv.OnBecomeOwner(ctx) + wg := new(sync.WaitGroup) + wg.Add(adv.TEST_registerCallbackForSubscriptions(wg.Done)) + + require.NoError(t, adv.OnTick(ctx)) + failpoint.Enable("github.com/pingcap/tidb/br/pkg/streamhelper/subscription.listenOver.aboutToSend", "5*panic") + ckpt := c.advanceCheckpoints() + c.flushAll() + cnt := 0 + for { + require.NoError(t, adv.OnTick(ctx)) + cnt++ + if env.checkpoint >= ckpt { + break + } + if cnt > 100 { + t.Fatalf("After 100 times, the progress cannot be advanced.") + } + } + cancel() + wg.Wait() +} diff --git a/br/pkg/streamhelper/flush_subscriber.go b/br/pkg/streamhelper/flush_subscriber.go index e8b5f43608809..2b574fbb838bf 100644 --- a/br/pkg/streamhelper/flush_subscriber.go +++ b/br/pkg/streamhelper/flush_subscriber.go @@ -11,8 +11,10 @@ import ( "github.com/google/uuid" "github.com/pingcap/errors" + "github.com/pingcap/failpoint" logbackup 
"github.com/pingcap/kvproto/pkg/logbackuppb" "github.com/pingcap/log" + berrors "github.com/pingcap/tidb/br/pkg/errors" "github.com/pingcap/tidb/br/pkg/logutil" "github.com/pingcap/tidb/br/pkg/streamhelper/spans" "github.com/pingcap/tidb/metrics" @@ -23,6 +25,11 @@ import ( "google.golang.org/grpc/status" ) +const ( + // clearSubscriberTimeOut is the timeout for clearing the subscriber. + clearSubscriberTimeOut = 1 * time.Minute +) + // FlushSubscriber maintains the state of subscribing to the cluster. type FlushSubscriber struct { dialer LogBackupService @@ -86,7 +93,7 @@ func (f *FlushSubscriber) UpdateStoreTopology(ctx context.Context) error { for id := range f.subscriptions { _, ok := storeSet[id] if !ok { - f.removeSubscription(id) + f.removeSubscription(ctx, id) } } return nil @@ -94,9 +101,18 @@ func (f *FlushSubscriber) UpdateStoreTopology(ctx context.Context) error { // Clear clears all the subscriptions. func (f *FlushSubscriber) Clear() { - log.Info("[log backup flush subscriber] Clearing.") + timeout := clearSubscriberTimeOut + failpoint.Inject("FlushSubscriber.Clear.timeoutMs", func(v failpoint.Value) { + //nolint:durationcheck + timeout = time.Duration(v.(int)) * time.Millisecond + }) + log.Info("Clearing.", + zap.String("category", "log backup flush subscriber"), + zap.Duration("timeout", timeout)) + cx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() for id := range f.subscriptions { - f.removeSubscription(id) + f.removeSubscription(cx, id) } } @@ -133,15 +149,11 @@ type eventStream = logbackup.LogBackup_SubscribeFlushEventClient type joinHandle <-chan struct{} -func (jh joinHandle) WaitTimeOut(dur time.Duration) { - var t <-chan time.Time - if dur > 0 { - t = time.After(dur) - } +func (jh joinHandle) Wait(ctx context.Context) { select { case <-jh: - case <-t: - log.Warn("join handle timed out.") + case <-ctx.Done(): + log.Warn("join handle timed out.", zap.StackSkip("caller", 1)) } } @@ -172,6 +184,8 
@@ type subscription struct { // we need to try reconnect even there is a error cannot be retry. storeBootAt uint64 output chan<- spans.Valued + + onDaemonExit func() } func (s *subscription) emitError(err error) { @@ -215,7 +229,7 @@ func (s *subscription) doConnect(ctx context.Context, dialer LogBackupService) e zap.Uint64("store", s.storeID), zap.Uint64("boot", s.storeBootAt)) // We should shutdown the background task firstly. // Once it yields some error during shuting down, the error won't be brought to next run. - s.close() + s.close(ctx) s.clearError() c, err := dialer.GetLogBackupClient(ctx, s.storeID) @@ -238,10 +252,10 @@ func (s *subscription) doConnect(ctx context.Context, dialer LogBackupService) e return nil } -func (s *subscription) close() { +func (s *subscription) close(ctx context.Context) { if s.cancel != nil { s.cancel() - s.background.WaitTimeOut(1 * time.Minute) + s.background.Wait(ctx) } // HACK: don't close the internal channel here, // because it is a ever-sharing channel. @@ -250,6 +264,16 @@ func (s *subscription) close() { func (s *subscription) listenOver(ctx context.Context, cli eventStream) { storeID := s.storeID logutil.CL(ctx).Info("Listen starting.", zap.Uint64("store", storeID)) + defer func() { + if s.onDaemonExit != nil { + s.onDaemonExit() + } + + if pData := recover(); pData != nil { + log.Warn("Subscriber panicked.", zap.Uint64("store", storeID), zap.Any("panic-data", pData), zap.Stack("stack")) + s.emitError(errors.Annotatef(berrors.ErrUnknown, "panic during executing: %v", pData)) + } + }() for { // Shall we use RecvMsg for better performance? // Note that the spans.Full requires the input slice be immutable. 
@@ -264,6 +288,7 @@ func (s *subscription) listenOver(ctx context.Context, cli eventStream) { return } + log.Debug("Sending events.", zap.Int("size", len(msg.Events))) for _, m := range msg.Events { start, err := decodeKey(m.StartKey) if err != nil { @@ -277,13 +302,22 @@ func (s *subscription) listenOver(ctx context.Context, cli eventStream) { logutil.Key("event", m.EndKey), logutil.ShortError(err)) continue } - s.output <- spans.Valued{ + failpoint.Inject("subscription.listenOver.aboutToSend", func() {}) + + evt := spans.Valued{ Key: spans.Span{ StartKey: start, EndKey: end, }, Value: m.Checkpoint, } + select { + case s.output <- evt: + case <-ctx.Done(): + logutil.CL(ctx).Warn("Context canceled while sending events.", + zap.Uint64("store", storeID)) + return + } } metrics.RegionCheckpointSubscriptionEvent.WithLabelValues( strconv.Itoa(int(storeID))).Observe(float64(len(msg.Events))) @@ -294,11 +328,12 @@ func (f *FlushSubscriber) addSubscription(ctx context.Context, toStore Store) { f.subscriptions[toStore.ID] = newSubscription(toStore, f.eventsTunnel) } -func (f *FlushSubscriber) removeSubscription(toStore uint64) { +func (f *FlushSubscriber) removeSubscription(ctx context.Context, toStore uint64) { subs, ok := f.subscriptions[toStore] if ok { - log.Info("[log backup subscription manager] Removing subscription.", zap.Uint64("store", toStore)) - subs.close() + log.Info("Removing subscription.", zap.String("category", "log backup subscription manager"), + zap.Uint64("store", toStore)) + subs.close(ctx) delete(f.subscriptions, toStore) } }