diff --git a/cdc/owner/feed_state_manager.go b/cdc/owner/feed_state_manager.go index b90b0dc6e74..a21b79c1f19 100644 --- a/cdc/owner/feed_state_manager.go +++ b/cdc/owner/feed_state_manager.go @@ -42,9 +42,6 @@ func (m *feedStateManager) Tick(state *model.ChangefeedReactorState) { m.cleanUpInfos() } }() - if m.state.Status == nil { - return - } if m.handleAdminJob() { // `handleAdminJob` returns true means that some admin jobs are pending // skip to the next tick until all the admin jobs is handled diff --git a/cdc/owner/feed_state_manager_test.go b/cdc/owner/feed_state_manager_test.go index a0c14359d18..61c8f2f2eac 100644 --- a/cdc/owner/feed_state_manager_test.go +++ b/cdc/owner/feed_state_manager_test.go @@ -41,6 +41,7 @@ func (s *feedStateManagerSuite) TestHandleJob(c *check.C) { c.Assert(status, check.IsNil) return &model.ChangeFeedStatus{}, true, nil }) + tester.MustApplyPatches() manager.Tick(state) tester.MustApplyPatches() c.Assert(manager.ShouldRunning(), check.IsTrue) @@ -138,6 +139,7 @@ func (s *feedStateManagerSuite) TestMarkFinished(c *check.C) { c.Assert(status, check.IsNil) return &model.ChangeFeedStatus{}, true, nil }) + tester.MustApplyPatches() manager.Tick(state) tester.MustApplyPatches() c.Assert(manager.ShouldRunning(), check.IsTrue) @@ -246,3 +248,29 @@ func (s *feedStateManagerSuite) TestHandleError(c *check.C) { c.Assert(state.Info.AdminJobType, check.Equals, model.AdminStop) c.Assert(state.Status.AdminJobType, check.Equals, model.AdminStop) } + +func (s *feedStateManagerSuite) TestChangefeedStatusNotExist(c *check.C) { + defer testleak.AfterTest(c)() + ctx := cdcContext.NewBackendContext4Test(true) + manager := new(feedStateManager) + state := model.NewChangefeedReactorState(ctx.ChangefeedVars().ID) + tester := orchestrator.NewReactorStateTester(c, state, map[string]string{ + "/tidb/cdc/capture/d563bfc0-f406-4f34-bc7d-6dc2e35a44e5": `{"id":"d563bfc0-f406-4f34-bc7d-6dc2e35a44e5","address":"172.16.6.147:8300","version":"v5.0.0-master-dirty"}`, + "/tidb/cdc/changefeed/info/" + ctx.ChangefeedVars().ID: `{"sink-uri":"blackhole:///","opts":{},"create-time":"2021-06-05T00:44:15.065939487+08:00","start-ts":425381670108266496,"target-ts":0,"admin-job-type":1,"sort-engine":"unified","config":{"case-sensitive":true,"enable-old-value":true,"force-replicate":false,"check-gc-safe-point":true,"filter":{"rules":["*.*"],"ignore-txn-start-ts":null},"mounter":{"worker-num":16},"sink":{"dispatchers":null,"protocol":"default"},"cyclic-replication":{"enable":false,"replica-id":0,"filter-replica-ids":null,"id-buckets":0,"sync-ddl":false},"scheduler":{"type":"table-number","polling-time":-1}},"state":"failed","history":[],"error":{"addr":"172.16.6.147:8300","code":"CDC:ErrSnapshotLostByGC","message":"[CDC:ErrSnapshotLostByGC]fail to create or maintain changefeed due to snapshot loss caused by GC. checkpoint-ts 425381670108266496 is earlier than GC safepoint at 0"},"sync-point-enabled":false,"sync-point-interval":600000000000,"creator-version":"v5.0.0-master-dirty"}`, + "/tidb/cdc/owner/156579d017f84a68": "d563bfc0-f406-4f34-bc7d-6dc2e35a44e5", + }) + manager.Tick(state) + c.Assert(manager.ShouldRunning(), check.IsFalse) + tester.MustApplyPatches() + + manager.PushAdminJob(&model.AdminJob{ + CfID: ctx.ChangefeedVars().ID, + Type: model.AdminRemove, + Opts: &model.AdminJobOption{ForceRemove: true}, + }) + manager.Tick(state) + c.Assert(manager.ShouldRunning(), check.IsFalse) + tester.MustApplyPatches() + c.Assert(state.Info, check.IsNil) + c.Assert(state.Exist(), check.IsFalse) +}