diff --git a/cdc/capture/capture.go b/cdc/capture/capture.go index af34feae98c..d06749f452e 100644 --- a/cdc/capture/capture.go +++ b/cdc/capture/capture.go @@ -37,6 +37,7 @@ import ( "github.com/pingcap/ticdc/pkg/pdtime" "github.com/pingcap/ticdc/pkg/version" tidbkv "github.com/pingcap/tidb/kv" + "github.com/tikv/client-go/v2/tikv" pd "github.com/tikv/pd/client" "go.etcd.io/etcd/clientv3/concurrency" "go.etcd.io/etcd/mvcc" @@ -61,6 +62,7 @@ type Capture struct { kvStorage tidbkv.Storage etcdClient *etcd.CDCEtcdClient grpcPool kv.GrpcPool + regionCache *tikv.RegionCache TimeAcquirer pdtime.TimeAcquirer tableActorSystem *system.System @@ -128,6 +130,10 @@ func (c *Capture) reset(ctx context.Context) error { } } c.grpcPool = kv.NewGrpcPoolImpl(ctx, conf.Security) + if c.regionCache != nil { + c.regionCache.Close() + } + c.regionCache = tikv.NewRegionCache(c.pdClient) log.Info("init capture", zap.String("capture-id", c.info.ID), zap.String("capture-addr", c.info.AdvertiseAddr)) return nil } @@ -177,6 +183,7 @@ func (c *Capture) run(stdCtx context.Context) error { CaptureInfo: c.info, EtcdClient: c.etcdClient, GrpcPool: c.grpcPool, + RegionCache: c.regionCache, TimeAcquirer: c.TimeAcquirer, TableActorSystem: c.tableActorSystem, }) @@ -368,6 +375,7 @@ func (c *Capture) register(ctx cdcContext.Context) error { } // AsyncClose closes the capture by unregistering it from etcd +// Note: this function should be reentrant func (c *Capture) AsyncClose() { defer c.cancel() // Safety: Here we mainly want to stop the owner @@ -384,6 +392,10 @@ func (c *Capture) AsyncClose() { if c.grpcPool != nil { c.grpcPool.Close() } + if c.regionCache != nil { + c.regionCache.Close() + c.regionCache = nil + } if c.tableActorSystem != nil { err := c.tableActorSystem.Stop() if err != nil { diff --git a/cdc/kv/client.go b/cdc/kv/client.go index 23c6c78e90b..cdef9f7feca 100644 --- a/cdc/kv/client.go +++ b/cdc/kv/client.go @@ -282,7 +282,6 @@ type CDCKVClient interface { isPullerInit PullerInitialization, eventCh chan<- model.RegionFeedEvent, ) error - Close() error } // NewCDCKVClient is the constructor of CDC KV client @@ -303,7 +302,7 @@ type CDCClient struct { } // NewCDCClient creates a CDCClient instance -func NewCDCClient(ctx context.Context, pd pd.Client, kvStorage tikv.Storage, grpcPool GrpcPool) (c CDCKVClient) { +func NewCDCClient(ctx context.Context, pd pd.Client, kvStorage tikv.Storage, grpcPool GrpcPool, regionCache *tikv.RegionCache) (c CDCKVClient) { clusterID := pd.GetClusterID(ctx) var store TiKVStorage @@ -321,19 +320,12 @@ func NewCDCClient(ctx context.Context, pd pd.Client, kvStorage tikv.Storage, grp pd: pd, kvStorage: store, grpcPool: grpcPool, - regionCache: tikv.NewRegionCache(pd), + regionCache: regionCache, regionLimiters: defaultRegionEventFeedLimiters, } return } -// Close CDCClient -func (c *CDCClient) Close() error { - c.regionCache.Close() - - return nil -} - func (c *CDCClient) getRegionLimiter(regionID uint64) *rate.Limiter { return c.regionLimiters.getLimiter(regionID) } diff --git a/cdc/kv/client_bench_test.go b/cdc/kv/client_bench_test.go index 9f0554c88e6..1e012b72c2d 100644 --- a/cdc/kv/client_bench_test.go +++ b/cdc/kv/client_bench_test.go @@ -190,7 +190,9 @@ func prepareBenchMultiStore(b *testing.B, storeNum, regionNum int) ( isPullInit := &mockPullerInit{} grpcPool := NewGrpcPoolImpl(ctx, &security.Credential{}) defer grpcPool.Close() - cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool) + regionCache := tikv.NewRegionCache(pdClient) + defer 
regionCache.Close() + cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool, regionCache) eventCh := make(chan model.RegionFeedEvent, 1000000) wg.Add(1) go func() { @@ -198,7 +200,6 @@ func prepareBenchMultiStore(b *testing.B, storeNum, regionNum int) ( if errors.Cause(err) != context.Canceled { b.Error(err) } - cdcClient.Close() //nolint:errcheck wg.Done() }() @@ -280,7 +281,9 @@ func prepareBench(b *testing.B, regionNum int) ( isPullInit := &mockPullerInit{} grpcPool := NewGrpcPoolImpl(ctx, &security.Credential{}) defer grpcPool.Close() - cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool) + regionCache := tikv.NewRegionCache(pdClient) + defer regionCache.Close() + cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool, regionCache) eventCh := make(chan model.RegionFeedEvent, 1000000) wg.Add(1) go func() { @@ -288,7 +291,6 @@ func prepareBench(b *testing.B, regionNum int) ( if errors.Cause(err) != context.Canceled { b.Error(err) } - cdcClient.Close() //nolint:errcheck wg.Done() }() diff --git a/cdc/kv/client_test.go b/cdc/kv/client_test.go index 5917dbfb17c..1b2b41fec3a 100644 --- a/cdc/kv/client_test.go +++ b/cdc/kv/client_test.go @@ -79,7 +79,7 @@ func (s *clientSuite) TearDownTest(c *check.C) { s.e.Close() } -func (s *clientSuite) TestNewClose(c *check.C) { +func (s *clientSuite) TestNewClient(c *check.C) { defer testleak.AfterTest(c)() defer s.TearDownTest(c) rpcClient, _, pdClient, err := testutils.NewMockTiKV("", nil) @@ -89,9 +89,10 @@ func (s *clientSuite) TestNewClose(c *check.C) { grpcPool := NewGrpcPoolImpl(context.Background(), &security.Credential{}) defer grpcPool.Close() - cli := NewCDCClient(context.Background(), pdClient, nil, grpcPool) - err = cli.Close() - c.Assert(err, check.IsNil) + regionCache := tikv.NewRegionCache(pdClient) + defer regionCache.Close() + cli := NewCDCClient(context.Background(), pdClient, nil, grpcPool, regionCache) + c.Assert(cli, check.NotNil) } func (s *clientSuite) TestAssembleRowEvent(c *check.C) { @@ -350,8 +351,9 @@ func (s *clientSuite) TestConnectOfflineTiKV(c *check.C) { isPullInit := &mockPullerInit{} grpcPool := NewGrpcPoolImpl(ctx, &security.Credential{}) defer grpcPool.Close() - cdcClient := NewCDCClient(context.Background(), pdClient, kvStorage, grpcPool) - defer cdcClient.Close() //nolint:errcheck + regionCache := tikv.NewRegionCache(pdClient) + defer regionCache.Close() + cdcClient := NewCDCClient(context.Background(), pdClient, kvStorage, grpcPool, regionCache) eventCh := make(chan model.RegionFeedEvent, 10) wg.Add(1) go func() { @@ -449,14 +451,15 @@ func (s *clientSuite) TestRecvLargeMessageSize(c *check.C) { isPullInit := &mockPullerInit{} grpcPool := NewGrpcPoolImpl(ctx, &security.Credential{}) defer grpcPool.Close() - cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool) + regionCache := tikv.NewRegionCache(pdClient) + defer regionCache.Close() + cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool, regionCache) eventCh := make(chan model.RegionFeedEvent, 10) wg.Add(1) go func() { defer wg.Done() err := cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, 1, false, lockresolver, isPullInit, eventCh) c.Assert(errors.Cause(err), check.Equals, context.Canceled) - cdcClient.Close() //nolint:errcheck }() // new session, new request @@ -547,14 +550,15 @@ func (s *clientSuite) TestHandleError(c *check.C) { isPullInit := &mockPullerInit{} grpcPool := NewGrpcPoolImpl(ctx, &security.Credential{}) defer grpcPool.Close() - cdcClient := NewCDCClient(ctx, pdClient, 
kvStorage, grpcPool) + regionCache := tikv.NewRegionCache(pdClient) + defer regionCache.Close() + cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool, regionCache) eventCh := make(chan model.RegionFeedEvent, 10) wg.Add(1) go func() { defer wg.Done() err := cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("d")}, 100, false, lockresolver, isPullInit, eventCh) c.Assert(errors.Cause(err), check.Equals, context.Canceled) - cdcClient.Close() //nolint:errcheck }() // wait request id allocated with: new session, new request @@ -704,7 +708,9 @@ func (s *clientSuite) TestCompatibilityWithSameConn(c *check.C) { isPullInit := &mockPullerInit{} grpcPool := NewGrpcPoolImpl(ctx, &security.Credential{}) defer grpcPool.Close() - cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool) + regionCache := tikv.NewRegionCache(pdClient) + defer regionCache.Close() + cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool, regionCache) eventCh := make(chan model.RegionFeedEvent, 10) var wg2 sync.WaitGroup wg2.Add(1) @@ -712,7 +718,6 @@ func (s *clientSuite) TestCompatibilityWithSameConn(c *check.C) { defer wg2.Done() err := cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, 100, false, lockResolver, isPullInit, eventCh) c.Assert(cerror.ErrVersionIncompatible.Equal(err), check.IsTrue) - cdcClient.Close() //nolint:errcheck }() // wait request id allocated with: new session, new request @@ -772,8 +777,9 @@ func (s *clientSuite) TestClusterIDMismatch(c *check.C) { grpcPool := NewGrpcPoolImpl(ctx, &security.Credential{}) defer grpcPool.Close() - - cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool) + regionCache := tikv.NewRegionCache(pdClient) + defer regionCache.Close() + cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool, regionCache) eventCh := make(chan model.RegionFeedEvent, 10) var wg2 sync.WaitGroup @@ -782,7 +788,6 @@ func (s *clientSuite) TestClusterIDMismatch(c *check.C) { defer wg2.Done() err := cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, 100, false, lockResolver, isPullInit, eventCh) c.Assert(cerror.ErrClusterIDMismatch.Equal(err), check.IsTrue) - cdcClient.Close() //nolint:errcheck }() // wait request id allocated with: new session, new request @@ -839,14 +844,15 @@ func (s *clientSuite) testHandleFeedEvent(c *check.C) { isPullInit := &mockPullerInit{} grpcPool := NewGrpcPoolImpl(ctx, &security.Credential{}) defer grpcPool.Close() - cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool) + regionCache := tikv.NewRegionCache(pdClient) + defer regionCache.Close() + cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool, regionCache) eventCh := make(chan model.RegionFeedEvent, 10) wg.Add(1) go func() { defer wg.Done() err := cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, 100, false, lockresolver, isPullInit, eventCh) c.Assert(errors.Cause(err), check.Equals, context.Canceled) - cdcClient.Close() //nolint:errcheck }() // wait request id allocated with: new session, new request @@ -1288,14 +1294,15 @@ func (s *clientSuite) TestStreamSendWithError(c *check.C) { isPullInit := &mockPullerInit{} grpcPool := NewGrpcPoolImpl(ctx, &security.Credential{}) defer grpcPool.Close() - cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool) + regionCache := tikv.NewRegionCache(pdClient) + defer regionCache.Close() + cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool, regionCache) eventCh := 
make(chan model.RegionFeedEvent, 10) wg.Add(1) go func() { defer wg.Done() err := cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("c")}, 100, false, lockerResolver, isPullInit, eventCh) c.Assert(errors.Cause(err), check.Equals, context.Canceled) - cdcClient.Close() //nolint:errcheck }() var requestIds sync.Map @@ -1398,14 +1405,15 @@ func (s *clientSuite) testStreamRecvWithError(c *check.C, failpointStr string) { isPullInit := &mockPullerInit{} grpcPool := NewGrpcPoolImpl(ctx, &security.Credential{}) defer grpcPool.Close() - cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool) + regionCache := tikv.NewRegionCache(pdClient) + defer regionCache.Close() + cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool, regionCache) eventCh := make(chan model.RegionFeedEvent, 40) wg.Add(1) go func() { defer wg.Done() err := cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, 100, false, lockresolver, isPullInit, eventCh) c.Assert(errors.Cause(err), check.Equals, context.Canceled) - cdcClient.Close() //nolint:errcheck }() // wait request id allocated with: new session, new request @@ -1527,7 +1535,9 @@ func (s *clientSuite) TestStreamRecvWithErrorAndResolvedGoBack(c *check.C) { isPullInit := &mockPullerInit{} grpcPool := NewGrpcPoolImpl(ctx, &security.Credential{}) defer grpcPool.Close() - cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool) + regionCache := tikv.NewRegionCache(pdClient) + defer regionCache.Close() + cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool, regionCache) eventCh := make(chan model.RegionFeedEvent, 10) wg.Add(1) go func() { @@ -1535,7 +1545,6 @@ func (s *clientSuite) TestStreamRecvWithErrorAndResolvedGoBack(c *check.C) { defer close(eventCh) err := cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, 100, false, lockresolver, isPullInit, eventCh) c.Assert(errors.Cause(err), check.Equals, context.Canceled) - cdcClient.Close() //nolint:errcheck }() // wait request id allocated with: new session, new request @@ -1739,14 +1748,15 @@ func (s *clientSuite) TestIncompatibleTiKV(c *check.C) { isPullInit := &mockPullerInit{} grpcPool := NewGrpcPoolImpl(ctx, &security.Credential{}) defer grpcPool.Close() - cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool) + regionCache := tikv.NewRegionCache(pdClient) + defer regionCache.Close() + cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool, regionCache) eventCh := make(chan model.RegionFeedEvent, 10) wg.Add(1) go func() { defer wg.Done() err := cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, 100, false, lockresolver, isPullInit, eventCh) c.Assert(errors.Cause(err), check.Equals, context.Canceled) - cdcClient.Close() //nolint:errcheck }() err = retry.Do(context.Background(), func() error { @@ -1814,7 +1824,9 @@ func (s *clientSuite) TestNoPendingRegionError(c *check.C) { isPullInit := &mockPullerInit{} grpcPool := NewGrpcPoolImpl(ctx, &security.Credential{}) defer grpcPool.Close() - cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool) + regionCache := tikv.NewRegionCache(pdClient) + defer regionCache.Close() + cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool, regionCache) eventCh := make(chan model.RegionFeedEvent, 10) wg.Add(1) @@ -1822,7 +1834,6 @@ func (s *clientSuite) TestNoPendingRegionError(c *check.C) { defer wg.Done() err := cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, 
100, false, lockresolver, isPullInit, eventCh) c.Assert(errors.Cause(err), check.Equals, context.Canceled) - cdcClient.Close() //nolint:errcheck }() // wait request id allocated with: new session, new request @@ -1891,14 +1902,15 @@ func (s *clientSuite) TestDropStaleRequest(c *check.C) { isPullInit := &mockPullerInit{} grpcPool := NewGrpcPoolImpl(ctx, &security.Credential{}) defer grpcPool.Close() - cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool) + regionCache := tikv.NewRegionCache(pdClient) + defer regionCache.Close() + cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool, regionCache) eventCh := make(chan model.RegionFeedEvent, 10) wg.Add(1) go func() { defer wg.Done() err := cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, 100, false, lockresolver, isPullInit, eventCh) c.Assert(errors.Cause(err), check.Equals, context.Canceled) - cdcClient.Close() //nolint:errcheck }() // wait request id allocated with: new session, new request @@ -2000,14 +2012,15 @@ func (s *clientSuite) TestResolveLock(c *check.C) { isPullInit := &mockPullerInit{} grpcPool := NewGrpcPoolImpl(ctx, &security.Credential{}) defer grpcPool.Close() - cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool) + regionCache := tikv.NewRegionCache(pdClient) + defer regionCache.Close() + cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool, regionCache) eventCh := make(chan model.RegionFeedEvent, 10) wg.Add(1) go func() { defer wg.Done() err := cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, 100, false, lockresolver, isPullInit, eventCh) c.Assert(errors.Cause(err), check.Equals, context.Canceled) - cdcClient.Close() //nolint:errcheck }() // wait request id allocated with: new session, new request @@ -2099,7 +2112,9 @@ func (s *clientSuite) testEventCommitTsFallback(c *check.C, events []*cdcpb.Chan isPullInit := &mockPullerInit{} grpcPool := NewGrpcPoolImpl(ctx, &security.Credential{}) defer grpcPool.Close() - cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool) + regionCache := tikv.NewRegionCache(pdClient) + defer regionCache.Close() + cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool, regionCache) eventCh := make(chan model.RegionFeedEvent, 10) var clientWg sync.WaitGroup clientWg.Add(1) @@ -2107,7 +2122,6 @@ func (s *clientSuite) testEventCommitTsFallback(c *check.C, events []*cdcpb.Chan defer clientWg.Done() err := cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, 100, false, lockresolver, isPullInit, eventCh) c.Assert(err, check.Equals, errUnreachable) - cdcClient.Close() //nolint:errcheck }() // wait request id allocated with: new session, new request @@ -2247,14 +2261,15 @@ func (s *clientSuite) testEventAfterFeedStop(c *check.C) { isPullInit := &mockPullerInit{} grpcPool := NewGrpcPoolImpl(ctx, &security.Credential{}) defer grpcPool.Close() - cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool) + regionCache := tikv.NewRegionCache(pdClient) + defer regionCache.Close() + cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool, regionCache) eventCh := make(chan model.RegionFeedEvent, 10) wg.Add(1) go func() { defer wg.Done() err := cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, 100, false, lockresolver, isPullInit, eventCh) c.Assert(errors.Cause(err), check.Equals, context.Canceled) - cdcClient.Close() //nolint:errcheck }() // wait request id allocated with: new session, new request @@ 
-2426,14 +2441,15 @@ func (s *clientSuite) TestOutOfRegionRangeEvent(c *check.C) { isPullInit := &mockPullerInit{} grpcPool := NewGrpcPoolImpl(ctx, &security.Credential{}) defer grpcPool.Close() - cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool) + regionCache := tikv.NewRegionCache(pdClient) + defer regionCache.Close() + cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool, regionCache) eventCh := make(chan model.RegionFeedEvent, 10) wg.Add(1) go func() { defer wg.Done() err := cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, 100, false, lockresolver, isPullInit, eventCh) c.Assert(errors.Cause(err), check.Equals, context.Canceled) - cdcClient.Close() //nolint:errcheck }() // wait request id allocated with: new session, new request @@ -2657,14 +2673,15 @@ func (s *clientSuite) TestResolveLockNoCandidate(c *check.C) { isPullInit := &mockPullerInit{} grpcPool := NewGrpcPoolImpl(ctx, &security.Credential{}) defer grpcPool.Close() - cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool) + regionCache := tikv.NewRegionCache(pdClient) + defer regionCache.Close() + cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool, regionCache) eventCh := make(chan model.RegionFeedEvent, 10) wg.Add(1) go func() { defer wg.Done() err := cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, 100, false, lockresolver, isPullInit, eventCh) c.Assert(errors.Cause(err), check.Equals, context.Canceled) - cdcClient.Close() //nolint:errcheck }() // wait request id allocated with: new session, new request @@ -2751,14 +2768,15 @@ func (s *clientSuite) TestFailRegionReentrant(c *check.C) { isPullInit := &mockPullerInit{} grpcPool := NewGrpcPoolImpl(ctx, &security.Credential{}) defer grpcPool.Close() - cdcClient := NewCDCClient(ctx, pdClient, kvStorage.(tikv.Storage), grpcPool) + regionCache := tikv.NewRegionCache(pdClient) + defer regionCache.Close() + cdcClient := NewCDCClient(ctx, pdClient, kvStorage.(tikv.Storage), grpcPool, regionCache) eventCh := make(chan model.RegionFeedEvent, 10) wg.Add(1) go func() { defer wg.Done() err := cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, 100, false, lockresolver, isPullInit, eventCh) c.Assert(errors.Cause(err), check.Equals, context.Canceled) - cdcClient.Close() //nolint:errcheck }() // wait request id allocated with: new session, new request @@ -2832,14 +2850,15 @@ func (s *clientSuite) TestClientV1UnlockRangeReentrant(c *check.C) { isPullInit := &mockPullerInit{} grpcPool := NewGrpcPoolImpl(ctx, &security.Credential{}) defer grpcPool.Close() - cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool) + regionCache := tikv.NewRegionCache(pdClient) + defer regionCache.Close() + cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool, regionCache) eventCh := make(chan model.RegionFeedEvent, 10) wg.Add(1) go func() { defer wg.Done() err := cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("c")}, 100, false, lockresolver, isPullInit, eventCh) c.Assert(errors.Cause(err), check.Equals, context.Canceled) - cdcClient.Close() //nolint:errcheck }() // wait the second region is scheduled @@ -2898,14 +2917,15 @@ func (s *clientSuite) testClientErrNoPendingRegion(c *check.C) { isPullInit := &mockPullerInit{} grpcPool := NewGrpcPoolImpl(ctx, &security.Credential{}) defer grpcPool.Close() - cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool) + regionCache := tikv.NewRegionCache(pdClient) + 
defer regionCache.Close() + cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool, regionCache) eventCh := make(chan model.RegionFeedEvent, 10) wg.Add(1) go func() { defer wg.Done() err := cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("c")}, 100, false, lockresolver, isPullInit, eventCh) c.Assert(errors.Cause(err), check.Equals, context.Canceled) - cdcClient.Close() //nolint:errcheck }() baseAllocatedID := currentRequestID() @@ -2974,14 +2994,15 @@ func (s *clientSuite) testKVClientForceReconnect(c *check.C) { isPullInit := &mockPullerInit{} grpcPool := NewGrpcPoolImpl(ctx, &security.Credential{}) defer grpcPool.Close() - cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool) + regionCache := tikv.NewRegionCache(pdClient) + defer regionCache.Close() + cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool, regionCache) eventCh := make(chan model.RegionFeedEvent, 10) wg.Add(1) go func() { defer wg.Done() err := cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("c")}, 100, false, lockresolver, isPullInit, eventCh) c.Assert(errors.Cause(err), check.Equals, context.Canceled) - cdcClient.Close() //nolint:errcheck }() baseAllocatedID := currentRequestID() @@ -3123,14 +3144,15 @@ func (s *clientSuite) TestConcurrentProcessRangeRequest(c *check.C) { isPullInit := &mockPullerInit{} grpcPool := NewGrpcPoolImpl(ctx, &security.Credential{}) defer grpcPool.Close() - cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool) + regionCache := tikv.NewRegionCache(pdClient) + defer regionCache.Close() + cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool, regionCache) eventCh := make(chan model.RegionFeedEvent, 100) wg.Add(1) go func() { defer wg.Done() err := cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("z")}, 100, false, lockresolver, isPullInit, eventCh) c.Assert(errors.Cause(err), check.Equals, context.Canceled) - cdcClient.Close() //nolint:errcheck }() // the kv client is blocked by failpoint injection, and after region has split @@ -3238,14 +3260,15 @@ func (s *clientSuite) TestEvTimeUpdate(c *check.C) { isPullInit := &mockPullerInit{} grpcPool := NewGrpcPoolImpl(ctx, &security.Credential{}) defer grpcPool.Close() - cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool) + regionCache := tikv.NewRegionCache(pdClient) + defer regionCache.Close() + cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool, regionCache) eventCh := make(chan model.RegionFeedEvent, 10) wg.Add(1) go func() { defer wg.Done() err := cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, 100, false, lockresolver, isPullInit, eventCh) c.Assert(errors.Cause(err), check.Equals, context.Canceled) - cdcClient.Close() //nolint:errcheck }() // wait request id allocated with: new session, new request @@ -3357,14 +3380,15 @@ func (s *clientSuite) TestRegionWorkerExitWhenIsIdle(c *check.C) { isPullInit := &mockPullerInit{} grpcPool := NewGrpcPoolImpl(ctx, &security.Credential{}) defer grpcPool.Close() - cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool) + regionCache := tikv.NewRegionCache(pdClient) + defer regionCache.Close() + cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool, regionCache) eventCh := make(chan model.RegionFeedEvent, 10) wg.Add(1) go func() { defer wg.Done() err := cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, 100, false, lockresolver, isPullInit, eventCh) 
c.Assert(errors.Cause(err), check.Equals, context.Canceled) - cdcClient.Close() //nolint:errcheck }() // wait request id allocated with: new session, new request @@ -3448,7 +3472,9 @@ func (s *clientSuite) TestPrewriteNotMatchError(c *check.C) { lockResolver := txnutil.NewLockerResolver(kvStorage) grpcPool := NewGrpcPoolImpl(ctx, &security.Credential{}) defer grpcPool.Close() - cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool) + regionCache := tikv.NewRegionCache(pdClient) + defer regionCache.Close() + cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool, regionCache) eventCh := make(chan model.RegionFeedEvent, 10) baseAllocatedID := currentRequestID() @@ -3457,7 +3483,6 @@ func (s *clientSuite) TestPrewriteNotMatchError(c *check.C) { defer wg.Done() err = cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("c")}, 100, false, lockResolver, isPullInit, eventCh) c.Assert(errors.Cause(err), check.Equals, context.Canceled) - cdcClient.Close() //nolint:errcheck }() // The expected request ids are agnostic because the kv client could retry diff --git a/cdc/kv/testing.go b/cdc/kv/testing.go index b28fe933563..4c68fa9a1fd 100644 --- a/cdc/kv/testing.go +++ b/cdc/kv/testing.go @@ -150,8 +150,9 @@ func TestSplit(t require.TestingT, pdCli pd.Client, storage tikv.Storage, kvStor defer cancel() grpcPool := NewGrpcPoolImpl(ctx, &security.Credential{}) - cli := NewCDCClient(context.Background(), pdCli, storage, grpcPool) - defer cli.Close() + regionCache := tikv.NewRegionCache(pdCli) + + cli := NewCDCClient(context.Background(), pdCli, storage, grpcPool, regionCache) startTS := mustGetTimestamp(t, storage) @@ -240,8 +241,8 @@ func TestGetKVSimple(t require.TestingT, pdCli pd.Client, storage tikv.Storage, defer cancel() grpcPool := NewGrpcPoolImpl(ctx, &security.Credential{}) - cli := NewCDCClient(context.Background(), pdCli, storage, grpcPool) - defer cli.Close() + regionCache := tikv.NewRegionCache(pdCli) + cli := NewCDCClient(context.Background(), pdCli, storage, grpcPool, regionCache) startTS := mustGetTimestamp(t, storage) lockresolver := txnutil.NewLockerResolver(storage) diff --git a/cdc/owner/ddl_puller.go b/cdc/owner/ddl_puller.go index 95c3e9b13ae..af0a45aacba 100644 --- a/cdc/owner/ddl_puller.go +++ b/cdc/owner/ddl_puller.go @@ -66,7 +66,7 @@ func newDDLPuller(ctx cdcContext.Context, startTs uint64) (DDLPuller, error) { kvStorage := ctx.GlobalVars().KVStorage // kvStorage can be nil only in the test if kvStorage != nil { - plr = puller.NewPuller(ctx, pdCli, ctx.GlobalVars().GrpcPool, kvStorage, startTs, + plr = puller.NewPuller(ctx, pdCli, ctx.GlobalVars().GrpcPool, ctx.GlobalVars().RegionCache, kvStorage, startTs, []regionspan.Span{regionspan.GetDDLSpan(), regionspan.GetAddIndexDDLSpan()}, false) } diff --git a/cdc/processor/pipeline/puller.go b/cdc/processor/pipeline/puller.go index 1cbedeeea30..447dcd4a96d 100644 --- a/cdc/processor/pipeline/puller.go +++ b/cdc/processor/pipeline/puller.go @@ -65,7 +65,7 @@ func (n *pullerNode) Init(ctx pipeline.NodeContext) error { ctxC = util.PutChangefeedIDInCtx(ctxC, ctx.ChangefeedVars().ID) // NOTICE: always pull the old value internally // See also: https://github.com/pingcap/ticdc/issues/2301. 
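
The changes in cdc/capture/capture.go and cdc/kv/client.go above move RegionCache ownership out of CDCClient and into the caller: Capture builds one cache in reset() (closing any previous one), every CDCClient and puller borrows it, and Capture.AsyncClose releases it. Below is a minimal sketch of the resulting call pattern, not part of the patch itself; the import paths follow the ones used elsewhere in this diff (`github.com/pingcap/ticdc/...`), the `tikv.Storage` parameter type follows the NewCDCClient signature above, and `newSharedKVClient` is an illustrative helper name.

```go
package example

import (
	"context"

	"github.com/pingcap/ticdc/cdc/kv"
	"github.com/pingcap/ticdc/pkg/security"
	"github.com/tikv/client-go/v2/tikv"
	pd "github.com/tikv/pd/client"
)

// newSharedKVClient shows the intended lifecycle: the owner creates one
// RegionCache, hands it to every CDCClient it builds, and closes it when it is
// done. CDCClient.Close() is gone, so nothing inside the client releases the cache.
func newSharedKVClient(ctx context.Context, pdClient pd.Client, storage tikv.Storage) (kv.CDCKVClient, func()) {
	regionCache := tikv.NewRegionCache(pdClient) // shared by all pullers of this capture
	grpcPool := kv.NewGrpcPoolImpl(ctx, &security.Credential{})
	cli := kv.NewCDCClient(ctx, pdClient, storage, grpcPool, regionCache)
	cleanup := func() {
		grpcPool.Close()
		regionCache.Close() // mirrors Capture.AsyncClose, which also nils the field so closing stays reentrant
	}
	return cli, cleanup
}
```

The same two lines, `tikv.NewRegionCache(pdClient)` plus `defer regionCache.Close()`, replace every removed `cdcClient.Close()` call in the benchmarks and unit tests of this patch.
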
- plr := puller.NewPuller(ctxC, ctx.GlobalVars().PDClient, ctx.GlobalVars().GrpcPool, ctx.GlobalVars().KVStorage, + plr := puller.NewPuller(ctxC, ctx.GlobalVars().PDClient, ctx.GlobalVars().GrpcPool, ctx.GlobalVars().RegionCache, ctx.GlobalVars().KVStorage, n.replicaInfo.StartTs, n.tableSpan(ctx), true) n.wg.Go(func() error { ctx.Throw(errors.Trace(plr.Run(ctxC))) diff --git a/cdc/processor/processor.go b/cdc/processor/processor.go index 36787e3d70b..997e8570f69 100644 --- a/cdc/processor/processor.go +++ b/cdc/processor/processor.go @@ -456,6 +456,7 @@ func (p *processor) createAndDriveSchemaStorage(ctx cdcContext.Context) (entry.S stdCtx, ctx.GlobalVars().PDClient, ctx.GlobalVars().GrpcPool, + ctx.GlobalVars().RegionCache, ctx.GlobalVars().KVStorage, checkpointTs, ddlspans, false) meta, err := kv.GetSnapshotMeta(kvStorage, checkpointTs) diff --git a/cdc/puller/puller.go b/cdc/puller/puller.go index 9b0fb6ec473..4b39862b2c9 100644 --- a/cdc/puller/puller.go +++ b/cdc/puller/puller.go @@ -69,6 +69,7 @@ func NewPuller( ctx context.Context, pdCli pd.Client, grpcPool kv.GrpcPool, + regionCache *tikv.RegionCache, kvStorage tidbkv.Storage, checkpointTs uint64, spans []regionspan.Span, @@ -86,7 +87,7 @@ func NewPuller( // the initial ts for frontier to 0. Once the puller level resolved ts // initialized, the ts should advance to a non-zero value. tsTracker := frontier.NewFrontier(0, comparableSpans...) - kvCli := kv.NewCDCKVClient(ctx, pdCli, tikvStorage, grpcPool) + kvCli := kv.NewCDCKVClient(ctx, pdCli, tikvStorage, grpcPool, regionCache) p := &pullerImpl{ kvCli: kvCli, kvStorage: tikvStorage, @@ -103,8 +104,6 @@ func NewPuller( // Run the puller, continually fetch event from TiKV and add event into buffer func (p *pullerImpl) Run(ctx context.Context) error { - defer p.kvCli.Close() - g, ctx := errgroup.WithContext(ctx) checkpointTs := p.checkpointTs diff --git a/cdc/puller/puller_test.go b/cdc/puller/puller_test.go index 4b34057d164..8e949f160a6 100644 --- a/cdc/puller/puller_test.go +++ b/cdc/puller/puller_test.go @@ -62,6 +62,7 @@ func newMockCDCKVClient( pd pd.Client, kvStorage tikv.Storage, grpcPool kv.GrpcPool, + regionCache *tikv.RegionCache, ) kv.CDCKVClient { return &mockCDCKVClient{ expectations: make(chan model.RegionFeedEvent, 1024), @@ -124,7 +125,9 @@ func (s *pullerSuite) newPullerForTest( pdCli := &mockPdClientForPullerTest{clusterID: uint64(1)} grpcPool := kv.NewGrpcPoolImpl(ctx, &security.Credential{}) defer grpcPool.Close() - plr := NewPuller(ctx, pdCli, grpcPool, store, checkpointTs, spans, enableOldValue) + regionCache := tikv.NewRegionCache(pdCli) + defer regionCache.Close() + plr := NewPuller(ctx, pdCli, grpcPool, regionCache, store, checkpointTs, spans, enableOldValue) wg.Add(1) go func() { defer wg.Done() diff --git a/dm/_utils/terror_gen/errors_release.txt b/dm/_utils/terror_gen/errors_release.txt index c81497876da..d5080afee5d 100644 --- a/dm/_utils/terror_gen/errors_release.txt +++ b/dm/_utils/terror_gen/errors_release.txt @@ -364,7 +364,7 @@ ErrMasterPartWorkerExecDDLFail,[code=38025:class=dm-master:scope=internal:level= ErrMasterWorkerExistDDLLock,[code=38026:class=dm-master:scope=internal:level=high], "Message: worker %s exist ddl lock, Workaround: Please unlock ddl lock first." 
ErrMasterGetWorkerCfgExtractor,[code=38027:class=dm-master:scope=internal:level=high] ErrMasterTaskConfigExtractor,[code=38028:class=dm-master:scope=internal:level=high] -ErrMasterWorkerArgsExtractor,[code=38029:class=dm-master:scope=internal:level=high] +ErrMasterWorkerArgsExtractor,[code=38029:class=dm-master:scope=internal:level=high], "Workaround: Please use list-member command to see if the some workers are offline." ErrMasterQueryWorkerConfig,[code=38030:class=dm-master:scope=internal:level=high] ErrMasterOperNotFound,[code=38031:class=dm-master:scope=internal:level=high], "Message: operation %d of task %s on worker %s not found, Workaround: Please execute `query-status` to check status." ErrMasterOperRespNotSuccess,[code=38032:class=dm-master:scope=internal:level=high], "Message: some error occurs in dm-worker: %s, Workaround: Please execute `query-status` to check status." @@ -526,6 +526,10 @@ ErrSchedulerLatchInUse,[code=46024:class=scheduler:scope=internal:level=low], "M ErrSchedulerSourceCfgUpdate,[code=46025:class=scheduler:scope=internal:level=low], "Message: source can only update relay-log related parts for now" ErrSchedulerWrongWorkerInput,[code=46026:class=scheduler:scope=internal:level=medium], "Message: require DM master to modify worker [%s] with source [%s], but currently the worker is bound to source [%s]" ErrSchedulerBoundDiffWithStartedRelay,[code=46027:class=scheduler:scope=internal:level=medium], "Message: require DM worker [%s] to be bound to source [%s], but it has been started relay for source [%s], Workaround: If you intend to bind the source with worker, you can stop-relay for current source." +ErrSchedulerStartRelayOnSpecified,[code=46028:class=scheduler:scope=internal:level=low], "Message: the source has `start-relay` with worker name for workers %v, so it can't `start-relay` without worker name now, Workaround: Please stop all relay workers first, or specify worker name for `start-relay`." +ErrSchedulerStopRelayOnSpecified,[code=46029:class=scheduler:scope=internal:level=low], "Message: the source has `start-relay` with worker name for workers %v, so it can't `stop-relay` without worker name now, Workaround: Please specify worker names for `stop-relay`." +ErrSchedulerStartRelayOnBound,[code=46030:class=scheduler:scope=internal:level=low], "Message: the source has `start-relay` automatically for bound worker, so it can't `start-relay` with worker name now, Workaround: Please stop relay by `stop-relay` without worker name first." +ErrSchedulerStopRelayOnBound,[code=46031:class=scheduler:scope=internal:level=low], "Message: the source has `start-relay` automatically for bound worker, so it can't `stop-relay` with worker name now, Workaround: Please use `stop-relay` without worker name." ErrCtlGRPCCreateConn,[code=48001:class=dmctl:scope=internal:level=high], "Message: can not create grpc connection, Workaround: Please check your network connection." ErrCtlInvalidTLSCfg,[code=48002:class=dmctl:scope=internal:level=medium], "Message: invalid TLS config, Workaround: Please check the `ssl-ca`, `ssl-cert` and `ssl-key` config in command line." ErrCtlLoadTLSCfg,[code=48003:class=dmctl:scope=internal:level=high], "Message: can not load tls config, Workaround: Please ensure that the tls certificate is accessible on the node currently running dmctl." 
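
The four ErrSchedulerStartRelayOnSpecified / ErrSchedulerStopRelayOnSpecified / ErrSchedulerStartRelayOnBound / ErrSchedulerStopRelayOnBound codes added above are produced and matched the same way as the existing scheduler errors. A small sketch of that usage, following the Generate/Equal calls that appear in scheduler.go and its tests later in this patch; the helper names here are illustrative only.

```go
package example

import "github.com/pingcap/ticdc/dm/pkg/terror"

// rejectBareStartRelay mirrors the precheck added to Scheduler.StartRelay: a bare
// `start-relay` (no worker names) is refused while some workers already run relay
// for the source, and the %v placeholder is filled with that worker list.
func rejectBareStartRelay(startedWorkers []string) error {
	if len(startedWorkers) != 0 {
		return terror.ErrSchedulerStartRelayOnSpecified.Generate(startedWorkers)
	}
	return nil
}

// isRelayModeConflict is how a caller (e.g. dmctl) could detect any of the new codes.
func isRelayModeConflict(err error) bool {
	return terror.ErrSchedulerStartRelayOnSpecified.Equal(err) ||
		terror.ErrSchedulerStopRelayOnSpecified.Equal(err) ||
		terror.ErrSchedulerStartRelayOnBound.Equal(err) ||
		terror.ErrSchedulerStopRelayOnBound.Equal(err)
}
```
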
diff --git a/dm/dm/ctl/master/start_stop_relay.go b/dm/dm/ctl/master/start_stop_relay.go index 8cd101fce8b..21925af46c5 100644 --- a/dm/dm/ctl/master/start_stop_relay.go +++ b/dm/dm/ctl/master/start_stop_relay.go @@ -58,17 +58,14 @@ func startStopRelay(cmd *cobra.Command, op pb.RelayOpV2) error { return err } - if len(cmd.Flags().Args()) == 0 { + if len(cmd.Flags().Args()) == 0 && len(sources) == 0 { + // all args empty cmd.SetOut(os.Stdout) - if len(sources) == 0 { - // all args empty - common.PrintCmdUsage(cmd) - } else { - common.PrintLinesf("must specify at least one worker") - } + common.PrintCmdUsage(cmd) return errors.New("please check output to see error") } + // TODO: support multiple sources and all sources if len(sources) != 1 { common.PrintLinesf("must specify one source (`-s` / `--source`)") return errors.New("please check output to see error") diff --git a/dm/dm/master/openapi.go b/dm/dm/master/openapi.go index ddefb41eea5..807fb8d2cd8 100644 --- a/dm/dm/master/openapi.go +++ b/dm/dm/master/openapi.go @@ -199,7 +199,7 @@ func (s *Server) DMAPIGetSourceList(c *gin.Context, params openapi.DMAPIGetSourc if params.WithStatus != nil && *params.WithStatus { nexCtx := c.Request.Context() for idx := range sourceList { - sourceStatusList, err := s.getSourceStatusListFromWorker(nexCtx, sourceList[idx].SourceName) + sourceStatusList, err := s.getSourceStatusListFromWorker(nexCtx, sourceList[idx].SourceName, true) if err != nil { _ = c.Error(err) return @@ -340,8 +340,8 @@ func (s *Server) DMAPIGetSourceTableList(c *gin.Context, sourceName string, sche c.IndentedJSON(http.StatusOK, tableList) } -func (s *Server) getSourceStatusListFromWorker(ctx context.Context, sourceName string) ([]openapi.SourceStatus, error) { - workerStatusList := s.getStatusFromWorkers(ctx, []string{sourceName}, "", true) +func (s *Server) getSourceStatusListFromWorker(ctx context.Context, sourceName string, specifiedSource bool) ([]openapi.SourceStatus, error) { + workerStatusList := s.getStatusFromWorkers(ctx, []string{sourceName}, "", specifiedSource) sourceStatusList := make([]openapi.SourceStatus, len(workerStatusList)) for i, workerStatus := range workerStatusList { if workerStatus == nil { @@ -382,7 +382,7 @@ func (s *Server) DMAPIGetSourceStatus(c *gin.Context, sourceName string) { c.IndentedJSON(http.StatusOK, resp) return } - sourceStatusList, err := s.getSourceStatusListFromWorker(c.Request.Context(), sourceName) + sourceStatusList, err := s.getSourceStatusListFromWorker(c.Request.Context(), sourceName, true) if err != nil { _ = c.Error(err) return @@ -531,18 +531,22 @@ func (s *Server) DMAPIGetTaskList(c *gin.Context) { // DMAPIGetTaskStatus url is:(GET /api/v1/tasks/{task-name}/status). func (s *Server) DMAPIGetTaskStatus(c *gin.Context, taskName string, params openapi.DMAPIGetTaskStatusParams) { // 1. get task source list from scheduler - var sourceList []string + var ( + sourceList []string + specifiedSource bool + ) if params.SourceNameList == nil { sourceList = s.getTaskResources(taskName) } else { sourceList = *params.SourceNameList + specifiedSource = true } if len(sourceList) == 0 { _ = c.Error(terror.ErrSchedulerTaskNotExist.Generate(taskName)) return } // 2. 
get status from workers - workerStatusList := s.getStatusFromWorkers(c.Request.Context(), sourceList, taskName, true) + workerStatusList := s.getStatusFromWorkers(c.Request.Context(), sourceList, taskName, specifiedSource) subTaskStatusList := make([]openapi.SubTaskStatus, len(workerStatusList)) for i, workerStatus := range workerStatusList { if workerStatus == nil || workerStatus.SourceStatus == nil { diff --git a/dm/dm/master/scheduler/scheduler.go b/dm/dm/master/scheduler/scheduler.go index 3d2a7bca6fc..5542ff87dc7 100644 --- a/dm/dm/master/scheduler/scheduler.go +++ b/dm/dm/master/scheduler/scheduler.go @@ -32,6 +32,7 @@ import ( "github.com/pingcap/ticdc/dm/pkg/ha" "github.com/pingcap/ticdc/dm/pkg/log" "github.com/pingcap/ticdc/dm/pkg/terror" + "github.com/pingcap/ticdc/dm/pkg/utils" ) // Scheduler schedules tasks for DM-worker instances, including: @@ -59,6 +60,25 @@ import ( // - remove source request from user. // TODO: try to handle the return `err` of etcd operations, // because may put into etcd, but the response to the etcd client interrupted. +// Relay scheduling: +// - scheduled by source +// DM-worker will enable relay according to its bound source, in current implementation, it will read `enable-relay` +// of source config and decide whether to enable relay. +// turn on `enable-relay`: +// - use `enable-relay: true` when create source +// - `start-relay -s source` to dynamically change `enable-relay` +// turn off `enable-relay`: +// - use `enable-relay: false` when create source +// - `stop-relay -s source` to dynamically change `enable-relay` +// - found conflict schedule type with (source, worker) when scheduler bootstrap +// - scheduled by (source, worker) +// DM-worker will check if relay is assigned to it no matter it's bound or not. In current implementation, it will +// read UpstreamRelayWorkerKeyAdapter in etcd. +// add UpstreamRelayWorkerKeyAdapter: +// - use `start-relay -s source -w worker` +// remove UpstreamRelayWorkerKeyAdapter: +// - use `stop-relay -s source -w worker` +// - remove worker by `offline-member` type Scheduler struct { mu sync.RWMutex @@ -892,6 +912,7 @@ func (s *Scheduler) GetSubTaskCfgsByTask(task string) map[string]*config.SubTask // GetSubTaskCfgs gets all subconfig, return nil when error happens. func (s *Scheduler) GetSubTaskCfgs() map[string]map[string]config.SubTaskConfig { + // taskName -> sourceName -> SubTaskConfig clone := make(map[string]map[string]config.SubTaskConfig) s.subTaskCfgs.Range(func(k, v interface{}) bool { task := k.(string) @@ -1052,10 +1073,37 @@ func (s *Scheduler) StartRelay(source string, workers []string) error { } // 1. 
precheck - if _, ok := s.sourceCfgs[source]; !ok { + sourceCfg, ok := s.sourceCfgs[source] + if !ok { return terror.ErrSchedulerSourceCfgNotExist.Generate(source) } startedWorkers := s.relayWorkers[source] + + // quick path for `start-relay` without worker name + if len(workers) == 0 { + if len(startedWorkers) != 0 { + return terror.ErrSchedulerStartRelayOnSpecified.Generate(utils.SetToSlice(startedWorkers)) + } + // update enable-relay in source config + sourceCfg.EnableRelay = true + _, err := ha.PutSourceCfg(s.etcdCli, sourceCfg) + if err != nil { + return err + } + s.sourceCfgs[source] = sourceCfg + // notify bound worker + w, ok2 := s.bounds[source] + if !ok2 { + return nil + } + stage := ha.NewRelayStage(pb.Stage_Running, source) + _, err = ha.PutRelayStageSourceBound(s.etcdCli, stage, w.Bound()) + return err + } else if sourceCfg.EnableRelay { + // error when `enable-relay` and `start-relay` with worker name + return terror.ErrSchedulerStartRelayOnBound.Generate() + } + if startedWorkers == nil { startedWorkers = map[string]struct{}{} s.relayWorkers[source] = startedWorkers @@ -1139,9 +1187,37 @@ func (s *Scheduler) StopRelay(source string, workers []string) error { } // 1. precheck - if _, ok := s.sourceCfgs[source]; !ok { + sourceCfg, ok := s.sourceCfgs[source] + if !ok { return terror.ErrSchedulerSourceCfgNotExist.Generate(source) } + + // quick path for `stop-relay` without worker name + if len(workers) == 0 { + startedWorker := s.relayWorkers[source] + if len(startedWorker) != 0 { + return terror.ErrSchedulerStopRelayOnSpecified.Generate(utils.SetToSlice(startedWorker)) + } + // update enable-relay in source config + sourceCfg.EnableRelay = false + _, err := ha.PutSourceCfg(s.etcdCli, sourceCfg) + if err != nil { + return err + } + s.sourceCfgs[source] = sourceCfg + // notify bound worker + w, ok2 := s.bounds[source] + if !ok2 { + return nil + } + // TODO: remove orphan relay stage + _, err = ha.PutSourceBound(s.etcdCli, w.Bound()) + return err + } else if sourceCfg.EnableRelay { + // error when `enable-relay` and `stop-relay` with worker name + return terror.ErrSchedulerStopRelayOnBound.Generate() + } + var ( notExistWorkers []string unmatchedWorkers, unmatchedSources []string @@ -1469,11 +1545,31 @@ func (s *Scheduler) recoverSubTasks(cli *clientv3.Client) error { } // recoverRelayConfigs recovers history relay configs for each worker from etcd. +// This function also removes conflicting relay schedule types, which means if a source has both `enable-relay` and +// (source, worker) relay config, we remove the latter. +// should be called after recoverSources. func (s *Scheduler) recoverRelayConfigs(cli *clientv3.Client) error { relayWorkers, _, err := ha.GetAllRelayConfig(cli) if err != nil { return err } + + for source, workers := range relayWorkers { + sourceCfg, ok := s.sourceCfgs[source] + if !ok { + s.logger.Warn("found a not existing source by relay config", zap.String("source", source)) + continue + } + if sourceCfg.EnableRelay { + // current etcd max-txn-op is 2048 + _, err2 := ha.DeleteRelayConfig(cli, utils.SetToSlice(workers)...) 
+ if err2 != nil { + return err2 + } + delete(relayWorkers, source) + } + } + s.relayWorkers = relayWorkers return nil } diff --git a/dm/dm/master/scheduler/scheduler_test.go b/dm/dm/master/scheduler/scheduler_test.go index ec2d5c116de..3e393c224fc 100644 --- a/dm/dm/master/scheduler/scheduler_test.go +++ b/dm/dm/master/scheduler/scheduler_test.go @@ -1353,6 +1353,82 @@ func (t *testScheduler) TestStartStopRelay(c *C) { c.Assert(bound, IsFalse) } +func (t *testScheduler) TestRelayWithWithoutWorker(c *C) { + defer clearTestInfoOperation(c) + + var ( + logger = log.L() + s = NewScheduler(&logger, config.Security{}) + sourceID1 = "mysql-replica-1" + workerName1 = "dm-worker-1" + workerName2 = "dm-worker-2" + ) + + worker1 := &Worker{baseInfo: ha.WorkerInfo{Name: workerName1}} + worker2 := &Worker{baseInfo: ha.WorkerInfo{Name: workerName2}} + + // step 1: start an empty scheduler + s.started = true + s.etcdCli = etcdTestCli + s.workers[workerName1] = worker1 + s.workers[workerName2] = worker2 + s.sourceCfgs[sourceID1] = &config.SourceConfig{} + + worker1.ToFree() + c.Assert(s.boundSourceToWorker(sourceID1, worker1), IsNil) + worker2.ToFree() + + // step 2: check when enable-relay = false, can start/stop relay without worker name + c.Assert(s.StartRelay(sourceID1, []string{}), IsNil) + c.Assert(s.sourceCfgs[sourceID1].EnableRelay, IsTrue) + + c.Assert(s.StartRelay(sourceID1, []string{}), IsNil) + c.Assert(s.sourceCfgs[sourceID1].EnableRelay, IsTrue) + + c.Assert(s.StopRelay(sourceID1, []string{}), IsNil) + c.Assert(s.sourceCfgs[sourceID1].EnableRelay, IsFalse) + + c.Assert(s.StopRelay(sourceID1, []string{}), IsNil) + c.Assert(s.sourceCfgs[sourceID1].EnableRelay, IsFalse) + + // step 3: check when enable-relay = false, can start/stop relay with worker name + c.Assert(s.StartRelay(sourceID1, []string{workerName1, workerName2}), IsNil) + c.Assert(s.sourceCfgs[sourceID1].EnableRelay, IsFalse) + c.Assert(worker1.Stage(), Equals, WorkerBound) + c.Assert(worker2.Stage(), Equals, WorkerRelay) + + c.Assert(s.StopRelay(sourceID1, []string{workerName1}), IsNil) + c.Assert(worker1.Stage(), Equals, WorkerBound) + c.Assert(worker2.Stage(), Equals, WorkerRelay) + + c.Assert(s.StopRelay(sourceID1, []string{workerName2}), IsNil) + c.Assert(worker1.Stage(), Equals, WorkerBound) + c.Assert(worker2.Stage(), Equals, WorkerFree) + + // step 4: check when enable-relay = true, can't start/stop relay with worker name + c.Assert(s.StartRelay(sourceID1, []string{}), IsNil) + + err := s.StartRelay(sourceID1, []string{workerName1}) + c.Assert(terror.ErrSchedulerStartRelayOnBound.Equal(err), IsTrue) + err = s.StartRelay(sourceID1, []string{workerName2}) + c.Assert(terror.ErrSchedulerStartRelayOnBound.Equal(err), IsTrue) + + err = s.StopRelay(sourceID1, []string{workerName1}) + c.Assert(terror.ErrSchedulerStopRelayOnBound.Equal(err), IsTrue) + err = s.StopRelay(sourceID1, []string{workerName2}) + c.Assert(terror.ErrSchedulerStopRelayOnBound.Equal(err), IsTrue) + + c.Assert(s.StopRelay(sourceID1, []string{}), IsNil) + + // step5. 
check when started relay with workerName, can't turn on enable-relay + c.Assert(s.StartRelay(sourceID1, []string{workerName1}), IsNil) + + err = s.StartRelay(sourceID1, []string{}) + c.Assert(terror.ErrSchedulerStartRelayOnSpecified.Equal(err), IsTrue) + err = s.StopRelay(sourceID1, []string{}) + c.Assert(terror.ErrSchedulerStopRelayOnSpecified.Equal(err), IsTrue) +} + func checkAllWorkersClosed(c *C, s *Scheduler, closed bool) { for _, worker := range s.workers { cli, ok := worker.cli.(*workerrpc.GRPCClient) @@ -1711,3 +1787,67 @@ func (t *testScheduler) TestWorkerHasDiffRelayAndBound(c *C) { _, ok = s.unbounds[sourceID1] c.Assert(ok, IsTrue) } + +func (t *testScheduler) TestUpgradeCauseConflictRelayType(c *C) { + defer clearTestInfoOperation(c) + + var ( + logger = log.L() + s = NewScheduler(&logger, config.Security{}) + sourceID1 = "mysql-replica-1" + workerName1 = "dm-worker-1" + workerName2 = "dm-worker-2" + keepAlive = int64(3) + ) + + workerInfo1 := ha.WorkerInfo{Name: workerName1} + workerInfo2 := ha.WorkerInfo{Name: workerName2} + bound := ha.SourceBound{ + Source: sourceID1, + Worker: workerName1, + } + + sourceCfg, err := config.LoadFromFile("../source.yaml") + c.Assert(err, IsNil) + sourceCfg.Checker.BackoffMax = config.Duration{Duration: 5 * time.Second} + + // prepare etcd data + s.etcdCli = etcdTestCli + sourceCfg.EnableRelay = true + sourceCfg.SourceID = sourceID1 + _, err = ha.PutSourceCfg(etcdTestCli, sourceCfg) + c.Assert(err, IsNil) + _, err = ha.PutRelayConfig(etcdTestCli, sourceID1, workerName1) + c.Assert(err, IsNil) + _, err = ha.PutRelayConfig(etcdTestCli, sourceID1, workerName2) + c.Assert(err, IsNil) + _, err = ha.PutWorkerInfo(etcdTestCli, workerInfo1) + c.Assert(err, IsNil) + _, err = ha.PutWorkerInfo(etcdTestCli, workerInfo2) + c.Assert(err, IsNil) + _, err = ha.PutSourceBound(etcdTestCli, bound) + c.Assert(err, IsNil) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + //nolint:errcheck + go ha.KeepAlive(ctx, etcdTestCli, workerName1, keepAlive) + //nolint:errcheck + go ha.KeepAlive(ctx, etcdTestCli, workerName2, keepAlive) + + // bootstrap + c.Assert(s.recoverSources(etcdTestCli), IsNil) + c.Assert(s.recoverRelayConfigs(etcdTestCli), IsNil) + _, err = s.recoverWorkersBounds(etcdTestCli) + c.Assert(err, IsNil) + + // check when the relay config is conflicting with source config, relay config should be deleted + c.Assert(s.relayWorkers[sourceID1], HasLen, 0) + result, _, err := ha.GetAllRelayConfig(etcdTestCli) + c.Assert(err, IsNil) + c.Assert(result, HasLen, 0) + + worker := s.workers[workerName1] + c.Assert(worker.Stage(), Equals, WorkerBound) + c.Assert(worker.RelaySourceID(), HasLen, 0) + c.Assert(s.workers[workerName2].Stage(), Equals, WorkerFree) +} diff --git a/dm/dm/master/server.go b/dm/dm/master/server.go index abc312d60a6..3681a6699a4 100644 --- a/dm/dm/master/server.go +++ b/dm/dm/master/server.go @@ -672,9 +672,7 @@ type hasWokers interface { GetName() string } -func extractSources(s *Server, req hasWokers) ([]string, error) { - var sources []string - +func extractSources(s *Server, req hasWokers) (sources []string, specifiedSource bool, err error) { switch { case len(req.GetSources()) > 0: sources = req.GetSources() @@ -685,20 +683,21 @@ func extractSources(s *Server, req hasWokers) ([]string, error) { } } if len(invalidSource) > 0 { - return nil, errors.Errorf("sources %s haven't been added", invalidSource) + return nil, false, errors.Errorf("sources %s haven't been added", invalidSource) } + specifiedSource = true case 
len(req.GetName()) > 0: // query specified task's sources sources = s.getTaskResources(req.GetName()) if len(sources) == 0 { - return nil, errors.Errorf("task %s has no source or not exist", req.GetName()) + return nil, false, errors.Errorf("task %s has no source or not exist", req.GetName()) } default: // query all sources log.L().Info("get sources") sources = s.scheduler.BoundSources() } - return sources, nil + return } // QueryStatus implements MasterServer.QueryStatus. @@ -711,8 +710,7 @@ func (s *Server) QueryStatus(ctx context.Context, req *pb.QueryStatusListRequest if shouldRet { return resp2, err2 } - - sources, err := extractSources(s, req) + sources, specifiedSource, err := extractSources(s, req) if err != nil { // nolint:nilerr return &pb.QueryStatusListResponse{ @@ -720,29 +718,29 @@ func (s *Server) QueryStatus(ctx context.Context, req *pb.QueryStatusListRequest Msg: err.Error(), }, nil } - - queryRelayWorker := false - if len(req.GetSources()) > 0 { - // if user specified sources, query relay workers instead of task workers - queryRelayWorker = true + resps := s.getStatusFromWorkers(ctx, sources, req.Name, specifiedSource) + workerRespMap := make(map[string][]*pb.QueryStatusResponse, len(sources)) // sourceName -> worker QueryStatusResponse + inSlice := func(s []string, e string) bool { + for _, v := range s { + if v == e { + return true + } + } + return false } - - resps := s.getStatusFromWorkers(ctx, sources, req.Name, queryRelayWorker) - workerRespMap := make(map[string][]*pb.QueryStatusResponse, len(sources)) for _, workerResp := range resps { workerRespMap[workerResp.SourceStatus.Source] = append(workerRespMap[workerResp.SourceStatus.Source], workerResp) + // append some offline worker responses + if !inSlice(sources, workerResp.SourceStatus.Source) { + sources = append(sources, workerResp.SourceStatus.Source) + } } - - sort.Strings(sources) - workerResps := make([]*pb.QueryStatusResponse, 0, len(sources)) - for _, worker := range sources { - workerResps = append(workerResps, workerRespMap[worker]...) + workerResps := make([]*pb.QueryStatusResponse, 0) + sort.Strings(sources) // display status sorted by source name + for _, sourceName := range sources { + workerResps = append(workerResps, workerRespMap[sourceName]...) } - resp := &pb.QueryStatusListResponse{ - Result: true, - Sources: workerResps, - } - return resp, nil + return &pb.QueryStatusListResponse{Result: true, Sources: workerResps}, nil } // adjust unsynced field in sync status by looking at DDL locks. 
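
QueryStatus above now buckets worker responses by source, appends any source that only shows up in a response (for example a relay worker the user did not list with `-s`), and prints everything sorted by source name. A compact sketch of that aggregation, assuming the `dm/dm/pb` import path from this repo's layout; it uses a set where the patch uses the linear `inSlice` closure, which is behaviorally equivalent for display purposes.

```go
package example

import (
	"sort"

	"github.com/pingcap/ticdc/dm/dm/pb"
)

// groupBySource sketches the aggregation QueryStatus performs: responses are
// bucketed by source, sources seen only in the responses are appended to the
// display list, and the final slice is flattened in source-name order.
func groupBySource(sources []string, resps []*pb.QueryStatusResponse) []*pb.QueryStatusResponse {
	bySource := make(map[string][]*pb.QueryStatusResponse, len(sources))
	seen := make(map[string]struct{}, len(sources))
	for _, s := range sources {
		seen[s] = struct{}{}
	}
	for _, r := range resps {
		src := r.SourceStatus.Source
		bySource[src] = append(bySource[src], r)
		if _, ok := seen[src]; !ok { // an offline/relay worker's source not passed by the user
			seen[src] = struct{}{}
			sources = append(sources, src)
		}
	}
	sort.Strings(sources) // display status sorted by source name
	out := make([]*pb.QueryStatusResponse, 0, len(resps))
	for _, src := range sources {
		out = append(out, bySource[src]...)
	}
	return out
}
```
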
@@ -875,10 +873,28 @@ func (s *Server) PurgeWorkerRelay(ctx context.Context, req *pb.PurgeWorkerRelayR var wg sync.WaitGroup for _, source := range req.Sources { - workers, err := s.scheduler.GetRelayWorkers(source) + var ( + workers []*scheduler.Worker + workerNameSet = make(map[string]struct{}) + err error + ) + + workers, err = s.scheduler.GetRelayWorkers(source) if err != nil { return nil, err } + // returned workers is not duplicated + for _, w := range workers { + workerNameSet[w.BaseInfo().Name] = struct{}{} + } + // subtask workers may have been found in relay workers + taskWorker := s.scheduler.GetWorkerBySource(source) + if taskWorker != nil { + if _, ok := workerNameSet[taskWorker.BaseInfo().Name]; !ok { + workers = append(workers, taskWorker) + } + } + if len(workers) == 0 { setWorkerResp(errorCommonWorkerResponse(fmt.Sprintf("relay worker for source %s not found, please `start-relay` first", source), source, "")) continue @@ -972,12 +988,12 @@ func (s *Server) getTaskResources(task string) []string { } // getStatusFromWorkers does RPC request to get status from dm-workers. -func (s *Server) getStatusFromWorkers(ctx context.Context, sources []string, taskName string, relayWorker bool) []*pb.QueryStatusResponse { +func (s *Server) getStatusFromWorkers( + ctx context.Context, sources []string, taskName string, specifiedSource bool) []*pb.QueryStatusResponse { workerReq := &workerrpc.Request{ Type: workerrpc.CmdQueryStatus, QueryStatus: &pb.QueryStatusRequest{Name: taskName}, } - var ( workerResps = make([]*pb.QueryStatusResponse, 0, len(sources)) workerRespMu sync.Mutex @@ -1010,7 +1026,8 @@ func (s *Server) getStatusFromWorkers(ctx context.Context, sources []string, tas workerNameSet = make(map[string]struct{}) err2 error ) - if relayWorker { + // if user specified sources, query relay workers instead of task workers + if specifiedSource { workers, err2 = s.scheduler.GetRelayWorkers(source) if err2 != nil { handleErr(err2, source, "") @@ -1023,8 +1040,7 @@ func (s *Server) getStatusFromWorkers(ctx context.Context, sources []string, tas } // subtask workers may have been found in relay workers - taskWorker := s.scheduler.GetWorkerBySource(source) - if taskWorker != nil { + if taskWorker := s.scheduler.GetWorkerBySource(source); taskWorker != nil { if _, ok := workerNameSet[taskWorker.BaseInfo().Name]; !ok { workers = append(workers, taskWorker) } @@ -1070,6 +1086,66 @@ func (s *Server) getStatusFromWorkers(ctx context.Context, sources []string, tas } wg.Wait() s.fillUnsyncedStatus(workerResps) + + // when taskName is empty we need list all task even the worker that handle this task is not running. 
+ // see more here https://github.com/pingcap/ticdc/issues/3348 + if taskName == "" { + // sourceName -> resp + fakeWorkerRespM := make(map[string]*pb.QueryStatusResponse) + existWorkerRespM := make(map[string]*pb.QueryStatusResponse) + for _, resp := range workerResps { + existWorkerRespM[resp.SourceStatus.Source] = resp + } + + findNeedInResp := func(sourceName, taskName string) bool { + if _, ok := existWorkerRespM[sourceName]; ok { + // if we found the source resp, it must have the subtask of this task + return true + } + if fake, ok := fakeWorkerRespM[sourceName]; ok { + for _, status := range fake.SubTaskStatus { + if status.Name == taskName { + return true + } + } + } + return false + } + + setOrAppendFakeResp := func(sourceName, taskName string) { + if _, ok := fakeWorkerRespM[sourceName]; !ok { + fakeWorkerRespM[sourceName] = &pb.QueryStatusResponse{ + Result: false, + SubTaskStatus: []*pb.SubTaskStatus{}, + SourceStatus: &pb.SourceStatus{Source: sourceName, Worker: "source not bound"}, + Msg: fmt.Sprintf("can't find task=%s from dm-worker for source=%s, please use dmctl list-member to check if worker is offline.", taskName, sourceName), + } + } + fakeWorkerRespM[sourceName].SubTaskStatus = append(fakeWorkerRespM[sourceName].SubTaskStatus, &pb.SubTaskStatus{Name: taskName}) + } + + for taskName, sourceM := range s.scheduler.GetSubTaskCfgs() { + // only add use specified source related to this task + if specifiedSource { + for _, needDisplayedSource := range sources { + if _, ok := sourceM[needDisplayedSource]; ok && !findNeedInResp(needDisplayedSource, taskName) { + setOrAppendFakeResp(needDisplayedSource, taskName) + } + } + } else { + // make fake response for every source related to this task + for sourceName := range sourceM { + if !findNeedInResp(sourceName, taskName) { + setOrAppendFakeResp(sourceName, taskName) + } + } + } + } + + for _, fake := range fakeWorkerRespM { + setWorkerResp(fake) + } + } return workerResps } diff --git a/dm/dm/worker/server.go b/dm/dm/worker/server.go index 0b4f0237506..69853b0d890 100644 --- a/dm/dm/worker/server.go +++ b/dm/dm/worker/server.go @@ -360,7 +360,7 @@ func (s *Server) observeRelayConfig(ctx context.Context, rev int64) error { // we check if observeSourceBound has started a worker // TODO: add a test for this situation if !w.relayEnabled.Load() { - if err2 := w.EnableRelay(); err2 != nil { + if err2 := w.EnableRelay(false); err2 != nil { return err2 } } @@ -679,12 +679,15 @@ func (s *Server) enableHandleSubtasks(sourceCfg *config.SourceConfig, needLock b } if sourceCfg.EnableRelay { - w.startedRelayBySourceCfg = true - if err2 := w.EnableRelay(); err2 != nil { + log.L().Info("will start relay by `enable-relay` in source config") + if err2 := w.EnableRelay(true); err2 != nil { log.L().Error("found a `enable-relay: true` source, but failed to enable relay for DM worker", zap.Error(err2)) return err2 } + } else if w.startedRelayBySourceCfg { + log.L().Info("will disable relay by `enable-relay: false` in source config") + w.DisableRelay() } if err2 := w.EnableHandleSubtasks(); err2 != nil { @@ -748,7 +751,7 @@ func (s *Server) enableRelay(sourceCfg *config.SourceConfig, needLock bool) erro // because no re-assigned mechanism exists for keepalived DM-worker yet. 
diff --git a/dm/dm/worker/server.go b/dm/dm/worker/server.go
index 0b4f0237506..69853b0d890 100644
--- a/dm/dm/worker/server.go
+++ b/dm/dm/worker/server.go
@@ -360,7 +360,7 @@ func (s *Server) observeRelayConfig(ctx context.Context, rev int64) error {
 				// we check if observeSourceBound has started a worker
 				// TODO: add a test for this situation
 				if !w.relayEnabled.Load() {
-					if err2 := w.EnableRelay(); err2 != nil {
+					if err2 := w.EnableRelay(false); err2 != nil {
 						return err2
 					}
 				}
@@ -679,12 +679,15 @@ func (s *Server) enableHandleSubtasks(sourceCfg *config.SourceConfig, needLock b
 	}
 
 	if sourceCfg.EnableRelay {
-		w.startedRelayBySourceCfg = true
-		if err2 := w.EnableRelay(); err2 != nil {
+		log.L().Info("will start relay by `enable-relay` in source config")
+		if err2 := w.EnableRelay(true); err2 != nil {
 			log.L().Error("found a `enable-relay: true` source, but failed to enable relay for DM worker", zap.Error(err2))
 			return err2
 		}
+	} else if w.startedRelayBySourceCfg {
+		log.L().Info("will disable relay by `enable-relay: false` in source config")
+		w.DisableRelay()
 	}
 
 	if err2 := w.EnableHandleSubtasks(); err2 != nil {
@@ -748,7 +751,7 @@ func (s *Server) enableRelay(sourceCfg *config.SourceConfig, needLock bool) erro
 	// because no re-assigned mechanism exists for keepalived DM-worker yet.
 		return err2
 	}
-	if err2 = w.EnableRelay(); err2 != nil {
+	if err2 = w.EnableRelay(false); err2 != nil {
 		s.setSourceStatus(sourceCfg.SourceID, err2, false)
 		return err2
 	}
diff --git a/dm/dm/worker/source_worker.go b/dm/dm/worker/source_worker.go
index 67f1a51eac9..1417353892f 100644
--- a/dm/dm/worker/source_worker.go
+++ b/dm/dm/worker/source_worker.go
@@ -267,7 +267,19 @@ func (w *SourceWorker) updateSourceStatus(ctx context.Context) error {
 }
 
 // EnableRelay enables the functionality of start/watch/handle relay.
+// According to the relay scheduling of DM-master, a source worker enables relay in two scenarios: its bound source has
+// `enable-relay: true` in its config, or it has a UpstreamRelayWorkerKeyAdapter etcd KV.
+// The paths to EnableRelay are:
+// - source config `enable-relay: true`, which is checked in enableHandleSubtasks
+//   - when DM-worker Server.Start
+//   - when DM-worker Server watches a SourceBound change, which either turns a free source worker into a bound one or
+//     notifies a bound worker that the source config has changed
+//   - when DM-worker Server fails watching and recovers from a snapshot
+// - UpstreamRelayWorkerKeyAdapter
+//   - when DM-worker Server.Start
+//   - when DM-worker Server watches a UpstreamRelayWorkerKeyAdapter change
+//   - when DM-worker Server fails watching and recovers from a snapshot
-func (w *SourceWorker) EnableRelay() (err error) {
+func (w *SourceWorker) EnableRelay(startBySourceCfg bool) (err error) {
 	w.l.Info("enter EnableRelay")
 	w.Lock()
 	defer w.Unlock()
@@ -276,6 +288,8 @@ func (w *SourceWorker) EnableRelay() (err error) {
 		return nil
 	}
+
+	w.startedRelayBySourceCfg = startBySourceCfg
 	var sourceCfg *config.SourceConfig
 	failpoint.Inject("MockGetSourceCfgFromETCD", func(_ failpoint.Value) {
 		failpoint.Goto("bypass")
 	})
@@ -364,10 +378,21 @@ func (w *SourceWorker) EnableRelay() (err error) {
 }
 
 // DisableRelay disables the functionality of start/watch/handle relay.
+// A source worker disables relay when the reason it enabled relay (see EnableRelay) is no longer valid.
+// The paths to DisableRelay are:
+// - source config `enable-relay: true` is no longer valid
+//   - when DM-worker Server watches a SourceBound change, which notifies that the source config has changed, and the
+//     worker had started relay because of that bound
+//   - when the source worker is unbound and had started relay because of that bound
+// - UpstreamRelayWorkerKeyAdapter is no longer valid
+//   - when DM-worker Server watches a UpstreamRelayWorkerKeyAdapter change
+//   - when DM-worker Server fails watching and recovers from a snapshot
 func (w *SourceWorker) DisableRelay() {
 	w.l.Info("enter DisableRelay")
 	w.Lock()
 	defer w.Unlock()
+
+	w.startedRelayBySourceCfg = false
 	if !w.relayEnabled.CAS(true, false) {
 		w.l.Warn("already disabled relay")
 		return
diff --git a/dm/dm/worker/source_worker_test.go b/dm/dm/worker/source_worker_test.go
index 5934abc5291..e68ef583c36 100644
--- a/dm/dm/worker/source_worker_test.go
+++ b/dm/dm/worker/source_worker_test.go
@@ -81,7 +81,7 @@ func (t *testServer) testWorker(c *C) {
 	}()
 	w, err := NewSourceWorker(cfg, etcdCli, "")
 	c.Assert(err, IsNil)
-	c.Assert(w.EnableRelay(), ErrorMatches, "init error")
+	c.Assert(w.EnableRelay(false), ErrorMatches, "init error")
 
 	NewRelayHolder = NewDummyRelayHolder
 	w, err = NewSourceWorker(cfg, etcdCli, "")
@@ -187,7 +187,7 @@ func (t *testServer2) TestTaskAutoResume(c *C) {
 		w, err2 := s.getOrStartWorker(sourceConfig, true)
 		c.Assert(err2, IsNil)
 		// we set sourceConfig.EnableRelay = true above
-		c.Assert(w.EnableRelay(), IsNil)
+		c.Assert(w.EnableRelay(false), IsNil)
 		c.Assert(w.EnableHandleSubtasks(), IsNil)
 		return true
 	}), IsTrue)
@@ -363,7 +363,7 @@ func (t *testWorkerFunctionalities) TestWorkerFunctionalities(c *C) {
 
 func (t *testWorkerFunctionalities) testEnableRelay(c *C, w *SourceWorker, etcdCli *clientv3.Client,
 	sourceCfg *config.SourceConfig, cfg *Config) {
-	c.Assert(w.EnableRelay(), IsNil)
+	c.Assert(w.EnableRelay(false), IsNil)
 	c.Assert(w.relayEnabled.Load(), IsTrue)
 	c.Assert(w.relayHolder.Stage(), Equals, pb.Stage_New)
@@ -588,7 +588,7 @@ func (t *testWorkerEtcdCompact) TestWatchRelayStageEtcdCompact(c *C) {
 	defer cancel()
 	defer w.Close()
 	go func() {
-		c.Assert(w.EnableRelay(), IsNil)
+		c.Assert(w.EnableRelay(false), IsNil)
 		w.Start()
 	}()
 	c.Assert(utils.WaitSomething(50, 100*time.Millisecond, func() bool {
diff --git a/dm/errors.toml b/dm/errors.toml
index 9fd7af01ece..f4b94d419e8 100644
--- a/dm/errors.toml
+++ b/dm/errors.toml
@@ -2197,7 +2197,7 @@ tags = ["internal", "high"]
 [error.DM-dm-master-38029]
 message = ""
 description = ""
-workaround = ""
+workaround = "Please use the list-member command to see if some workers are offline."
 tags = ["internal", "high"]
 
 [error.DM-dm-master-38030]
@@ -3166,6 +3166,30 @@ description = ""
 workaround = "If you intend to bind the source with worker, you can stop-relay for current source."
 tags = ["internal", "medium"]
 
+[error.DM-scheduler-46028]
+message = "the source has `start-relay` with worker name for workers %v, so it can't `start-relay` without worker name now"
+description = ""
+workaround = "Please stop all relay workers first, or specify worker name for `start-relay`."
+tags = ["internal", "low"]
+
+[error.DM-scheduler-46029]
+message = "the source has `start-relay` with worker name for workers %v, so it can't `stop-relay` without worker name now"
+description = ""
+workaround = "Please specify worker names for `stop-relay`."
+tags = ["internal", "low"]
+
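The `EnableRelay(startBySourceCfg bool)` change and the `startedRelayBySourceCfg` field above let the worker remember why relay was enabled, so that a later source-config update with `enable-relay: false` can tear relay down again (see the `enableHandleSubtasks` hunk earlier). The sketch below models only that bookkeeping; apart from the EnableRelay/DisableRelay names, the fields and the onSourceCfgChange helper are illustrative assumptions, not the actual SourceWorker implementation.

package main

import (
	"fmt"
	"sync"
)

// relayState is a toy stand-in for the relay-related state of SourceWorker.
type relayState struct {
	mu                      sync.Mutex
	relayEnabled            bool
	startedRelayBySourceCfg bool
}

// EnableRelay records whether relay was started because of `enable-relay: true`
// in the source config (true) or because of start-relay / an etcd key (false).
func (s *relayState) EnableRelay(startBySourceCfg bool) {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.relayEnabled {
		return // already enabled; keep the original reason
	}
	s.startedRelayBySourceCfg = startBySourceCfg
	s.relayEnabled = true
}

// DisableRelay clears both flags; calling it when relay is off is a no-op.
func (s *relayState) DisableRelay() {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.startedRelayBySourceCfg = false
	s.relayEnabled = false
}

// onSourceCfgChange mirrors the enableHandleSubtasks logic: enable relay for
// `enable-relay: true`, and disable it only if relay had been enabled because
// of the source config in the first place.
func (s *relayState) onSourceCfgChange(enableRelay bool) {
	if enableRelay {
		s.EnableRelay(true)
		return
	}
	s.mu.Lock()
	startedByCfg := s.startedRelayBySourceCfg
	s.mu.Unlock()
	if startedByCfg {
		s.DisableRelay()
	}
}

func main() {
	var s relayState
	s.onSourceCfgChange(true)  // enable-relay: true  -> relay on
	s.onSourceCfgChange(false) // enable-relay: false -> relay off again
	fmt.Println("relay enabled:", s.relayEnabled)
}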
+[error.DM-scheduler-46030]
+message = "the source has `start-relay` automatically for bound worker, so it can't `start-relay` with worker name now"
+description = ""
+workaround = "Please stop relay by `stop-relay` without worker name first."
+tags = ["internal", "low"]
+
+[error.DM-scheduler-46031]
+message = "the source has `start-relay` automatically for bound worker, so it can't `stop-relay` with worker name now"
+description = ""
+workaround = "Please use `stop-relay` without worker name."
+tags = ["internal", "low"]
+
 [error.DM-dmctl-48001]
 message = "can not create grpc connection"
 description = ""
diff --git a/dm/pkg/terror/error_list.go b/dm/pkg/terror/error_list.go
index 347192f5719..fe605065b51 100644
--- a/dm/pkg/terror/error_list.go
+++ b/dm/pkg/terror/error_list.go
@@ -643,6 +643,10 @@ const (
 	codeSchedulerSourceCfgUpdate
 	codeSchedulerWrongWorkerInput
 	codeSchedulerCantTransferToRelayWorker
+	codeSchedulerStartRelayOnSpecified
+	codeSchedulerStopRelayOnSpecified
+	codeSchedulerStartRelayOnBound
+	codeSchedulerStopRelayOnBound
 )
 
 // dmctl error code.
@@ -1097,7 +1101,7 @@ var (
 	ErrMasterWorkerExistDDLLock = New(codeMasterWorkerExistDDLLock, ClassDMMaster, ScopeInternal, LevelHigh, "worker %s exist ddl lock", "Please unlock ddl lock first.")
 	ErrMasterGetWorkerCfgExtractor = New(codeMasterGetWorkerCfgExtractor, ClassDMMaster, ScopeInternal, LevelHigh, "", "")
 	ErrMasterTaskConfigExtractor = New(codeMasterTaskConfigExtractor, ClassDMMaster, ScopeInternal, LevelHigh, "", "")
-	ErrMasterWorkerArgsExtractor = New(codeMasterWorkerArgsExtractor, ClassDMMaster, ScopeInternal, LevelHigh, "", "")
+	ErrMasterWorkerArgsExtractor = New(codeMasterWorkerArgsExtractor, ClassDMMaster, ScopeInternal, LevelHigh, "", "Please use the list-member command to see if some workers are offline.")
 	ErrMasterQueryWorkerConfig = New(codeMasterQueryWorkerConfig, ClassDMMaster, ScopeInternal, LevelHigh, "", "")
 	ErrMasterOperNotFound = New(codeMasterOperNotFound, ClassDMMaster, ScopeInternal, LevelHigh, "operation %d of task %s on worker %s not found", "Please execute `query-status` to check status.")
 	ErrMasterOperRespNotSuccess = New(codeMasterOperRespNotSuccess, ClassDMMaster, ScopeInternal, LevelHigh, "some error occurs in dm-worker: %s", "Please execute `query-status` to check status.")
@@ -1285,6 +1289,10 @@ var (
 	ErrSchedulerSourceCfgUpdate = New(codeSchedulerSourceCfgUpdate, ClassScheduler, ScopeInternal, LevelLow, "source can only update relay-log related parts for now", "")
 	ErrSchedulerWrongWorkerInput = New(codeSchedulerWrongWorkerInput, ClassScheduler, ScopeInternal, LevelMedium, "require DM master to modify worker [%s] with source [%s], but currently the worker is bound to source [%s]", "")
 	ErrSchedulerBoundDiffWithStartedRelay = New(codeSchedulerCantTransferToRelayWorker, ClassScheduler, ScopeInternal, LevelMedium, "require DM worker [%s] to be bound to source [%s], but it has been started relay for source [%s]", "If you intend to bind the source with worker, you can stop-relay for current source.")
+	ErrSchedulerStartRelayOnSpecified = New(codeSchedulerStartRelayOnSpecified, ClassScheduler, ScopeInternal, LevelLow, "the source has `start-relay` with worker name for workers %v, so it can't `start-relay` without worker name now", "Please stop all relay workers first, or specify worker name for `start-relay`.")
+	ErrSchedulerStopRelayOnSpecified = New(codeSchedulerStopRelayOnSpecified, ClassScheduler, ScopeInternal, LevelLow, "the source has `start-relay` with worker name
for workers %v, so it can't `stop-relay` without worker name now", "Please specify worker names for `stop-relay`.") + ErrSchedulerStartRelayOnBound = New(codeSchedulerStartRelayOnBound, ClassScheduler, ScopeInternal, LevelLow, "the source has `start-relay` automatically for bound worker, so it can't `start-relay` with worker name now", "Please stop relay by `stop-relay` without worker name first.") + ErrSchedulerStopRelayOnBound = New(codeSchedulerStopRelayOnBound, ClassScheduler, ScopeInternal, LevelLow, "the source has `start-relay` automatically for bound worker, so it can't `stop-relay` with worker name now", "Please use `stop-relay` without worker name.") // dmctl. ErrCtlGRPCCreateConn = New(codeCtlGRPCCreateConn, ClassDMCtl, ScopeInternal, LevelHigh, "can not create grpc connection", "Please check your network connection.") diff --git a/dm/pkg/utils/util.go b/dm/pkg/utils/util.go index d6975f668a0..2f9f072edd6 100644 --- a/dm/pkg/utils/util.go +++ b/dm/pkg/utils/util.go @@ -264,6 +264,15 @@ func proxyFields() []zap.Field { return fields } +// SetToSlice converts a map of struct{} value to a slice to pretty print. +func SetToSlice(set map[string]struct{}) []string { + slice := make([]string, 0, len(set)) + for key := range set { + slice = append(slice, key) + } + return slice +} + func NewStoppedTimer() *time.Timer { // stopped timer should be Reset with correct duration, so use 0 here t := time.NewTimer(0) diff --git a/dm/tests/adjust_gtid/run.sh b/dm/tests/adjust_gtid/run.sh index a06dc92fa70..1cca51fe0e3 100755 --- a/dm/tests/adjust_gtid/run.sh +++ b/dm/tests/adjust_gtid/run.sh @@ -70,10 +70,6 @@ function run() { # make sure source1 is bound to worker1 dmctl_operate_source create $WORK_DIR/source1.yaml $SOURCE_ID1 - run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \ - "start-relay -s $SOURCE_ID1 worker1" \ - "\"result\": true" 2 - run_dm_worker $WORK_DIR/worker2 $WORKER2_PORT $cur/conf/dm-worker2.toml check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER2_PORT dmctl_operate_source create $WORK_DIR/source2.yaml $SOURCE_ID2 diff --git a/dm/tests/all_mode/run.sh b/dm/tests/all_mode/run.sh index 0d127e841a3..ce27dce043c 100755 --- a/dm/tests/all_mode/run.sh +++ b/dm/tests/all_mode/run.sh @@ -362,10 +362,6 @@ function run() { # make sure source1 is bound to worker1 dmctl_operate_source create $WORK_DIR/source1.yaml $SOURCE_ID1 - run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \ - "start-relay -s $SOURCE_ID1 worker1" \ - "\"result\": true" 2 - run_dm_worker $WORK_DIR/worker2 $WORKER2_PORT $cur/conf/dm-worker2.toml check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER2_PORT dmctl_operate_source create $WORK_DIR/source2.yaml $SOURCE_ID2 diff --git a/dm/tests/dmctl_basic/check_list/query_status.sh b/dm/tests/dmctl_basic/check_list/query_status.sh index c91c4fa2ba2..8800bfb45e1 100644 --- a/dm/tests/dmctl_basic/check_list/query_status.sh +++ b/dm/tests/dmctl_basic/check_list/query_status.sh @@ -56,3 +56,10 @@ function query_status_running_tasks() { "\"result\": true" 3 \ "\"stage\": \"Running\"" 4 } + +function query_status_with_offline_worker() { + run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \ + "query-status" \ + "\"result\": false" 2 \ + "\"worker\": \"source not bound\"" 2 +} diff --git a/dm/tests/dmctl_basic/check_list/start_relay.sh b/dm/tests/dmctl_basic/check_list/start_relay.sh index 5af13ffb787..323f2a79fc6 100644 --- a/dm/tests/dmctl_basic/check_list/start_relay.sh +++ b/dm/tests/dmctl_basic/check_list/start_relay.sh @@ -12,12 
+12,6 @@ function start_relay_wrong_arg() { "must specify one source (\`-s\` \/ \`--source\`)" 1 } -function start_relay_without_worker() { - run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \ - "start-relay -s $SOURCE_ID1" \ - "must specify at least one worker" 1 -} - function start_relay_success() { run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \ "start-relay -s $SOURCE_ID1 worker1" \ @@ -32,12 +26,30 @@ function start_relay_success() { "\"worker\": \"worker2\"" 1 } -function start_relay_fail() { +function start_relay_without_worker_name_success() { + run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \ + "start-relay -s $SOURCE_ID1" \ + "\"result\": true" 1 +} + +function start_relay_diff_worker_fail() { run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \ "start-relay -s $SOURCE_ID1 worker2" \ "these workers \[worker2\] have bound for another sources \[$SOURCE_ID2\] respectively" 1 } +function start_relay_with_worker_name_fail() { + run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \ + "start-relay -s $SOURCE_ID1 worker1" \ + "can't \`start-relay\` with worker name now" 1 +} + +function start_relay_without_worker_name_fail() { + run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \ + "start-relay -s $SOURCE_ID1" \ + "can't \`start-relay\` without worker name now" 1 +} + function start_relay_on_offline_worker() { run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \ "start-relay -s $SOURCE_ID2 worker2" \ diff --git a/dm/tests/dmctl_basic/check_list/stop_relay.sh b/dm/tests/dmctl_basic/check_list/stop_relay.sh index b5e6904c93d..765efa2eccf 100644 --- a/dm/tests/dmctl_basic/check_list/stop_relay.sh +++ b/dm/tests/dmctl_basic/check_list/stop_relay.sh @@ -12,12 +12,6 @@ function stop_relay_wrong_arg() { "must specify one source (\`-s\` \/ \`--source\`)" 1 } -function stop_relay_without_worker() { - run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \ - "stop-relay -s $SOURCE_ID1" \ - "must specify at least one worker" 1 -} - function stop_relay_success() { run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \ "stop-relay -s $SOURCE_ID1 worker1" \ @@ -27,12 +21,30 @@ function stop_relay_success() { "\"result\": true" 2 } +function stop_relay_with_worker_name_success() { + run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \ + "stop-relay -s $SOURCE_ID1" \ + "\"result\": true" 1 +} + function stop_relay_fail() { run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \ "stop-relay -s $SOURCE_ID1 worker2" \ "these workers \[worker2\] have started relay for another sources \[$SOURCE_ID2\] respectively" 1 } +function stop_relay_with_worker_name_fail() { + run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \ + "stop-relay -s $SOURCE_ID1 worker1" \ + "can't \`stop-relay\` with worker name now" 1 +} + +function stop_relay_without_worker_name_fail() { + run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \ + "stop-relay -s $SOURCE_ID1" \ + "can't \`stop-relay\` without worker name now" 1 +} + function stop_relay_on_offline_worker() { run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \ "stop-relay -s $SOURCE_ID2 worker2" \ diff --git a/dm/tests/dmctl_basic/run.sh b/dm/tests/dmctl_basic/run.sh index 02af24df688..72f722178e2 100755 --- a/dm/tests/dmctl_basic/run.sh +++ b/dm/tests/dmctl_basic/run.sh @@ -76,12 +76,10 @@ function usage_and_arg_test() { echo "start_relay_empty_arg" start_relay_empty_arg start_relay_wrong_arg - start_relay_without_worker echo "stop_relay_empty_arg" stop_relay_empty_arg stop_relay_wrong_arg - stop_relay_without_worker } function recover_max_binlog_size() { @@ -267,7 +265,9 @@ function run() { check_rpc_alive $cur/../bin/check_worker_online 
127.0.0.1:$WORKER2_PORT start_relay_success - start_relay_fail + start_relay_diff_worker_fail + start_relay_without_worker_name_fail + stop_relay_without_worker_name_fail echo "pause_relay_success" pause_relay_success @@ -308,6 +308,21 @@ function run() { stop_relay_fail stop_relay_success + # stop worker to test query-status works well when no worker + ps aux | grep dmctl_basic/worker1 | awk '{print $2}' | xargs kill || true + check_port_offline $WORKER1_PORT 20 + ps aux | grep dmctl_basic/worker2 | awk '{print $2}' | xargs kill || true + check_port_offline $WORKER2_PORT 20 + + query_status_with_offline_worker + + # start worker again + run_dm_worker $WORK_DIR/worker1 $WORKER1_PORT $dm_worker1_conf + check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER1_PORT + + run_dm_worker $WORK_DIR/worker2 $WORKER2_PORT $dm_worker2_conf + check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER2_PORT + echo "config" config_wrong_arg config_to_file @@ -337,6 +352,11 @@ function run() { # update_task_success_single_worker $TASK_CONF $SOURCE_ID1 # update_task_success $TASK_CONF + start_relay_without_worker_name_success + start_relay_with_worker_name_fail + stop_relay_with_worker_name_fail + stop_relay_with_worker_name_success + start_relay_success run_sql_file $cur/data/db1.increment.sql $MYSQL_HOST1 $MYSQL_PORT1 $MYSQL_PASSWORD1 @@ -430,8 +450,8 @@ function run() { cleanup_data dmctl # also cleanup dm processes in case of last run failed -cleanup_process $* -run $* -cleanup_process $* +cleanup_process +run +cleanup_process echo "[$(date)] <<<<<< test case $TEST_NAME success! >>>>>>" diff --git a/dm/tests/duplicate_event/conf/source2.yaml b/dm/tests/duplicate_event/conf/source2.yaml index 538e8942a40..621e82bfce5 100644 --- a/dm/tests/duplicate_event/conf/source2.yaml +++ b/dm/tests/duplicate_event/conf/source2.yaml @@ -2,7 +2,7 @@ source-id: mysql-replica-02 server-id: 123456 flavor: 'mysql' enable-gtid: true -enable-relay: true +enable-relay: false from: host: 127.0.0.1 user: root diff --git a/dm/tests/full_mode/run.sh b/dm/tests/full_mode/run.sh index 87e4f5a4b2b..aac7afef52e 100755 --- a/dm/tests/full_mode/run.sh +++ b/dm/tests/full_mode/run.sh @@ -37,10 +37,6 @@ function fail_acquire_global_lock() { check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER2_PORT dmctl_operate_source create $WORK_DIR/source2.yaml $SOURCE_ID2 - run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \ - "start-relay -s $SOURCE_ID2 worker2" \ - "\"result\": true" 2 - cp $cur/conf/dm-task.yaml $WORK_DIR/dm-task.yaml sed -i '/heartbeat-report-interval/i\ignore-checking-items: ["dump_privilege"]' $WORK_DIR/dm-task.yaml run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \ diff --git a/dm/tests/gtid/run.sh b/dm/tests/gtid/run.sh index 44824a6878b..adca64f0320 100755 --- a/dm/tests/gtid/run.sh +++ b/dm/tests/gtid/run.sh @@ -24,9 +24,6 @@ function run() { run_dm_worker $WORK_DIR/worker1 $WORKER1_PORT $cur/conf/dm-worker1.toml check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER1_PORT dmctl_operate_source create $WORK_DIR/source1.yaml $SOURCE_ID1 - run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \ - "start-relay -s $SOURCE_ID1 worker1" \ - "\"result\": true" 2 run_dm_worker $WORK_DIR/worker2 $WORKER2_PORT $cur/conf/dm-worker2.toml check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER2_PORT diff --git a/dm/tests/ha/conf/source2.yaml b/dm/tests/ha/conf/source2.yaml index d6f08468313..fb1985ca354 100644 --- a/dm/tests/ha/conf/source2.yaml +++ 
b/dm/tests/ha/conf/source2.yaml @@ -1,7 +1,7 @@ source-id: mysql-replica-02 flavor: '' enable-gtid: false -enable-relay: true +enable-relay: false from: host: 127.0.0.1 user: root diff --git a/dm/tests/ha_cases/run.sh b/dm/tests/ha_cases/run.sh index f1b180332c9..5195f50f774 100755 --- a/dm/tests/ha_cases/run.sh +++ b/dm/tests/ha_cases/run.sh @@ -138,7 +138,8 @@ function test_standalone_running() { # test running, test2 fail run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \ "query-status" \ - "\"taskStatus\": \"Running\"" 1 + "\"stage\": \"Running\"" 1 \ + "\"worker\": \"source not bound\"" 1 run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \ "stop-task test2" \ diff --git a/dm/tests/incremental_mode/conf/source1.yaml b/dm/tests/incremental_mode/conf/source1.yaml index 406e7ae2547..679a2f4db7c 100644 --- a/dm/tests/incremental_mode/conf/source1.yaml +++ b/dm/tests/incremental_mode/conf/source1.yaml @@ -3,7 +3,7 @@ flavor: 'mysql' enable-gtid: true relay-binlog-name: '' relay-binlog-gtid: '' -enable-relay: true +enable-relay: false from: host: 127.0.0.1 user: root diff --git a/dm/tests/incremental_mode/conf/source2.yaml b/dm/tests/incremental_mode/conf/source2.yaml index e21304db8e3..21294f2c27c 100644 --- a/dm/tests/incremental_mode/conf/source2.yaml +++ b/dm/tests/incremental_mode/conf/source2.yaml @@ -3,11 +3,11 @@ flavor: '' enable-gtid: false relay-binlog-name: '' relay-binlog-gtid: '' -enable-relay: true +enable-relay: false from: host: 127.0.0.1 user: root password: /Q7B9DizNLLTTfiZHv9WoEAKamfpIUs= port: 3307 checker: - check-enable: false \ No newline at end of file + check-enable: false diff --git a/dm/tests/initial_unit/run.sh b/dm/tests/initial_unit/run.sh index 3f479d08958..fbbae7de5ad 100644 --- a/dm/tests/initial_unit/run.sh +++ b/dm/tests/initial_unit/run.sh @@ -43,10 +43,6 @@ function run() { sed -i "/relay-binlog-name/i\relay-dir: $WORK_DIR/worker1/relay_log" $WORK_DIR/source1.yaml dmctl_operate_source create $WORK_DIR/source1.yaml $SOURCE_ID1 - run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \ - "start-relay -s $SOURCE_ID1 worker1" \ - "\"result\": true" 2 - echo "start task and query status, the sync unit will initial failed" task_conf="$cur/conf/dm-task.yaml" run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \ diff --git a/dm/tests/lightning_mode/run.sh b/dm/tests/lightning_mode/run.sh index c5a7ad43ae7..b2c9263d266 100755 --- a/dm/tests/lightning_mode/run.sh +++ b/dm/tests/lightning_mode/run.sh @@ -37,10 +37,6 @@ function run() { # make sure source1 is bound to worker1 dmctl_operate_source create $WORK_DIR/source1.yaml $SOURCE_ID1 - run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \ - "start-relay -s $SOURCE_ID1 worker1" \ - "\"result\": true" 2 - run_dm_worker $WORK_DIR/worker2 $WORKER2_PORT $cur/conf/dm-worker2.toml check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER2_PORT dmctl_operate_source create $WORK_DIR/source2.yaml $SOURCE_ID2 diff --git a/dm/tests/online_ddl/run.sh b/dm/tests/online_ddl/run.sh index e487f91538d..cca736349ae 100755 --- a/dm/tests/online_ddl/run.sh +++ b/dm/tests/online_ddl/run.sh @@ -40,10 +40,6 @@ function run() { dmctl_operate_source create $WORK_DIR/source2.yaml $SOURCE_ID2 - run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \ - "start-relay -s $SOURCE_ID2 worker2" \ - "\"result\": true" 2 - # imitate a DM task is started during the running of online DDL tool # *_ignore will be skipped by block-allow-list run_sql_source1 "create table online_ddl.gho_ignore (c int); create table 
online_ddl._gho_ignore_gho (c int);" @@ -116,10 +112,6 @@ function run() { run_dm_worker $WORK_DIR/worker3 $WORKER3_PORT $cur/conf/dm-worker3.toml check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER3_PORT - run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \ - "start-relay -s $SOURCE_ID2 worker3" \ - "\"result\": true" 2 - echo "wait and check task running" run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \ "query-status test" \ diff --git a/dm/tests/only_dml/run.sh b/dm/tests/only_dml/run.sh index fb0c192ac0e..2de2f17ee00 100755 --- a/dm/tests/only_dml/run.sh +++ b/dm/tests/only_dml/run.sh @@ -69,14 +69,6 @@ function run() { check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER2_PORT dmctl_operate_source create $WORK_DIR/source2.yaml $SOURCE_ID2 - # start relay - run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \ - "start-relay -s $SOURCE_ID1 worker1" \ - "\"result\": true" 2 - run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \ - "start-relay -s $SOURCE_ID2 worker2" \ - "\"result\": true" 2 - # check dm-workers metrics unit: relay file index must be 1. check_metric $WORKER1_PORT "dm_relay_binlog_file" 3 0 2 check_metric $WORKER2_PORT "dm_relay_binlog_file" 3 0 2 diff --git a/dm/tests/relay_interrupt/run.sh b/dm/tests/relay_interrupt/run.sh index 8763e01920a..7f6e97dcd5a 100644 --- a/dm/tests/relay_interrupt/run.sh +++ b/dm/tests/relay_interrupt/run.sh @@ -47,11 +47,6 @@ function run() { sed -i "/relay-binlog-name/i\relay-dir: $WORK_DIR/worker1/relay_log" $WORK_DIR/source1.yaml dmctl_operate_source create $WORK_DIR/source1.yaml $SOURCE_ID1 - run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \ - "start-relay -s $SOURCE_ID1 worker1" \ - "\"result\": true" 1 \ - "ERROR 1152" 1 - echo "query status, relay log failed" run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \ "query-status -s $SOURCE_ID1" \ diff --git a/dm/tests/tiup/lib.sh b/dm/tests/tiup/lib.sh index 318b8be2724..648df2791f2 100755 --- a/dm/tests/tiup/lib.sh +++ b/dm/tests/tiup/lib.sh @@ -265,14 +265,7 @@ function run_dmctl_with_retry() { function ensure_start_relay() { # manually enable relay for source1 after v2.0.2 if [[ "$PRE_VER" != "v2.0.0" ]] && [[ "$PRE_VER" != "v2.0.1" ]]; then - dmctl_log="get-worker.txt" # always use CUR_VER, because we might use tiup mirror in previous steps. 
- tiup dmctl:$CUR_VER --master-addr=master1:8261 operate-source show -s mysql-replica-01 >$dmctl_log 2>&1 - worker=$(grep "worker" $dmctl_log | awk -F'"' '{ print $4 }') - if [[ "$PRE_VER" == "v2.0.2" ]] || [[ "$PRE_VER" == "v2.0.3" ]] || [[ "$PRE_VER" == "v2.0.4" ]] || [[ "$PRE_VER" == "v2.0.5" ]] || [[ "$PRE_VER" == "v2.0.6" ]] || [[ "$PRE_VER" == "v2.0.7" ]]; then - run_dmctl_with_retry $CUR_VER "start-relay -s mysql-replica-01 $worker" "\"result\": true" 1 - else - run_dmctl_with_retry $CUR_VER "start-relay -s mysql-replica-01 $worker" "\"result\": true" 2 - fi + run_dmctl_with_retry $CUR_VER "start-relay -s mysql-replica-01" "\"result\": true" 2 fi } diff --git a/dm/tests/tracker_ignored_ddl/run.sh b/dm/tests/tracker_ignored_ddl/run.sh index 57368dd23c1..670eadd4337 100644 --- a/dm/tests/tracker_ignored_ddl/run.sh +++ b/dm/tests/tracker_ignored_ddl/run.sh @@ -47,6 +47,14 @@ function run() { run_sql_tidb "select count(1) from $TEST_NAME.t1;" check_contains "count(1): 2" echo "increment2 check success" + + run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \ + "stop-relay -s $SOURCE_ID1" \ + "\"result\": true" 1 + + run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \ + "query-status test" \ + "\"stage\": \"Running\"" 1 } cleanup_data $TEST_NAME diff --git a/pkg/context/context.go b/pkg/context/context.go index d4eb49ae46d..7a8e9020467 100644 --- a/pkg/context/context.go +++ b/pkg/context/context.go @@ -27,6 +27,7 @@ import ( "github.com/pingcap/ticdc/pkg/version" tidbkv "github.com/pingcap/tidb/kv" "github.com/tikv/client-go/v2/oracle" + "github.com/tikv/client-go/v2/tikv" pd "github.com/tikv/pd/client" "go.uber.org/zap" ) @@ -40,6 +41,7 @@ type GlobalVars struct { CaptureInfo *model.CaptureInfo EtcdClient *etcd.CDCEtcdClient GrpcPool kv.GrpcPool + RegionCache *tikv.RegionCache TimeAcquirer pdtime.TimeAcquirer TableActorSystem *system.System } diff --git a/tools/check/go.sum b/tools/check/go.sum index 5e57e018ef0..3a9006fdcc7 100644 --- a/tools/check/go.sum +++ b/tools/check/go.sum @@ -662,6 +662,7 @@ github.com/pingcap/check v0.0.0-20190102082844-67f458068fc8/go.mod h1:B1+S9LNcuM github.com/pingcap/errors v0.11.4/go.mod h1:Oi8TUi2kEtXXLMJk9l1cGmz20kV3TaQ0usTwv5KuLY8= github.com/pingcap/errors v0.11.5-0.20211009033009-93128226aaa3 h1:8l9lu9RjWkI/VeqrP+Fn3tvZNPu5GYP0rYLLN5Q46go= github.com/pingcap/errors v0.11.5-0.20211009033009-93128226aaa3/go.mod h1:X2r9ueLEUZgtx2cIogM0v4Zj5uvvzhuuiu7Pn8HzMPg= +github.com/pingcap/failpoint v0.0.0-20210316064728-7acb0f0a3dfd h1:I8IeI8MNiZVKnwuXhcIIzz6pREcOSbq18Q31KYIzFVM= github.com/pingcap/failpoint v0.0.0-20210316064728-7acb0f0a3dfd/go.mod h1:IVF+ijPSMZVtx2oIqxAg7ur6EyixtTYfOHwpfmlhqI4= github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e h1:aoZm08cpOy4WuID//EZDgcC4zIxODThtZNPirFr42+A= github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA= @@ -736,6 +737,7 @@ github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529/go.mod h1:DxrIzT+xaE7yg github.com/securego/gosec/v2 v2.8.1 h1:Tyy/nsH39TYCOkqf5HAgRE+7B5D8sHDwPdXRgFWokh8= github.com/securego/gosec/v2 v2.8.1/go.mod h1:pUmsq6+VyFEElJMUX+QB3p3LWNHXg1R3xh2ssVJPs8Q= github.com/securego/gosec/v2 v2.9.1/go.mod h1:oDcDLcatOJxkCGaCaq8lua1jTnYf6Sou4wdiJ1n4iHc= +github.com/sergi/go-diff v1.1.0 h1:we8PVUC3FE2uYfodKH/nBHMSetSfHDR6scGdBi+erh0= github.com/sergi/go-diff v1.1.0/go.mod h1:STckp+ISIX8hZLjrqAeVduY0gWCT9IjLuqbuNXdaHfM= github.com/shazow/go-diff v0.0.0-20160112020656-b6b7b6733b8c 
h1:W65qqJCIOVP4jpqPQ0YvHYKwcMEMVWIzWC5iNQQfBTU= github.com/shazow/go-diff v0.0.0-20160112020656-b6b7b6733b8c/go.mod h1:/PevMnwAxekIXwN8qQyfc5gl2NlkB3CQlkizAbOkeBs=
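Taken together, the new DM-scheduler-46028..46031 errors describe two mutually exclusive relay modes for a source: relay started explicitly for named workers (`start-relay -s <source> worker1`) versus relay started automatically for whichever worker is currently bound (`start-relay -s <source>`). The sketch below is a rough model of the mode check those errors imply; the scheduler's real data structures are not part of this diff, so the `source` type and its `relayWorkers`/`autoRelay` fields here are assumptions. It also shows a use of the SetToSlice helper added in dm/pkg/utils above.

package main

import (
	"errors"
	"fmt"
)

// source tracks, for one upstream source, how relay was started.
type source struct {
	relayWorkers map[string]struct{} // workers named in `start-relay -s src <worker>...`
	autoRelay    bool                // `start-relay -s src` without worker names
}

var (
	errStartRelayOnSpecified = errors.New("can't `start-relay` without worker name now")
	errStartRelayOnBound     = errors.New("can't `start-relay` with worker name now")
)

// startRelay rejects requests that would mix the two modes, mirroring the intent
// of ErrSchedulerStartRelayOnSpecified / ErrSchedulerStartRelayOnBound.
func (s *source) startRelay(workers []string) error {
	switch {
	case len(workers) == 0 && len(s.relayWorkers) > 0:
		return fmt.Errorf("workers %v already started relay: %w",
			setToSlice(s.relayWorkers), errStartRelayOnSpecified)
	case len(workers) > 0 && s.autoRelay:
		return errStartRelayOnBound
	case len(workers) == 0:
		s.autoRelay = true
	default:
		for _, w := range workers {
			s.relayWorkers[w] = struct{}{}
		}
	}
	return nil
}

// setToSlice matches the helper added to dm/pkg/utils in this diff.
func setToSlice(set map[string]struct{}) []string {
	slice := make([]string, 0, len(set))
	for key := range set {
		slice = append(slice, key)
	}
	return slice
}

func main() {
	s := &source{relayWorkers: map[string]struct{}{"worker1": {}}}
	fmt.Println(s.startRelay(nil)) // rejected: relay already started for worker1
}

The updated dmctl_basic checks (start_relay_without_worker_name_fail, start_relay_with_worker_name_fail, and their stop-relay counterparts) exercise exactly these two rejection paths from the command line.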