Skip to content

Commit

Permalink
Merge branch 'master' into sink-manager-manage-checkpoint-per-table
Browse files Browse the repository at this point in the history
  • Loading branch information
sdojjy authored Nov 26, 2021
2 parents fc0d687 + e46ded9 commit 4008fce
Show file tree
Hide file tree
Showing 44 changed files with 649 additions and 215 deletions.
12 changes: 12 additions & 0 deletions cdc/capture/capture.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (
"github.com/pingcap/ticdc/pkg/pdtime"
"github.com/pingcap/ticdc/pkg/version"
tidbkv "github.com/pingcap/tidb/kv"
"github.com/tikv/client-go/v2/tikv"
pd "github.com/tikv/pd/client"
"go.etcd.io/etcd/clientv3/concurrency"
"go.etcd.io/etcd/mvcc"
Expand All @@ -61,6 +62,7 @@ type Capture struct {
kvStorage tidbkv.Storage
etcdClient *etcd.CDCEtcdClient
grpcPool kv.GrpcPool
regionCache *tikv.RegionCache
TimeAcquirer pdtime.TimeAcquirer

tableActorSystem *system.System
Expand Down Expand Up @@ -128,6 +130,10 @@ func (c *Capture) reset(ctx context.Context) error {
}
}
c.grpcPool = kv.NewGrpcPoolImpl(ctx, conf.Security)
if c.regionCache != nil {
c.regionCache.Close()
}
c.regionCache = tikv.NewRegionCache(c.pdClient)
log.Info("init capture", zap.String("capture-id", c.info.ID), zap.String("capture-addr", c.info.AdvertiseAddr))
return nil
}
Expand Down Expand Up @@ -177,6 +183,7 @@ func (c *Capture) run(stdCtx context.Context) error {
CaptureInfo: c.info,
EtcdClient: c.etcdClient,
GrpcPool: c.grpcPool,
RegionCache: c.regionCache,
TimeAcquirer: c.TimeAcquirer,
TableActorSystem: c.tableActorSystem,
})
Expand Down Expand Up @@ -368,6 +375,7 @@ func (c *Capture) register(ctx cdcContext.Context) error {
}

// AsyncClose closes the capture by unregistering it from etcd
// Note: this function should be reentrant
func (c *Capture) AsyncClose() {
defer c.cancel()
// Safety: Here we mainly want to stop the owner
Expand All @@ -384,6 +392,10 @@ func (c *Capture) AsyncClose() {
if c.grpcPool != nil {
c.grpcPool.Close()
}
if c.regionCache != nil {
c.regionCache.Close()
c.regionCache = nil
}
if c.tableActorSystem != nil {
err := c.tableActorSystem.Stop()
if err != nil {
Expand Down
12 changes: 2 additions & 10 deletions cdc/kv/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,6 @@ type CDCKVClient interface {
isPullerInit PullerInitialization,
eventCh chan<- model.RegionFeedEvent,
) error
Close() error
}

// NewCDCKVClient is the constructor of CDC KV client
Expand All @@ -303,7 +302,7 @@ type CDCClient struct {
}

// NewCDCClient creates a CDCClient instance
func NewCDCClient(ctx context.Context, pd pd.Client, kvStorage tikv.Storage, grpcPool GrpcPool) (c CDCKVClient) {
func NewCDCClient(ctx context.Context, pd pd.Client, kvStorage tikv.Storage, grpcPool GrpcPool, regionCache *tikv.RegionCache) (c CDCKVClient) {
clusterID := pd.GetClusterID(ctx)

var store TiKVStorage
Expand All @@ -321,19 +320,12 @@ func NewCDCClient(ctx context.Context, pd pd.Client, kvStorage tikv.Storage, grp
pd: pd,
kvStorage: store,
grpcPool: grpcPool,
regionCache: tikv.NewRegionCache(pd),
regionCache: regionCache,
regionLimiters: defaultRegionEventFeedLimiters,
}
return
}

// Close CDCClient
func (c *CDCClient) Close() error {
c.regionCache.Close()

return nil
}

func (c *CDCClient) getRegionLimiter(regionID uint64) *rate.Limiter {
return c.regionLimiters.getLimiter(regionID)
}
Expand Down
10 changes: 6 additions & 4 deletions cdc/kv/client_bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,15 +190,16 @@ func prepareBenchMultiStore(b *testing.B, storeNum, regionNum int) (
isPullInit := &mockPullerInit{}
grpcPool := NewGrpcPoolImpl(ctx, &security.Credential{})
defer grpcPool.Close()
cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool)
regionCache := tikv.NewRegionCache(pdClient)
defer regionCache.Close()
cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool, regionCache)
eventCh := make(chan model.RegionFeedEvent, 1000000)
wg.Add(1)
go func() {
err := cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, 100, false, lockresolver, isPullInit, eventCh)
if errors.Cause(err) != context.Canceled {
b.Error(err)
}
cdcClient.Close() //nolint:errcheck
wg.Done()
}()

Expand Down Expand Up @@ -280,15 +281,16 @@ func prepareBench(b *testing.B, regionNum int) (
isPullInit := &mockPullerInit{}
grpcPool := NewGrpcPoolImpl(ctx, &security.Credential{})
defer grpcPool.Close()
cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool)
regionCache := tikv.NewRegionCache(pdClient)
defer regionCache.Close()
cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool, regionCache)
eventCh := make(chan model.RegionFeedEvent, 1000000)
wg.Add(1)
go func() {
err := cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("z")}, 100, false, lockresolver, isPullInit, eventCh)
if errors.Cause(err) != context.Canceled {
b.Error(err)
}
cdcClient.Close() //nolint:errcheck
wg.Done()
}()

Expand Down
Loading

0 comments on commit 4008fce

Please sign in to comment.