From 3391adffdd029e02b525c74bb3abeaa41cc233f1 Mon Sep 17 00:00:00 2001
From: zhaoxinyu
Date: Fri, 26 Nov 2021 15:53:51 +0800
Subject: [PATCH] This is an automated cherry-pick of #3464

Signed-off-by: ti-chi-bot
---
 cdc/capture/capture.go           |  42 ++++++
 cdc/kv/client.go                 |  12 +-
 cdc/kv/client_bench_test.go      |  10 +-
 cdc/kv/client_test.go            | 219 ++++++++++++++++++++++++-------
 cdc/kv/testing.go                |   9 +-
 cdc/owner/ddl_puller.go          |   2 +-
 cdc/processor/pipeline/puller.go |   5 +
 cdc/processor/processor.go       |   1 +
 cdc/puller/puller.go             |   5 +-
 cdc/puller/puller_test.go        |   5 +-
 pkg/context/context.go           |  12 ++
 11 files changed, 248 insertions(+), 74 deletions(-)

diff --git a/cdc/capture/capture.go b/cdc/capture/capture.go
index c6a174ee6c6..5b2a735b973 100644
--- a/cdc/capture/capture.go
+++ b/cdc/capture/capture.go
@@ -34,6 +34,7 @@ import (
 	"github.com/pingcap/ticdc/pkg/orchestrator"
 	"github.com/pingcap/ticdc/pkg/version"
 	tidbkv "github.com/pingcap/tidb/kv"
+	"github.com/tikv/client-go/v2/tikv"
 	pd "github.com/tikv/pd/client"
 	"go.etcd.io/etcd/clientv3/concurrency"
 	"go.etcd.io/etcd/mvcc"
@@ -54,10 +55,21 @@ type Capture struct {
 	session  *concurrency.Session
 	election *concurrency.Election
 
+<<<<<<< HEAD
 	pdClient   pd.Client
 	kvStorage  tidbkv.Storage
 	etcdClient *kv.CDCEtcdClient
 	grpcPool   kv.GrpcPool
+=======
+	pdClient     pd.Client
+	kvStorage    tidbkv.Storage
+	etcdClient   *etcd.CDCEtcdClient
+	grpcPool     kv.GrpcPool
+	regionCache  *tikv.RegionCache
+	TimeAcquirer pdtime.TimeAcquirer
+
+	tableActorSystem *system.System
+>>>>>>> e46ded913 (ticdc/kvclient: make multiple cdc kv clients share one RegionCache (#3464))
 
 	cancel context.CancelFunc
 
@@ -103,6 +115,10 @@ func (c *Capture) reset(ctx context.Context) error {
 		c.grpcPool.Close()
 	}
 	c.grpcPool = kv.NewGrpcPoolImpl(ctx, conf.Security)
+	if c.regionCache != nil {
+		c.regionCache.Close()
+	}
+	c.regionCache = tikv.NewRegionCache(c.pdClient)
 	log.Info("init capture", zap.String("capture-id", c.info.ID), zap.String("capture-addr", c.info.AdvertiseAddr))
 	return nil
 }
@@ -147,11 +163,22 @@ func (c *Capture) Run(ctx context.Context) error {
 func (c *Capture) run(stdCtx context.Context) error {
 	ctx := cdcContext.NewContext(stdCtx, &cdcContext.GlobalVars{
+<<<<<<< HEAD
 		PDClient:    c.pdClient,
 		KVStorage:   c.kvStorage,
 		CaptureInfo: c.info,
 		EtcdClient:  c.etcdClient,
 		GrpcPool:    c.grpcPool,
+=======
+		PDClient:         c.pdClient,
+		KVStorage:        c.kvStorage,
+		CaptureInfo:      c.info,
+		EtcdClient:       c.etcdClient,
+		GrpcPool:         c.grpcPool,
+		RegionCache:      c.regionCache,
+		TimeAcquirer:     c.TimeAcquirer,
+		TableActorSystem: c.tableActorSystem,
+>>>>>>> e46ded913 (ticdc/kvclient: make multiple cdc kv clients share one RegionCache (#3464))
 	})
 	err := c.register(ctx)
 	if err != nil {
@@ -336,6 +363,7 @@ func (c *Capture) register(ctx cdcContext.Context) error {
 }
 
 // AsyncClose closes the capture by unregistering it from etcd
+// Note: this function should be reentrant
 func (c *Capture) AsyncClose() {
 	defer c.cancel()
 	// Safety: Here we mainly want to stop the owner
@@ -352,6 +380,20 @@ func (c *Capture) AsyncClose() {
 	if c.grpcPool != nil {
 		c.grpcPool.Close()
 	}
+<<<<<<< HEAD
+=======
+	if c.regionCache != nil {
+		c.regionCache.Close()
+		c.regionCache = nil
+	}
+	if c.tableActorSystem != nil {
+		err := c.tableActorSystem.Stop()
+		if err != nil {
+			log.Warn("stop table actor system failed", zap.Error(err))
+		}
+		c.tableActorSystem = nil
+	}
+>>>>>>> e46ded913 (ticdc/kvclient: make multiple cdc kv clients share one RegionCache (#3464))
 }
 
 // WriteDebugInfo writes the debug info into writer.
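The capture.go hunks above move RegionCache ownership into the Capture: reset() rebuilds the cache and AsyncClose() tears it down, while every kv client under the capture only borrows it. A minimal sketch of that ownership pattern, assuming the client-go calls the patch itself uses (tikv.NewRegionCache, (*tikv.RegionCache).Close); the type and method names below (captureSketch, rebuild, shutdown) are invented for illustration and are not part of the patch:

package capturesketch

import (
	"github.com/tikv/client-go/v2/tikv"
	pd "github.com/tikv/pd/client"
)

// captureSketch stands in for cdc/capture.Capture: it owns exactly one
// RegionCache for its whole lifetime between reset and shutdown.
type captureSketch struct {
	pdClient    pd.Client
	regionCache *tikv.RegionCache
}

// rebuild mirrors Capture.reset: drop any previous cache before creating a
// new one, so restarting the capture cannot leak the old cache's background work.
func (c *captureSketch) rebuild() {
	if c.regionCache != nil {
		c.regionCache.Close()
	}
	c.regionCache = tikv.NewRegionCache(c.pdClient)
}

// shutdown mirrors the AsyncClose branch: close and nil out the cache so the
// method stays safe to call more than once, as the added comment requires.
func (c *captureSketch) shutdown() {
	if c.regionCache != nil {
		c.regionCache.Close()
		c.regionCache = nil
	}
}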
diff --git a/cdc/kv/client.go b/cdc/kv/client.go
index 2274b3587f1..8837af8756b 100644
--- a/cdc/kv/client.go
+++ b/cdc/kv/client.go
@@ -282,7 +282,6 @@ type CDCKVClient interface {
 		isPullerInit PullerInitialization,
 		eventCh chan<- model.RegionFeedEvent,
 	) error
-	Close() error
 }
 
 // NewCDCKVClient is the constructor of CDC KV client
@@ -308,7 +307,7 @@ type CDCClient struct {
 }
 
 // NewCDCClient creates a CDCClient instance
-func NewCDCClient(ctx context.Context, pd pd.Client, kvStorage tikv.Storage, grpcPool GrpcPool) (c CDCKVClient) {
+func NewCDCClient(ctx context.Context, pd pd.Client, kvStorage tikv.Storage, grpcPool GrpcPool, regionCache *tikv.RegionCache) (c CDCKVClient) {
 	clusterID := pd.GetClusterID(ctx)
 
 	var store TiKVStorage
@@ -326,19 +325,12 @@ func NewCDCClient(ctx context.Context, pd pd.Client, kvStorage tikv.Storage, grp
 		pd:             pd,
 		kvStorage:      store,
 		grpcPool:       grpcPool,
-		regionCache:    tikv.NewRegionCache(pd),
+		regionCache:    regionCache,
 		regionLimiters: defaultRegionEventFeedLimiters,
 	}
 	return
 }
 
-// Close CDCClient
-func (c *CDCClient) Close() error {
-	c.regionCache.Close()
-
-	return nil
-}
-
 func (c *CDCClient) getRegionLimiter(regionID uint64) *rate.Limiter {
 	return c.regionLimiters.getLimiter(regionID)
 }
diff --git a/cdc/kv/client_bench_test.go b/cdc/kv/client_bench_test.go
index 9f0554c88e6..1e012b72c2d 100644
--- a/cdc/kv/client_bench_test.go
+++ b/cdc/kv/client_bench_test.go
@@ -190,7 +190,9 @@ func prepareBenchMultiStore(b *testing.B, storeNum, regionNum int) (
 	isPullInit := &mockPullerInit{}
 	grpcPool := NewGrpcPoolImpl(ctx, &security.Credential{})
 	defer grpcPool.Close()
-	cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool)
+	regionCache := tikv.NewRegionCache(pdClient)
+	defer regionCache.Close()
+	cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool, regionCache)
 	eventCh := make(chan model.RegionFeedEvent, 1000000)
 	wg.Add(1)
 	go func() {
@@ -198,7 +200,6 @@ func prepareBenchMultiStore(b *testing.B, storeNum, regionNum int) (
 		if errors.Cause(err) != context.Canceled {
 			b.Error(err)
 		}
-		cdcClient.Close() //nolint:errcheck
 		wg.Done()
 	}()
 
@@ -280,7 +281,9 @@ func prepareBench(b *testing.B, regionNum int) (
 	isPullInit := &mockPullerInit{}
 	grpcPool := NewGrpcPoolImpl(ctx, &security.Credential{})
 	defer grpcPool.Close()
-	cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool)
+	regionCache := tikv.NewRegionCache(pdClient)
+	defer regionCache.Close()
+	cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool, regionCache)
 	eventCh := make(chan model.RegionFeedEvent, 1000000)
 	wg.Add(1)
 	go func() {
@@ -288,7 +291,6 @@ func prepareBench(b *testing.B, regionNum int) (
 		if errors.Cause(err) != context.Canceled {
 			b.Error(err)
 		}
-		cdcClient.Close() //nolint:errcheck
 		wg.Done()
 	}()
 
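The client.go hunk above removes CDCClient.Close and makes NewCDCClient accept a caller-owned *tikv.RegionCache; the client_bench_test.go hunks show the resulting call pattern. A condensed sketch of that contract, assuming the cdc/kv package is imported as kv and that tikv.Storage is the storage interface named in the new signature; the helper newSharedClients and its returned cleanup func are invented for illustration:

package kvsketch

import (
	"context"

	"github.com/pingcap/ticdc/cdc/kv"
	"github.com/tikv/client-go/v2/tikv"
	pd "github.com/tikv/pd/client"
)

// newSharedClients builds two CDC KV clients on top of one RegionCache. The
// caller, not the clients, closes the shared cache, which is exactly why the
// patch deletes Close from the CDCKVClient interface.
func newSharedClients(
	ctx context.Context,
	pdCli pd.Client,
	storage tikv.Storage,
	pool kv.GrpcPool,
) (kv.CDCKVClient, kv.CDCKVClient, func()) {
	regionCache := tikv.NewRegionCache(pdCli)
	a := kv.NewCDCClient(ctx, pdCli, storage, pool, regionCache)
	b := kv.NewCDCClient(ctx, pdCli, storage, pool, regionCache)
	// Both clients resolve region metadata through the same cache; close it
	// once, after every client has stopped its event feeds.
	return a, b, func() { regionCache.Close() }
}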
diff --git a/cdc/kv/client_test.go b/cdc/kv/client_test.go
index fb82bba833a..5f87abd4874 100644
--- a/cdc/kv/client_test.go
+++ b/cdc/kv/client_test.go
@@ -65,7 +65,22 @@ type clientSuite struct {
 
 var _ = check.Suite(&clientSuite{})
 
+<<<<<<< HEAD
 func (s *clientSuite) TestNewClose(c *check.C) {
+=======
+func (s *clientSuite) SetUpTest(c *check.C) {
+	dir := c.MkDir()
+	var err error
+	_, s.e, err = etcd.SetupEmbedEtcd(dir)
+	c.Assert(err, check.IsNil)
+}
+
+func (s *clientSuite) TearDownTest(c *check.C) {
+	s.e.Close()
+}
+
+func (s *clientSuite) TestNewClient(c *check.C) {
+>>>>>>> e46ded913 (ticdc/kvclient: make multiple cdc kv clients share one RegionCache (#3464))
 	defer testleak.AfterTest(c)()
 	rpcClient, _, pdClient, err := testutils.NewMockTiKV("", nil)
 	c.Assert(err, check.IsNil)
@@ -74,9 +89,10 @@ func (s *clientSuite) TestNewClose(c *check.C) {
 	grpcPool := NewGrpcPoolImpl(context.Background(), &security.Credential{})
 	defer grpcPool.Close()
-	cli := NewCDCClient(context.Background(), pdClient, nil, grpcPool)
-	err = cli.Close()
-	c.Assert(err, check.IsNil)
+	regionCache := tikv.NewRegionCache(pdClient)
+	defer regionCache.Close()
+	cli := NewCDCClient(context.Background(), pdClient, nil, grpcPool, regionCache)
+	c.Assert(cli, check.NotNil)
 }
 
 func (s *clientSuite) TestAssembleRowEvent(c *check.C) {
@@ -335,8 +351,9 @@ func (s *etcdSuite) TestConnectOfflineTiKV(c *check.C) {
 	isPullInit := &mockPullerInit{}
 	grpcPool := NewGrpcPoolImpl(ctx, &security.Credential{})
 	defer grpcPool.Close()
-	cdcClient := NewCDCClient(context.Background(), pdClient, kvStorage, grpcPool)
-	defer cdcClient.Close() //nolint:errcheck
+	regionCache := tikv.NewRegionCache(pdClient)
+	defer regionCache.Close()
+	cdcClient := NewCDCClient(context.Background(), pdClient, kvStorage, grpcPool, regionCache)
 	eventCh := make(chan model.RegionFeedEvent, 10)
 	wg.Add(1)
 	go func() {
@@ -434,14 +451,15 @@ func (s *etcdSuite) TestRecvLargeMessageSize(c *check.C) {
 	isPullInit := &mockPullerInit{}
 	grpcPool := NewGrpcPoolImpl(ctx, &security.Credential{})
 	defer grpcPool.Close()
-	cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool)
+	regionCache := tikv.NewRegionCache(pdClient)
+	defer regionCache.Close()
+	cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool, regionCache)
 	eventCh := make(chan model.RegionFeedEvent, 10)
 	wg.Add(1)
 	go func() {
 		defer wg.Done()
 		err := cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, 1, false, lockresolver, isPullInit, eventCh)
 		c.Assert(errors.Cause(err), check.Equals, context.Canceled)
-		cdcClient.Close() //nolint:errcheck
 	}()
 
 	// new session, new request
@@ -532,14 +550,15 @@ func (s *etcdSuite) TestHandleError(c *check.C) {
 	isPullInit := &mockPullerInit{}
 	grpcPool := NewGrpcPoolImpl(ctx, &security.Credential{})
 	defer grpcPool.Close()
-	cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool)
+	regionCache := tikv.NewRegionCache(pdClient)
+	defer regionCache.Close()
+	cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool, regionCache)
 	eventCh := make(chan model.RegionFeedEvent, 10)
 	wg.Add(1)
 	go func() {
 		defer wg.Done()
 		err := cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("d")}, 100, false, lockresolver, isPullInit, eventCh)
 		c.Assert(errors.Cause(err), check.Equals, context.Canceled)
-		cdcClient.Close() //nolint:errcheck
 	}()
 
 	// wait request id allocated with: new session, new request
@@ -689,7 +708,9 @@ func (s *etcdSuite) TestCompatibilityWithSameConn(c *check.C) {
 	isPullInit := &mockPullerInit{}
 	grpcPool := NewGrpcPoolImpl(ctx, &security.Credential{})
 	defer grpcPool.Close()
-	cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool)
+	regionCache := tikv.NewRegionCache(pdClient)
+	defer regionCache.Close()
+	cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool, regionCache)
 	eventCh := make(chan model.RegionFeedEvent, 10)
 	var wg2 sync.WaitGroup
 	wg2.Add(1)
@@ -697,7 +718,6 @@ func (s *etcdSuite) TestCompatibilityWithSameConn(c *check.C) {
 		defer wg2.Done()
 		err := cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, 100, false, lockresolver, isPullInit, eventCh)
 		c.Assert(cerror.ErrVersionIncompatible.Equal(err), check.IsTrue)
-		cdcClient.Close() //nolint:errcheck
 	}()
 
 	// wait request id allocated with: new session, new request
@@ -720,7 +740,84 @@ func (s *etcdSuite) TestCompatibilityWithSameConn(c *check.C) {
 	cancel()
 }
 
+<<<<<<< HEAD
 func (s *etcdSuite) testHandleFeedEvent(c *check.C) {
+=======
+// TestClusterIDMismatch tests kv client returns an error when TiKV returns
+// the cluster ID mismatch error.
+func (s *clientSuite) TestClusterIDMismatch(c *check.C) {
+	defer testleak.AfterTest(c)()
+	defer s.TearDownTest(c)
+
+	ctx, cancel := context.WithCancel(context.Background())
+	wg := &sync.WaitGroup{}
+
+	changeDataCh := make(chan *cdcpb.ChangeDataEvent, 10)
+	changeDataService := newMockChangeDataService(c, changeDataCh)
+	mockService, addr := newMockService(ctx, c, changeDataService, wg)
+	defer func() {
+		close(changeDataCh)
+		mockService.Stop()
+		wg.Wait()
+	}()
+
+	rpcClient, cluster, pdClient, err := testutils.NewMockTiKV("", mockcopr.NewCoprRPCHandler())
+	c.Assert(err, check.IsNil)
+
+	pdClient = &mockPDClient{Client: pdClient, versionGen: defaultVersionGen}
+	tiStore, err := tikv.NewTestTiKVStore(rpcClient, pdClient, nil, nil, 0)
+	c.Assert(err, check.IsNil)
+
+	kvStorage := newStorageWithCurVersionCache(tiStore, addr)
+	defer kvStorage.Close() //nolint:errcheck
+
+	cluster.AddStore(1, addr)
+	cluster.Bootstrap(3, []uint64{1}, []uint64{4}, 4)
+
+	baseAllocatedID := currentRequestID()
+	lockResolver := txnutil.NewLockerResolver(kvStorage)
+	isPullInit := &mockPullerInit{}
+
+	grpcPool := NewGrpcPoolImpl(ctx, &security.Credential{})
+	defer grpcPool.Close()
+	regionCache := tikv.NewRegionCache(pdClient)
+	defer regionCache.Close()
+	cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool, regionCache)
+	eventCh := make(chan model.RegionFeedEvent, 10)
+
+	var wg2 sync.WaitGroup
+	wg2.Add(1)
+	go func() {
+		defer wg2.Done()
+		err := cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, 100, false, lockResolver, isPullInit, eventCh)
+		c.Assert(cerror.ErrClusterIDMismatch.Equal(err), check.IsTrue)
+	}()
+
+	// wait request id allocated with: new session, new request
+	waitRequestID(c, baseAllocatedID+1)
+	clusterIDMismatchEvent := &cdcpb.ChangeDataEvent{Events: []*cdcpb.Event{
+		{
+			RegionId:  3,
+			RequestId: currentRequestID(),
+			Event: &cdcpb.Event_Error{
+				Error: &cdcpb.Error{
+					ClusterIdMismatch: &cdcpb.ClusterIDMismatch{
+						Current: 0,
+						Request: 1,
+					},
+				},
+			},
+		},
+	}}
+
+	changeDataCh <- clusterIDMismatchEvent
+
+	wg2.Wait()
+	cancel()
+}
+
+func (s *clientSuite) testHandleFeedEvent(c *check.C) {
+>>>>>>> e46ded913 (ticdc/kvclient: make multiple cdc kv clients share one RegionCache (#3464))
 	defer s.TearDownTest(c)
 	ctx, cancel := context.WithCancel(context.Background())
 	wg := &sync.WaitGroup{}
@@ -751,14 +848,15 @@ func (s *etcdSuite) testHandleFeedEvent(c *check.C) {
 	isPullInit := &mockPullerInit{}
 	grpcPool := NewGrpcPoolImpl(ctx, &security.Credential{})
 	defer grpcPool.Close()
-	cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool)
+	regionCache := tikv.NewRegionCache(pdClient)
+	defer regionCache.Close()
+	cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool, regionCache)
 	eventCh := make(chan model.RegionFeedEvent, 10)
 	wg.Add(1)
 	go func() {
 		defer wg.Done()
 		err := cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, 100, false, lockresolver, isPullInit, eventCh)
 		c.Assert(errors.Cause(err), check.Equals, context.Canceled)
-		cdcClient.Close() //nolint:errcheck
 	}()
 
 	// wait request id allocated with: new session, new request
@@ -1200,14 +1298,15 @@ func (s *etcdSuite) TestStreamSendWithError(c *check.C) {
 	isPullInit := &mockPullerInit{}
 	grpcPool := NewGrpcPoolImpl(ctx, &security.Credential{})
 	defer grpcPool.Close()
-	cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool)
+	regionCache := tikv.NewRegionCache(pdClient)
+	defer regionCache.Close()
+	cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool, regionCache)
 	eventCh := make(chan model.RegionFeedEvent, 10)
 	wg.Add(1)
 	go func() {
 		defer wg.Done()
 		err := cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("c")}, 100, false, lockerResolver, isPullInit, eventCh)
 		c.Assert(errors.Cause(err), check.Equals, context.Canceled)
-		cdcClient.Close() //nolint:errcheck
 	}()
 
 	var requestIds sync.Map
@@ -1310,14 +1409,15 @@ func (s *etcdSuite) testStreamRecvWithError(c *check.C, failpointStr string) {
 	isPullInit := &mockPullerInit{}
 	grpcPool := NewGrpcPoolImpl(ctx, &security.Credential{})
 	defer grpcPool.Close()
-	cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool)
+	regionCache := tikv.NewRegionCache(pdClient)
+	defer regionCache.Close()
+	cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool, regionCache)
 	eventCh := make(chan model.RegionFeedEvent, 40)
 	wg.Add(1)
 	go func() {
 		defer wg.Done()
 		err := cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, 100, false, lockresolver, isPullInit, eventCh)
 		c.Assert(errors.Cause(err), check.Equals, context.Canceled)
-		cdcClient.Close() //nolint:errcheck
 	}()
 
 	// wait request id allocated with: new session, new request
@@ -1439,7 +1539,9 @@ func (s *etcdSuite) TestStreamRecvWithErrorAndResolvedGoBack(c *check.C) {
 	isPullInit := &mockPullerInit{}
 	grpcPool := NewGrpcPoolImpl(ctx, &security.Credential{})
 	defer grpcPool.Close()
-	cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool)
+	regionCache := tikv.NewRegionCache(pdClient)
+	defer regionCache.Close()
+	cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool, regionCache)
 	eventCh := make(chan model.RegionFeedEvent, 10)
 	wg.Add(1)
 	go func() {
@@ -1447,7 +1549,6 @@ func (s *etcdSuite) TestStreamRecvWithErrorAndResolvedGoBack(c *check.C) {
 		defer close(eventCh)
 		err := cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, 100, false, lockresolver, isPullInit, eventCh)
 		c.Assert(errors.Cause(err), check.Equals, context.Canceled)
-		cdcClient.Close() //nolint:errcheck
 	}()
 
 	// wait request id allocated with: new session, new request
@@ -1651,14 +1752,15 @@ func (s *etcdSuite) TestIncompatibleTiKV(c *check.C) {
 	isPullInit := &mockPullerInit{}
 	grpcPool := NewGrpcPoolImpl(ctx, &security.Credential{})
 	defer grpcPool.Close()
-	cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool)
+	regionCache := tikv.NewRegionCache(pdClient)
+	defer regionCache.Close()
+	cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool, regionCache)
 	eventCh := make(chan model.RegionFeedEvent, 10)
 	wg.Add(1)
 	go func() {
 		defer wg.Done()
 		err := cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, 100, false, lockresolver, isPullInit, eventCh)
 		c.Assert(errors.Cause(err), check.Equals, context.Canceled)
-		cdcClient.Close() //nolint:errcheck
 	}()
 
 	err = retry.Do(context.Background(), func() error {
@@ -1726,7 +1828,9 @@ func (s *etcdSuite) TestNoPendingRegionError(c *check.C) {
 	isPullInit := &mockPullerInit{}
 	grpcPool := NewGrpcPoolImpl(ctx, &security.Credential{})
 	defer grpcPool.Close()
-	cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool)
+	regionCache := tikv.NewRegionCache(pdClient)
+	defer regionCache.Close()
+	cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool, regionCache)
 	eventCh := make(chan model.RegionFeedEvent, 10)
 	wg.Add(1)
@@ -1734,7 +1838,6 @@
 		defer wg.Done()
 		err := cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, 100, false, lockresolver, isPullInit, eventCh)
 		c.Assert(errors.Cause(err), check.Equals, context.Canceled)
-		cdcClient.Close() //nolint:errcheck
 	}()
 
 	// wait request id allocated with: new session, new request
@@ -1803,14 +1906,15 @@ func (s *etcdSuite) TestDropStaleRequest(c *check.C) {
 	isPullInit := &mockPullerInit{}
 	grpcPool := NewGrpcPoolImpl(ctx, &security.Credential{})
 	defer grpcPool.Close()
-	cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool)
+	regionCache := tikv.NewRegionCache(pdClient)
+	defer regionCache.Close()
+	cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool, regionCache)
 	eventCh := make(chan model.RegionFeedEvent, 10)
 	wg.Add(1)
 	go func() {
 		defer wg.Done()
 		err := cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, 100, false, lockresolver, isPullInit, eventCh)
 		c.Assert(errors.Cause(err), check.Equals, context.Canceled)
-		cdcClient.Close() //nolint:errcheck
 	}()
 
 	// wait request id allocated with: new session, new request
@@ -1912,14 +2016,15 @@ func (s *etcdSuite) TestResolveLock(c *check.C) {
 	isPullInit := &mockPullerInit{}
 	grpcPool := NewGrpcPoolImpl(ctx, &security.Credential{})
 	defer grpcPool.Close()
-	cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool)
+	regionCache := tikv.NewRegionCache(pdClient)
+	defer regionCache.Close()
+	cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool, regionCache)
 	eventCh := make(chan model.RegionFeedEvent, 10)
 	wg.Add(1)
 	go func() {
 		defer wg.Done()
 		err := cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, 100, false, lockresolver, isPullInit, eventCh)
 		c.Assert(errors.Cause(err), check.Equals, context.Canceled)
-		cdcClient.Close() //nolint:errcheck
 	}()
 
 	// wait request id allocated with: new session, new request
@@ -2011,7 +2116,9 @@ func (s *etcdSuite) testEventCommitTsFallback(c *check.C, events []*cdcpb.Change
 	isPullInit := &mockPullerInit{}
 	grpcPool := NewGrpcPoolImpl(ctx, &security.Credential{})
 	defer grpcPool.Close()
-	cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool)
+	regionCache := tikv.NewRegionCache(pdClient)
+	defer regionCache.Close()
+	cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool, regionCache)
 	eventCh := make(chan model.RegionFeedEvent, 10)
 	var clientWg sync.WaitGroup
 	clientWg.Add(1)
@@ -2019,7 +2126,6 @@ func (s *etcdSuite) testEventCommitTsFallback(c *check.C, events []*cdcpb.Change
 		defer clientWg.Done()
 		err := cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, 100, false, lockresolver, isPullInit, eventCh)
 		c.Assert(err, check.Equals, errUnreachable)
-		cdcClient.Close() //nolint:errcheck
 	}()
 
 	// wait request id allocated with: new session, new request
@@ -2159,14 +2265,15 @@ func (s *etcdSuite) testEventAfterFeedStop(c *check.C) {
 	isPullInit := &mockPullerInit{}
 	grpcPool := NewGrpcPoolImpl(ctx, &security.Credential{})
 	defer grpcPool.Close()
-	cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool)
+	regionCache := tikv.NewRegionCache(pdClient)
+	defer regionCache.Close()
+	cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool, regionCache)
 	eventCh := make(chan model.RegionFeedEvent, 10)
 	wg.Add(1)
 	go func() {
 		defer wg.Done()
 		err := cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, 100, false, lockresolver, isPullInit, eventCh)
 		c.Assert(errors.Cause(err), check.Equals, context.Canceled)
-		cdcClient.Close() //nolint:errcheck
 	}()
 
 	// wait request id allocated with: new session, new request
@@ -2337,14 +2444,15 @@ func (s *etcdSuite) TestOutOfRegionRangeEvent(c *check.C) {
 	isPullInit := &mockPullerInit{}
 	grpcPool := NewGrpcPoolImpl(ctx, &security.Credential{})
 	defer grpcPool.Close()
-	cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool)
+	regionCache := tikv.NewRegionCache(pdClient)
+	defer regionCache.Close()
+	cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool, regionCache)
 	eventCh := make(chan model.RegionFeedEvent, 10)
 	wg.Add(1)
 	go func() {
 		defer wg.Done()
 		err := cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, 100, false, lockresolver, isPullInit, eventCh)
 		c.Assert(errors.Cause(err), check.Equals, context.Canceled)
-		cdcClient.Close() //nolint:errcheck
 	}()
 
 	// wait request id allocated with: new session, new request
@@ -2567,14 +2675,15 @@ func (s *etcdSuite) TestResolveLockNoCandidate(c *check.C) {
 	isPullInit := &mockPullerInit{}
 	grpcPool := NewGrpcPoolImpl(ctx, &security.Credential{})
 	defer grpcPool.Close()
-	cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool)
+	regionCache := tikv.NewRegionCache(pdClient)
+	defer regionCache.Close()
+	cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool, regionCache)
 	eventCh := make(chan model.RegionFeedEvent, 10)
 	wg.Add(1)
 	go func() {
 		defer wg.Done()
 		err := cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, 100, false, lockresolver, isPullInit, eventCh)
 		c.Assert(errors.Cause(err), check.Equals, context.Canceled)
-		cdcClient.Close() //nolint:errcheck
 	}()
 
 	// wait request id allocated with: new session, new request
@@ -2661,14 +2770,15 @@ func (s *etcdSuite) TestFailRegionReentrant(c *check.C) {
 	isPullInit := &mockPullerInit{}
 	grpcPool := NewGrpcPoolImpl(ctx, &security.Credential{})
 	defer grpcPool.Close()
-	cdcClient := NewCDCClient(ctx, pdClient, kvStorage.(tikv.Storage), grpcPool)
+	regionCache := tikv.NewRegionCache(pdClient)
+	defer regionCache.Close()
+	cdcClient := NewCDCClient(ctx, pdClient, kvStorage.(tikv.Storage), grpcPool, regionCache)
 	eventCh := make(chan model.RegionFeedEvent, 10)
 	wg.Add(1)
 	go func() {
 		defer wg.Done()
 		err := cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, 100, false, lockresolver, isPullInit, eventCh)
 		c.Assert(errors.Cause(err), check.Equals, context.Canceled)
-		cdcClient.Close() //nolint:errcheck
 	}()
 
 	// wait request id allocated with: new session, new request
@@ -2742,14 +2852,15 @@ func (s *etcdSuite) TestClientV1UnlockRangeReentrant(c *check.C) {
 	isPullInit := &mockPullerInit{}
 	grpcPool := NewGrpcPoolImpl(ctx, &security.Credential{})
 	defer grpcPool.Close()
-	cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool)
+	regionCache := tikv.NewRegionCache(pdClient)
+	defer regionCache.Close()
+	cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool, regionCache)
 	eventCh := make(chan model.RegionFeedEvent, 10)
 	wg.Add(1)
 	go func() {
 		defer wg.Done()
 		err := cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("c")}, 100, false, lockresolver, isPullInit, eventCh)
 		c.Assert(errors.Cause(err), check.Equals, context.Canceled)
-		cdcClient.Close() //nolint:errcheck
 	}()
 
 	// wait the second region is scheduled
@@ -2808,14 +2919,15 @@ func (s *etcdSuite) testClientErrNoPendingRegion(c *check.C) {
 	isPullInit := &mockPullerInit{}
 	grpcPool := NewGrpcPoolImpl(ctx, &security.Credential{})
 	defer grpcPool.Close()
-	cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool)
+	regionCache := tikv.NewRegionCache(pdClient)
+	defer regionCache.Close()
+	cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool, regionCache)
 	eventCh := make(chan model.RegionFeedEvent, 10)
 	wg.Add(1)
 	go func() {
 		defer wg.Done()
 		err := cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("c")}, 100, false, lockresolver, isPullInit, eventCh)
 		c.Assert(errors.Cause(err), check.Equals, context.Canceled)
-		cdcClient.Close() //nolint:errcheck
 	}()
 
 	baseAllocatedID := currentRequestID()
@@ -2884,14 +2996,15 @@ func (s *etcdSuite) testKVClientForceReconnect(c *check.C) {
 	isPullInit := &mockPullerInit{}
 	grpcPool := NewGrpcPoolImpl(ctx, &security.Credential{})
 	defer grpcPool.Close()
-	cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool)
+	regionCache := tikv.NewRegionCache(pdClient)
+	defer regionCache.Close()
+	cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool, regionCache)
 	eventCh := make(chan model.RegionFeedEvent, 10)
 	wg.Add(1)
 	go func() {
 		defer wg.Done()
 		err := cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("c")}, 100, false, lockresolver, isPullInit, eventCh)
 		c.Assert(errors.Cause(err), check.Equals, context.Canceled)
-		cdcClient.Close() //nolint:errcheck
 	}()
 
 	baseAllocatedID := currentRequestID()
@@ -3033,14 +3146,15 @@ func (s *etcdSuite) TestConcurrentProcessRangeRequest(c *check.C) {
 	isPullInit := &mockPullerInit{}
 	grpcPool := NewGrpcPoolImpl(ctx, &security.Credential{})
 	defer grpcPool.Close()
-	cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool)
+	regionCache := tikv.NewRegionCache(pdClient)
+	defer regionCache.Close()
+	cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool, regionCache)
 	eventCh := make(chan model.RegionFeedEvent, 100)
 	wg.Add(1)
 	go func() {
 		defer wg.Done()
 		err := cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("z")}, 100, false, lockresolver, isPullInit, eventCh)
 		c.Assert(errors.Cause(err), check.Equals, context.Canceled)
-		cdcClient.Close() //nolint:errcheck
 	}()
 
 	// the kv client is blocked by failpoint injection, and after region has split
@@ -3149,14 +3263,15 @@ func (s *etcdSuite) TestEvTimeUpdate(c *check.C) {
 	isPullInit := &mockPullerInit{}
 	grpcPool := NewGrpcPoolImpl(ctx, &security.Credential{})
 	defer grpcPool.Close()
-	cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool)
+	regionCache := tikv.NewRegionCache(pdClient)
+	defer regionCache.Close()
+	cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool, regionCache)
 	eventCh := make(chan model.RegionFeedEvent, 10)
 	wg.Add(1)
 	go func() {
 		defer wg.Done()
 		err := cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, 100, false, lockresolver, isPullInit, eventCh)
 		c.Assert(errors.Cause(err), check.Equals, context.Canceled)
-		cdcClient.Close() //nolint:errcheck
 	}()
 
 	// wait request id allocated with: new session, new request
@@ -3268,14 +3383,15 @@ func (s *etcdSuite) TestRegionWorkerExitWhenIsIdle(c *check.C) {
 	isPullInit := &mockPullerInit{}
 	grpcPool := NewGrpcPoolImpl(ctx, &security.Credential{})
 	defer grpcPool.Close()
-	cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool)
+	regionCache := tikv.NewRegionCache(pdClient)
+	defer regionCache.Close()
+	cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool, regionCache)
 	eventCh := make(chan model.RegionFeedEvent, 10)
 	wg.Add(1)
 	go func() {
 		defer wg.Done()
 		err := cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, 100, false, lockresolver, isPullInit, eventCh)
 		c.Assert(errors.Cause(err), check.Equals, context.Canceled)
-		cdcClient.Close() //nolint:errcheck
 	}()
 
 	// wait request id allocated with: new session, new request
@@ -3359,7 +3475,9 @@ func (s *etcdSuite) TestPrewriteNotMatchError(c *check.C) {
 	lockResolver := txnutil.NewLockerResolver(kvStorage)
 	grpcPool := NewGrpcPoolImpl(ctx, &security.Credential{})
 	defer grpcPool.Close()
-	cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool)
+	regionCache := tikv.NewRegionCache(pdClient)
+	defer regionCache.Close()
+	cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool, regionCache)
 	eventCh := make(chan model.RegionFeedEvent, 10)
 
 	baseAllocatedID := currentRequestID()
@@ -3368,7 +3486,6 @@ func (s *etcdSuite) TestPrewriteNotMatchError(c *check.C) {
 		defer wg.Done()
 		err = cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("c")}, 100, false, lockResolver, isPullInit, eventCh)
 		c.Assert(errors.Cause(err), check.Equals, context.Canceled)
-		cdcClient.Close() //nolint:errcheck
 	}()
 
 	// The expected request ids are agnostic because the kv client could retry
diff --git a/cdc/kv/testing.go b/cdc/kv/testing.go
index b28fe933563..4c68fa9a1fd 100644
--- a/cdc/kv/testing.go
+++ b/cdc/kv/testing.go
@@ -150,8 +150,9 @@ func TestSplit(t require.TestingT, pdCli pd.Client, storage tikv.Storage, kvStor
 	defer cancel()
 
 	grpcPool := NewGrpcPoolImpl(ctx, &security.Credential{})
-	cli := NewCDCClient(context.Background(), pdCli, storage, grpcPool)
-	defer cli.Close()
+	regionCache := tikv.NewRegionCache(pdCli)
+
+	cli := NewCDCClient(context.Background(), pdCli, storage, grpcPool, regionCache)
 
 	startTS := mustGetTimestamp(t, storage)
 
@@ -240,8 +241,8 @@ func TestGetKVSimple(t require.TestingT, pdCli pd.Client, storage tikv.Storage,
 	defer cancel()
 
 	grpcPool := NewGrpcPoolImpl(ctx, &security.Credential{})
-	cli := NewCDCClient(context.Background(), pdCli, storage, grpcPool)
-	defer cli.Close()
+	regionCache := tikv.NewRegionCache(pdCli)
+	cli := NewCDCClient(context.Background(), pdCli, storage, grpcPool, regionCache)
 
 	startTS := mustGetTimestamp(t, storage)
 	lockresolver := txnutil.NewLockerResolver(storage)
diff --git a/cdc/owner/ddl_puller.go b/cdc/owner/ddl_puller.go
index 49ff27e8243..c4a17400f47 100644
--- a/cdc/owner/ddl_puller.go
+++ b/cdc/owner/ddl_puller.go
@@ -65,7 +65,7 @@ func newDDLPuller(ctx cdcContext.Context, startTs uint64) (DDLPuller, error) {
 	kvStorage := ctx.GlobalVars().KVStorage
 	// kvStorage can be nil only in the test
 	if kvStorage != nil {
-		plr = puller.NewPuller(ctx, pdCli, ctx.GlobalVars().GrpcPool, kvStorage, startTs,
+		plr = puller.NewPuller(ctx, pdCli, ctx.GlobalVars().GrpcPool, ctx.GlobalVars().RegionCache, kvStorage, startTs,
 			[]regionspan.Span{regionspan.GetDDLSpan(), regionspan.GetAddIndexDDLSpan()},
 			false)
 	}
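The ddl_puller.go hunk above is the simplest consumer of the new parameter: the DDL puller now takes the capture-wide cache from GlobalVars instead of letting its kv client build a private one. A sketch of the same call shape, assuming the package layout implied by the patch (pkg/context, pkg/regionspan, cdc/puller); the wrapper name newDDLSpanPuller is invented, and the argument order follows the NewPuller signature changed in the cdc/puller/puller.go hunk further below:

package pullersketch

import (
	"github.com/pingcap/ticdc/cdc/puller"
	cdcContext "github.com/pingcap/ticdc/pkg/context"
	"github.com/pingcap/ticdc/pkg/regionspan"
)

// newDDLSpanPuller shows how a component threads the shared RegionCache from
// GlobalVars into NewPuller after this patch, mirroring newDDLPuller above.
func newDDLSpanPuller(ctx cdcContext.Context, startTs uint64) puller.Puller {
	return puller.NewPuller(
		ctx,
		ctx.GlobalVars().PDClient,
		ctx.GlobalVars().GrpcPool,
		ctx.GlobalVars().RegionCache, // shared cache owned by the Capture
		ctx.GlobalVars().KVStorage,
		startTs,
		[]regionspan.Span{regionspan.GetDDLSpan(), regionspan.GetAddIndexDDLSpan()},
		false, // the DDL puller does not need old values
	)
}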
diff --git a/cdc/processor/pipeline/puller.go b/cdc/processor/pipeline/puller.go
index 1a8d3492406..367d1e2f9e8 100644
--- a/cdc/processor/pipeline/puller.go
+++ b/cdc/processor/pipeline/puller.go
@@ -64,8 +64,13 @@ func (n *pullerNode) Init(ctx pipeline.NodeContext) error {
 	ctxC = util.PutCaptureAddrInCtx(ctxC, ctx.GlobalVars().CaptureInfo.AdvertiseAddr)
 	ctxC = util.PutChangefeedIDInCtx(ctxC, ctx.ChangefeedVars().ID)
 	// NOTICE: always pull the old value internally
+<<<<<<< HEAD
 	// See also: TODO(hi-rustin): add issue link here.
 	plr := puller.NewPuller(ctxC, ctx.GlobalVars().PDClient, ctx.GlobalVars().GrpcPool, ctx.GlobalVars().KVStorage,
+=======
+	// See also: https://github.com/pingcap/ticdc/issues/2301.
+	plr := puller.NewPuller(ctxC, ctx.GlobalVars().PDClient, ctx.GlobalVars().GrpcPool, ctx.GlobalVars().RegionCache, ctx.GlobalVars().KVStorage,
+>>>>>>> e46ded913 (ticdc/kvclient: make multiple cdc kv clients share one RegionCache (#3464))
 		n.replicaInfo.StartTs, n.tableSpan(ctx), true)
 	n.wg.Go(func() error {
 		ctx.Throw(errors.Trace(plr.Run(ctxC)))
diff --git a/cdc/processor/processor.go b/cdc/processor/processor.go
index a69bb16502f..be5f6bac6bf 100644
--- a/cdc/processor/processor.go
+++ b/cdc/processor/processor.go
@@ -442,6 +442,7 @@ func (p *processor) createAndDriveSchemaStorage(ctx cdcContext.Context) (entry.S
 		stdCtx,
 		ctx.GlobalVars().PDClient,
 		ctx.GlobalVars().GrpcPool,
+		ctx.GlobalVars().RegionCache,
 		ctx.GlobalVars().KVStorage,
 		checkpointTs, ddlspans, false)
 	meta, err := kv.GetSnapshotMeta(kvStorage, checkpointTs)
diff --git a/cdc/puller/puller.go b/cdc/puller/puller.go
index 7e6c8d3a09a..4fc03ca0f0f 100644
--- a/cdc/puller/puller.go
+++ b/cdc/puller/puller.go
@@ -70,6 +70,7 @@ func NewPuller(
 	ctx context.Context,
 	pdCli pd.Client,
 	grpcPool kv.GrpcPool,
+	regionCache *tikv.RegionCache,
 	kvStorage tidbkv.Storage,
 	checkpointTs uint64,
 	spans []regionspan.Span,
@@ -87,7 +88,7 @@ func NewPuller(
 	// the initial ts for frontier to 0. Once the puller level resolved ts
 	// initialized, the ts should advance to a non-zero value.
 	tsTracker := frontier.NewFrontier(0, comparableSpans...)
-	kvCli := kv.NewCDCKVClient(ctx, pdCli, tikvStorage, grpcPool)
+	kvCli := kv.NewCDCKVClient(ctx, pdCli, tikvStorage, grpcPool, regionCache)
 	p := &pullerImpl{
 		pdCli: pdCli,
 		kvCli: kvCli,
@@ -109,8 +110,6 @@ func (p *pullerImpl) Output() <-chan *model.RawKVEntry {
 
 // Run the puller, continually fetch event from TiKV and add event into buffer
 func (p *pullerImpl) Run(ctx context.Context) error {
-	defer p.kvCli.Close()
-
 	g, ctx := errgroup.WithContext(ctx)
 
 	checkpointTs := p.checkpointTs
diff --git a/cdc/puller/puller_test.go b/cdc/puller/puller_test.go
index 667a1ef5e1a..b6362e57d97 100644
--- a/cdc/puller/puller_test.go
+++ b/cdc/puller/puller_test.go
@@ -63,6 +63,7 @@ func newMockCDCKVClient(
 	pd pd.Client,
 	kvStorage tikv.Storage,
 	grpcPool kv.GrpcPool,
+	regionCache *tikv.RegionCache,
 ) kv.CDCKVClient {
 	return &mockCDCKVClient{
 		expectations: make(chan model.RegionFeedEvent, 1024),
@@ -125,7 +126,9 @@ func (s *pullerSuite) newPullerForTest(
 	pdCli := &mockPdClientForPullerTest{clusterID: uint64(1)}
 	grpcPool := kv.NewGrpcPoolImpl(ctx, &security.Credential{})
 	defer grpcPool.Close()
-	plr := NewPuller(ctx, pdCli, grpcPool, store, checkpointTs, spans, enableOldValue)
+	regionCache := tikv.NewRegionCache(pdCli)
+	defer regionCache.Close()
+	plr := NewPuller(ctx, pdCli, grpcPool, regionCache, store, checkpointTs, spans, enableOldValue)
 	wg.Add(1)
 	go func() {
 		defer wg.Done()
diff --git a/pkg/context/context.go b/pkg/context/context.go
index 21f99ffbeba..68520513fdf 100644
--- a/pkg/context/context.go
+++ b/pkg/context/context.go
@@ -25,6 +25,7 @@ import (
 	"github.com/pingcap/ticdc/pkg/config"
 	tidbkv "github.com/pingcap/tidb/kv"
 	"github.com/tikv/client-go/v2/oracle"
+	"github.com/tikv/client-go/v2/tikv"
 	pd "github.com/tikv/pd/client"
 	"go.uber.org/zap"
 )
@@ -33,11 +34,22 @@ import (
 // the lifecycle of vars in the GlobalVars should be aligned with the ticdc server process.
 // All field in Vars should be READ-ONLY and THREAD-SAFE
 type GlobalVars struct {
+<<<<<<< HEAD
 	PDClient    pd.Client
 	KVStorage   tidbkv.Storage
 	CaptureInfo *model.CaptureInfo
 	EtcdClient  *kv.CDCEtcdClient
 	GrpcPool    kv.GrpcPool
+=======
+	PDClient         pd.Client
+	KVStorage        tidbkv.Storage
+	CaptureInfo      *model.CaptureInfo
+	EtcdClient       *etcd.CDCEtcdClient
+	GrpcPool         kv.GrpcPool
+	RegionCache      *tikv.RegionCache
+	TimeAcquirer     pdtime.TimeAcquirer
+	TableActorSystem *system.System
+>>>>>>> e46ded913 (ticdc/kvclient: make multiple cdc kv clients share one RegionCache (#3464))
 }
 
 // ChangefeedVars contains some vars which can be used anywhere in a pipeline
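Taken together with the capture.go hunks at the top, the resolved wiring is: the Capture owns the RegionCache, publishes it through GlobalVars, and every puller and CDC KV client under that capture borrows the same instance. A compact sketch of that wiring, assuming cdcContext.NewContext and the GlobalVars fields shown in the hunks above; buildCDCContext is an invented helper, and only a subset of GlobalVars fields is populated here:

package contextsketch

import (
	"context"

	cdcContext "github.com/pingcap/ticdc/pkg/context"
	"github.com/tikv/client-go/v2/tikv"
	pd "github.com/tikv/pd/client"
)

// buildCDCContext publishes a capture-owned RegionCache through GlobalVars so
// that downstream pullers and processors all share one cache per capture.
func buildCDCContext(stdCtx context.Context, pdCli pd.Client, cache *tikv.RegionCache) cdcContext.Context {
	return cdcContext.NewContext(stdCtx, &cdcContext.GlobalVars{
		PDClient:    pdCli,
		RegionCache: cache,
	})
}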