From 32093ce636094ec480409b357c8f07f6a7321b8f Mon Sep 17 00:00:00 2001 From: lidezhu <47731263+lidezhu@users.noreply.github.com> Date: Tue, 24 Dec 2024 13:38:18 +0800 Subject: [PATCH 1/4] This is an automated cherry-pick of #11903 Signed-off-by: ti-chi-bot --- cdc/kv/shared_client.go | 79 +++++++++++++++++++++++++++ cdc/kv/shared_client_test.go | 100 +++++++++++++++++++++++++++++++++++ cdc/kv/shared_stream.go | 45 ++++++++++------ pkg/version/check.go | 4 ++ 4 files changed, 213 insertions(+), 15 deletions(-) diff --git a/cdc/kv/shared_client.go b/cdc/kv/shared_client.go index ed15587d031..2a51804605d 100644 --- a/cdc/kv/shared_client.go +++ b/cdc/kv/shared_client.go @@ -47,6 +47,78 @@ import ( "golang.org/x/sync/errgroup" ) +<<<<<<< HEAD +======= +const ( + // Maximum total sleep time(in ms), 20 seconds. + tikvRequestMaxBackoff = 20000 + + // TiCDC always interacts with region leader, every time something goes wrong, + // failed region will be reloaded via `BatchLoadRegionsWithKeyRange` API. So we + // don't need to force reload region anymore. + regionScheduleReload = false + + scanRegionsConcurrency = 1024 + + loadRegionRetryInterval time.Duration = 100 * time.Millisecond + resolveLockMinInterval time.Duration = 10 * time.Second + invalidSubscriptionID SubscriptionID = SubscriptionID(0) +) + +var ( + // To generate an ID for a new subscription. And the subscription ID will also be used as + // `RequestId` in region requests of the table. + subscriptionIDGen atomic.Uint64 + // To generate a streamID in `newStream`. + streamIDGen atomic.Uint64 +) + +var ( + // unreachable error, only used in unit test + errUnreachable = errors.New("kv client unreachable error") + logPanic = log.Panic +) + +var ( + metricFeedNotLeaderCounter = eventFeedErrorCounter.WithLabelValues("NotLeader") + metricFeedEpochNotMatchCounter = eventFeedErrorCounter.WithLabelValues("EpochNotMatch") + metricFeedRegionNotFoundCounter = eventFeedErrorCounter.WithLabelValues("RegionNotFound") + metricFeedDuplicateRequestCounter = eventFeedErrorCounter.WithLabelValues("DuplicateRequest") + metricFeedUnknownErrorCounter = eventFeedErrorCounter.WithLabelValues("Unknown") + metricFeedRPCCtxUnavailable = eventFeedErrorCounter.WithLabelValues("RPCCtxUnavailable") + metricGetStoreErr = eventFeedErrorCounter.WithLabelValues("GetStoreErr") + metricStoreSendRequestErr = eventFeedErrorCounter.WithLabelValues("SendRequestToStore") + metricKvIsBusyCounter = eventFeedErrorCounter.WithLabelValues("KvIsBusy") + metricKvCongestedCounter = eventFeedErrorCounter.WithLabelValues("KvCongested") +) + +type eventError struct { + err *cdcpb.Error +} + +// Error implement error interface. +func (e *eventError) Error() string { + return e.err.String() +} + +type rpcCtxUnavailableErr struct { + verID tikv.RegionVerID +} + +func (e *rpcCtxUnavailableErr) Error() string { + return fmt.Sprintf("cannot get rpcCtx for region %v. ver:%v, confver:%v", + e.verID.GetID(), e.verID.GetVer(), e.verID.GetConfVer()) +} + +type getStoreErr struct{} + +func (e *getStoreErr) Error() string { return "get store error" } + +type sendRequestToStoreErr struct{} + +func (e *sendRequestToStoreErr) Error() string { return "send request to store error" } + +>>>>>>> 4624acb2f2 (puller: fix retry logic when check store version failed (#11903)) // SubscriptionID comes from `SharedClient.AllocSubscriptionID`. 
type SubscriptionID uint64 @@ -624,6 +696,13 @@ func (s *SharedClient) handleError(ctx context.Context, errInfo regionErrorInfo) metricFeedRPCCtxUnavailable.Inc() s.scheduleRangeRequest(ctx, errInfo.span, errInfo.requestedTable) return nil + case *getStoreErr: + metricGetStoreErr.Inc() + bo := tikv.NewBackoffer(ctx, tikvRequestMaxBackoff) + // cannot get the store the region belongs to, so we need to reload the region. + s.regionCache.OnSendFail(bo, errInfo.rpcCtx, true, err) + s.scheduleRangeRequest(ctx, errInfo.span, errInfo.subscribedTable) + return nil case *sendRequestToStoreErr: metricStoreSendRequestErr.Inc() bo := tikv.NewBackoffer(ctx, tikvRequestMaxBackoff) diff --git a/cdc/kv/shared_client_test.go b/cdc/kv/shared_client_test.go index 3d006e01275..995386d0623 100644 --- a/cdc/kv/shared_client_test.go +++ b/cdc/kv/shared_client_test.go @@ -19,6 +19,7 @@ import ( "testing" "time" + "github.com/pingcap/failpoint" "github.com/pingcap/kvproto/pkg/cdcpb" "github.com/pingcap/tidb/pkg/store/mockstore/mockcopr" "github.com/pingcap/tiflow/cdc/kv/regionlock" @@ -212,6 +213,105 @@ func TestConnectToOfflineOrFailedTiKV(t *testing.T) { } } +func TestGetStoreFailed(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + wg := &sync.WaitGroup{} + + events1 := make(chan *cdcpb.ChangeDataEvent, 10) + srv1 := newMockChangeDataServer(events1) + server1, addr1 := newMockService(ctx, t, srv1, wg) + + rpcClient, cluster, pdClient, _ := testutils.NewMockTiKV("", mockcopr.NewCoprRPCHandler()) + + pdClient = &mockPDClient{Client: pdClient, versionGen: defaultVersionGen} + + grpcPool := sharedconn.NewConnAndClientPool(&security.Credential{}, nil) + + regionCache := tikv.NewRegionCache(pdClient) + + pdClock := pdutil.NewClock4Test() + + kvStorage, err := tikv.NewTestTiKVStore(rpcClient, pdClient, nil, nil, 0) + require.Nil(t, err) + lockResolver := txnutil.NewLockerResolver(kvStorage, model.ChangeFeedID{}) + + invalidStore1 := "localhost:1" + invalidStore2 := "localhost:2" + cluster.AddStore(1, addr1) + cluster.AddStore(2, invalidStore1) + cluster.AddStore(3, invalidStore2) + cluster.Bootstrap(11, []uint64{1, 2, 3}, []uint64{4, 5, 6}, 4) + + client := NewSharedClient( + model.ChangeFeedID{ID: "test"}, + &config.ServerConfig{ + KVClient: &config.KVClientConfig{ + WorkerConcurrent: 1, + GrpcStreamConcurrent: 1, + AdvanceIntervalInMs: 10, + }, + Debug: &config.DebugConfig{Puller: &config.PullerConfig{LogRegionDetails: false}}, + }, + false, pdClient, grpcPool, regionCache, pdClock, lockResolver, + ) + + defer func() { + cancel() + client.Close() + _ = kvStorage.Close() + regionCache.Close() + pdClient.Close() + srv1.wg.Wait() + server1.Stop() + wg.Wait() + }() + + wg.Add(1) + go func() { + defer wg.Done() + err := client.Run(ctx) + require.Equal(t, context.Canceled, errors.Cause(err)) + }() + + failpoint.Enable("github.com/pingcap/tiflow/pkg/version/GetStoreFailed", `return(true)`) + subID := client.AllocSubscriptionID() + span := tablepb.Span{TableID: 1, StartKey: []byte("a"), EndKey: []byte("b")} + eventCh := make(chan MultiplexingEvent, 50) + client.Subscribe(subID, span, 1, eventCh) + + makeTsEvent := func(regionID, ts, requestID uint64) *cdcpb.ChangeDataEvent { + return &cdcpb.ChangeDataEvent{ + Events: []*cdcpb.Event{ + { + RegionId: regionID, + RequestId: requestID, + Event: &cdcpb.Event_ResolvedTs{ResolvedTs: ts}, + }, + }, + } + } + + checkTsEvent := func(event model.RegionFeedEvent, ts uint64) { + require.Equal(t, ts, event.Resolved.ResolvedTs) + } + + events1 <- 
mockInitializedEvent(11, uint64(subID)) + ts := oracle.GoTimeToTS(pdClock.CurrentTime()) + events1 <- makeTsEvent(11, ts, uint64(subID)) + select { + case <-eventCh: + require.True(t, false, "should not get event when get store failed") + case <-time.After(5 * time.Second): + } + failpoint.Disable("github.com/pingcap/tiflow/pkg/version/GetStoreFailed") + select { + case event := <-eventCh: + checkTsEvent(event.RegionFeedEvent, ts) + case <-time.After(5 * time.Second): + require.True(t, false, "reconnection not succeed in 5 second") + } +} + type mockChangeDataServer struct { ch chan *cdcpb.ChangeDataEvent wg sync.WaitGroup diff --git a/cdc/kv/shared_stream.go b/cdc/kv/shared_stream.go index a2a3f4f6540..c314355bba3 100644 --- a/cdc/kv/shared_stream.go +++ b/cdc/kv/shared_stream.go @@ -23,6 +23,7 @@ import ( "github.com/pingcap/log" "github.com/pingcap/tiflow/cdc/kv/sharedconn" "github.com/pingcap/tiflow/pkg/chann" + cerrors "github.com/pingcap/tiflow/pkg/errors" "github.com/pingcap/tiflow/pkg/util" "github.com/pingcap/tiflow/pkg/version" "go.uber.org/zap" @@ -87,12 +88,31 @@ func newStream(ctx context.Context, c *SharedClient, g *errgroup.Group, r *reque if err := waitForPreFetching(); err != nil { return err } - if canceled := stream.run(ctx, c, r); canceled { - return nil + var regionErr error + if err := version.CheckStoreVersion(ctx, c.pd, r.storeID); err != nil { + log.Info("event feed check store version fails", + zap.String("namespace", c.changefeed.Namespace), + zap.String("changefeed", c.changefeed.ID), + zap.Uint64("streamID", stream.streamID), + zap.Uint64("storeID", r.storeID), + zap.String("addr", r.storeAddr), + zap.Error(err)) + if errors.Cause(err) == context.Canceled { + return nil + } else if cerrors.Is(err, cerrors.ErrGetAllStoresFailed) { + regionErr = &getStoreErr{} + } else { + regionErr = &sendRequestToStoreErr{} + } + } else { + if canceled := stream.run(ctx, c, r); canceled { + return nil + } + regionErr = &sendRequestToStoreErr{} } for _, m := range stream.clearStates() { for _, state := range m { - state.markStopped(&sendRequestToStoreErr{}) + state.markStopped(regionErr) sfEvent := newEventItem(nil, state, stream) slot := hashRegionID(state.sri.verID.GetID(), len(c.workers)) _ = c.workers[slot].sendEvent(ctx, sfEvent) @@ -105,7 +125,11 @@ func newStream(ctx context.Context, c *SharedClient, g *errgroup.Group, r *reque // It means it's a special task for stopping the table. 
continue } +<<<<<<< HEAD c.onRegionFail(newRegionErrorInfo(sri, &sendRequestToStoreErr{})) +======= + c.onRegionFail(newRegionErrorInfo(region, regionErr)) +>>>>>>> 4624acb2f2 (puller: fix retry logic when check store version failed (#11903)) } if err := util.Hang(ctx, time.Second); err != nil { return err @@ -132,17 +156,6 @@ func (s *requestedStream) run(ctx context.Context, c *SharedClient, rs *requeste } } - if err := version.CheckStoreVersion(ctx, c.pd, rs.storeID); err != nil { - log.Info("event feed check store version fails", - zap.String("namespace", c.changefeed.Namespace), - zap.String("changefeed", c.changefeed.ID), - zap.Uint64("streamID", s.streamID), - zap.Uint64("storeID", rs.storeID), - zap.String("addr", rs.storeAddr), - zap.Error(err)) - return isCanceled() - } - log.Info("event feed going to create grpc stream", zap.String("namespace", c.changefeed.Namespace), zap.String("changefeed", c.changefeed.ID), @@ -339,7 +352,9 @@ func (s *requestedStream) send(ctx context.Context, c *SharedClient, rs *request if s.multiplexing != nil { req := &cdcpb.ChangeDataRequest{ RequestId: uint64(subscriptionID), - Request: &cdcpb.ChangeDataRequest_Deregister_{}, + Request: &cdcpb.ChangeDataRequest_Deregister_{ + Deregister: &cdcpb.ChangeDataRequest_Deregister{}, + }, } if err = doSend(s.multiplexing, req, subscriptionID); err != nil { return err diff --git a/pkg/version/check.go b/pkg/version/check.go index 1ab4af6c1ab..043803354c1 100644 --- a/pkg/version/check.go +++ b/pkg/version/check.go @@ -24,6 +24,7 @@ import ( "github.com/coreos/go-semver/semver" "github.com/pingcap/errors" + "github.com/pingcap/failpoint" "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/log" "github.com/pingcap/tidb/pkg/util/engine" @@ -196,6 +197,9 @@ func checkPDVersion(ctx context.Context, pdAddr string, credential *security.Cre // CheckStoreVersion checks whether the given TiKV is compatible with this CDC. // If storeID is 0, it checks all TiKV. 
func CheckStoreVersion(ctx context.Context, client pd.Client, storeID uint64) error { + failpoint.Inject("GetStoreFailed", func() { + failpoint.Return(cerror.WrapError(cerror.ErrGetAllStoresFailed, fmt.Errorf("unknown store %d", storeID))) + }) var stores []*metapb.Store var err error if storeID == 0 { From 8d46ed9a201157616f6e47d62344661effa0875d Mon Sep 17 00:00:00 2001 From: lidezhu Date: Tue, 24 Dec 2024 14:11:13 +0800 Subject: [PATCH 2/4] fix conflict --- cdc/kv/client.go | 5 +++ cdc/kv/shared_client.go | 72 ----------------------------------------- cdc/kv/shared_stream.go | 6 +--- 3 files changed, 6 insertions(+), 77 deletions(-) diff --git a/cdc/kv/client.go b/cdc/kv/client.go index 9043946c86c..622c7e8e2f5 100644 --- a/cdc/kv/client.go +++ b/cdc/kv/client.go @@ -104,6 +104,7 @@ var ( metricFeedDuplicateRequestCounter = eventFeedErrorCounter.WithLabelValues("DuplicateRequest") metricFeedUnknownErrorCounter = eventFeedErrorCounter.WithLabelValues("Unknown") metricFeedRPCCtxUnavailable = eventFeedErrorCounter.WithLabelValues("RPCCtxUnavailable") + metricGetStoreErr = eventFeedErrorCounter.WithLabelValues("GetStoreErr") metricStoreSendRequestErr = eventFeedErrorCounter.WithLabelValues("SendRequestToStore") metricConnectToStoreErr = eventFeedErrorCounter.WithLabelValues("ConnectToStore") ) @@ -1492,3 +1493,7 @@ func (e *connectToStoreErr) Error() string { return "connect to store error" } type sendRequestToStoreErr struct{} func (e *sendRequestToStoreErr) Error() string { return "send request to store error" } + +type getStoreErr struct{} + +func (e *getStoreErr) Error() string { return "get store error" } diff --git a/cdc/kv/shared_client.go b/cdc/kv/shared_client.go index 2a51804605d..994c533e89f 100644 --- a/cdc/kv/shared_client.go +++ b/cdc/kv/shared_client.go @@ -47,78 +47,6 @@ import ( "golang.org/x/sync/errgroup" ) -<<<<<<< HEAD -======= -const ( - // Maximum total sleep time(in ms), 20 seconds. - tikvRequestMaxBackoff = 20000 - - // TiCDC always interacts with region leader, every time something goes wrong, - // failed region will be reloaded via `BatchLoadRegionsWithKeyRange` API. So we - // don't need to force reload region anymore. - regionScheduleReload = false - - scanRegionsConcurrency = 1024 - - loadRegionRetryInterval time.Duration = 100 * time.Millisecond - resolveLockMinInterval time.Duration = 10 * time.Second - invalidSubscriptionID SubscriptionID = SubscriptionID(0) -) - -var ( - // To generate an ID for a new subscription. And the subscription ID will also be used as - // `RequestId` in region requests of the table. - subscriptionIDGen atomic.Uint64 - // To generate a streamID in `newStream`. 
- streamIDGen atomic.Uint64 -) - -var ( - // unreachable error, only used in unit test - errUnreachable = errors.New("kv client unreachable error") - logPanic = log.Panic -) - -var ( - metricFeedNotLeaderCounter = eventFeedErrorCounter.WithLabelValues("NotLeader") - metricFeedEpochNotMatchCounter = eventFeedErrorCounter.WithLabelValues("EpochNotMatch") - metricFeedRegionNotFoundCounter = eventFeedErrorCounter.WithLabelValues("RegionNotFound") - metricFeedDuplicateRequestCounter = eventFeedErrorCounter.WithLabelValues("DuplicateRequest") - metricFeedUnknownErrorCounter = eventFeedErrorCounter.WithLabelValues("Unknown") - metricFeedRPCCtxUnavailable = eventFeedErrorCounter.WithLabelValues("RPCCtxUnavailable") - metricGetStoreErr = eventFeedErrorCounter.WithLabelValues("GetStoreErr") - metricStoreSendRequestErr = eventFeedErrorCounter.WithLabelValues("SendRequestToStore") - metricKvIsBusyCounter = eventFeedErrorCounter.WithLabelValues("KvIsBusy") - metricKvCongestedCounter = eventFeedErrorCounter.WithLabelValues("KvCongested") -) - -type eventError struct { - err *cdcpb.Error -} - -// Error implement error interface. -func (e *eventError) Error() string { - return e.err.String() -} - -type rpcCtxUnavailableErr struct { - verID tikv.RegionVerID -} - -func (e *rpcCtxUnavailableErr) Error() string { - return fmt.Sprintf("cannot get rpcCtx for region %v. ver:%v, confver:%v", - e.verID.GetID(), e.verID.GetVer(), e.verID.GetConfVer()) -} - -type getStoreErr struct{} - -func (e *getStoreErr) Error() string { return "get store error" } - -type sendRequestToStoreErr struct{} - -func (e *sendRequestToStoreErr) Error() string { return "send request to store error" } - ->>>>>>> 4624acb2f2 (puller: fix retry logic when check store version failed (#11903)) // SubscriptionID comes from `SharedClient.AllocSubscriptionID`. type SubscriptionID uint64 diff --git a/cdc/kv/shared_stream.go b/cdc/kv/shared_stream.go index c314355bba3..42296288919 100644 --- a/cdc/kv/shared_stream.go +++ b/cdc/kv/shared_stream.go @@ -125,11 +125,7 @@ func newStream(ctx context.Context, c *SharedClient, g *errgroup.Group, r *reque // It means it's a special task for stopping the table. 
continue } -<<<<<<< HEAD - c.onRegionFail(newRegionErrorInfo(sri, &sendRequestToStoreErr{})) -======= - c.onRegionFail(newRegionErrorInfo(region, regionErr)) ->>>>>>> 4624acb2f2 (puller: fix retry logic when check store version failed (#11903)) + c.onRegionFail(newRegionErrorInfo(sri, regionErr)) } if err := util.Hang(ctx, time.Second); err != nil { return err From e3fa0c7ff353d86232c5cfc668c41d7b2930b01d Mon Sep 17 00:00:00 2001 From: lidezhu Date: Tue, 24 Dec 2024 14:32:56 +0800 Subject: [PATCH 3/4] fix build --- cdc/kv/client.go | 13 +++++++++++-- cdc/kv/shared_client.go | 2 +- 2 files changed, 12 insertions(+), 3 deletions(-) diff --git a/cdc/kv/client.go b/cdc/kv/client.go index 622c7e8e2f5..3f4bc23dea9 100644 --- a/cdc/kv/client.go +++ b/cdc/kv/client.go @@ -661,8 +661,17 @@ func (s *eventFeedSession) requestRegionToStore( time.Sleep(delay) } bo := tikv.NewBackoffer(ctx, tikvRequestMaxBackoff) - s.client.regionCache.OnSendFail(bo, rpcCtx, regionScheduleReload, err) - errInfo := newRegionErrorInfo(sri, &connectToStoreErr{}) + var regionErr error + var scheduleReload bool + if cerror.Is(err, cerror.ErrGetAllStoresFailed) { + regionErr = &getStoreErr{} + scheduleReload = true + } else { + regionErr = &connectToStoreErr{} + scheduleReload = regionScheduleReload + } + s.client.regionCache.OnSendFail(bo, rpcCtx, scheduleReload, err) + errInfo := newRegionErrorInfo(sri, regionErr) s.onRegionFail(ctx, errInfo) continue } diff --git a/cdc/kv/shared_client.go b/cdc/kv/shared_client.go index 994c533e89f..7137e3840df 100644 --- a/cdc/kv/shared_client.go +++ b/cdc/kv/shared_client.go @@ -629,7 +629,7 @@ func (s *SharedClient) handleError(ctx context.Context, errInfo regionErrorInfo) bo := tikv.NewBackoffer(ctx, tikvRequestMaxBackoff) // cannot get the store the region belongs to, so we need to reload the region. 
s.regionCache.OnSendFail(bo, errInfo.rpcCtx, true, err) - s.scheduleRangeRequest(ctx, errInfo.span, errInfo.subscribedTable) + s.scheduleRangeRequest(ctx, errInfo.span, errInfo.requestedTable) return nil case *sendRequestToStoreErr: metricStoreSendRequestErr.Inc() From 188cc12c3d3dc5332c684c4e3970d270ada9f051 Mon Sep 17 00:00:00 2001 From: lidezhu Date: Tue, 24 Dec 2024 17:26:53 +0800 Subject: [PATCH 4/4] fix test --- cdc/kv/shared_client.go | 7 +++++++ cdc/kv/shared_client_test.go | 2 +- 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/cdc/kv/shared_client.go b/cdc/kv/shared_client.go index 7137e3840df..60b3038cda4 100644 --- a/cdc/kv/shared_client.go +++ b/cdc/kv/shared_client.go @@ -464,6 +464,9 @@ func (s *SharedClient) divideAndScheduleRegions( bo := tikv.NewBackoffer(ctx, tikvRequestMaxBackoff) regions, err := s.regionCache.BatchLoadRegionsWithKeyRange(bo, nextSpan.StartKey, nextSpan.EndKey, limit) if err != nil { + if errors.Cause(err) == context.Canceled { + return nil + } log.Warn("event feed load regions failed", zap.String("namespace", s.changefeed.Namespace), zap.String("changefeed", s.changefeed.ID), @@ -665,6 +668,10 @@ func (s *SharedClient) resolveLock(ctx context.Context) error { } doResolve := func(regionID uint64, state *regionlock.LockedRange, maxVersion uint64) { + if state == nil { + log.Warn("found nil state in resolve lock", zap.Uint64("regionID", regionID)) + return + } if state.ResolvedTs.Load() > maxVersion || !state.Initialzied.Load() { return } diff --git a/cdc/kv/shared_client_test.go b/cdc/kv/shared_client_test.go index 995386d0623..496ea5c6ff6 100644 --- a/cdc/kv/shared_client_test.go +++ b/cdc/kv/shared_client_test.go @@ -250,7 +250,7 @@ func TestGetStoreFailed(t *testing.T) { GrpcStreamConcurrent: 1, AdvanceIntervalInMs: 10, }, - Debug: &config.DebugConfig{Puller: &config.PullerConfig{LogRegionDetails: false}}, + Debug: &config.DebugConfig{Puller: &config.PullerConfig{}}, }, false, pdClient, grpcPool, regionCache, pdClock, lockResolver, )
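
Taken together, the series boils down to one decision made when a stream cannot be established: classify the failure before retrying. A store-version check that fails because store metadata cannot be fetched (ErrGetAllStoresFailed) is turned into a getStoreErr, which forces the region cache to reload the region and reschedules the whole range; any other failure is treated as a plain sendRequestToStoreErr and goes through the usual backoff path. Below is a minimal, self-contained Go sketch of that classification, using stand-in names (errGetAllStoresFailed, classifyStoreError and the two local error types) rather than the real TiCDC identifiers, and the stdlib errors package instead of pingcap/errors:

    package main

    import (
        "context"
        "errors"
        "fmt"
    )

    // errGetAllStoresFailed stands in for cerrors.ErrGetAllStoresFailed in TiCDC.
    var errGetAllStoresFailed = errors.New("get all stores failed")

    // getStoreErr marks failures where the store metadata itself could not be
    // fetched; handling it means forcing a region reload before retrying.
    type getStoreErr struct{}

    func (e *getStoreErr) Error() string { return "get store error" }

    // sendRequestToStoreErr marks ordinary send failures; the existing
    // backoff-and-retry path is enough for these.
    type sendRequestToStoreErr struct{}

    func (e *sendRequestToStoreErr) Error() string { return "send request to store error" }

    // classifyStoreError mirrors the branching added in newStream: a canceled
    // context stops retrying, a get-all-stores failure becomes getStoreErr,
    // and everything else becomes sendRequestToStoreErr.
    func classifyStoreError(err error) error {
        switch {
        case err == nil:
            return nil
        case errors.Is(err, context.Canceled):
            return nil // the stream is shutting down; nothing to retry
        case errors.Is(err, errGetAllStoresFailed):
            return &getStoreErr{}
        default:
            return &sendRequestToStoreErr{}
        }
    }

    func main() {
        for _, err := range []error{
            context.Canceled,
            fmt.Errorf("check store version: %w", errGetAllStoresFailed),
            errors.New("rpc deadline exceeded"),
        } {
            fmt.Printf("%v -> %v\n", err, classifyStoreError(err))
        }
    }

The reason getStoreErr is kept distinct is visible in the handleError and requestRegionToStore hunks above: for a getStoreErr the region cache is asked to reload the region (OnSendFail with scheduleReload set to true) before the range request is rescheduled, whereas ordinary send failures keep regionScheduleReload. That reload is what lets the subscription recover once the store becomes reachable again, which is what TestGetStoreFailed exercises via the GetStoreFailed failpoint.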