Skip to content

Commit

Permalink
owner: fix a changefeed can not removed when the changefeed status is…
Browse files Browse the repository at this point in the history
… not exist (pingcap#1952)
  • Loading branch information
leoppro authored and leoppro committed Jun 22, 2021
1 parent cd1bce5 commit 581b86d
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 3 deletions.
3 changes: 0 additions & 3 deletions cdc/owner/feed_state_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,6 @@ func (m *feedStateManager) Tick(state *model.ChangefeedReactorState) {
m.cleanUpInfos()
}
}()
if m.state.Status == nil {
return
}
if m.handleAdminJob() {
// `handleAdminJob` returns true means that some admin jobs are pending
// skip to the next tick until all the admin jobs is handled
Expand Down
28 changes: 28 additions & 0 deletions cdc/owner/feed_state_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ func (s *feedStateManagerSuite) TestHandleJob(c *check.C) {
c.Assert(status, check.IsNil)
return &model.ChangeFeedStatus{}, true, nil
})
tester.MustApplyPatches()
manager.Tick(state)
tester.MustApplyPatches()
c.Assert(manager.ShouldRunning(), check.IsTrue)
Expand Down Expand Up @@ -138,6 +139,7 @@ func (s *feedStateManagerSuite) TestMarkFinished(c *check.C) {
c.Assert(status, check.IsNil)
return &model.ChangeFeedStatus{}, true, nil
})
tester.MustApplyPatches()
manager.Tick(state)
tester.MustApplyPatches()
c.Assert(manager.ShouldRunning(), check.IsTrue)
Expand Down Expand Up @@ -246,3 +248,29 @@ func (s *feedStateManagerSuite) TestHandleError(c *check.C) {
c.Assert(state.Info.AdminJobType, check.Equals, model.AdminStop)
c.Assert(state.Status.AdminJobType, check.Equals, model.AdminStop)
}

func (s *feedStateManagerSuite) TestChangefeedStatusNotExist(c *check.C) {
defer testleak.AfterTest(c)()
ctx := cdcContext.NewBackendContext4Test(true)
manager := new(feedStateManager)
state := model.NewChangefeedReactorState(ctx.ChangefeedVars().ID)
tester := orchestrator.NewReactorStateTester(c, state, map[string]string{
"/tidb/cdc/capture/d563bfc0-f406-4f34-bc7d-6dc2e35a44e5": `{"id":"d563bfc0-f406-4f34-bc7d-6dc2e35a44e5","address":"172.16.6.147:8300","version":"v5.0.0-master-dirty"}`,
"/tidb/cdc/changefeed/info/" + ctx.ChangefeedVars().ID: `{"sink-uri":"blackhole:///","opts":{},"create-time":"2021-06-05T00:44:15.065939487+08:00","start-ts":425381670108266496,"target-ts":0,"admin-job-type":1,"sort-engine":"unified","config":{"case-sensitive":true,"enable-old-value":true,"force-replicate":false,"check-gc-safe-point":true,"filter":{"rules":["*.*"],"ignore-txn-start-ts":null},"mounter":{"worker-num":16},"sink":{"dispatchers":null,"protocol":"default"},"cyclic-replication":{"enable":false,"replica-id":0,"filter-replica-ids":null,"id-buckets":0,"sync-ddl":false},"scheduler":{"type":"table-number","polling-time":-1}},"state":"failed","history":[],"error":{"addr":"172.16.6.147:8300","code":"CDC:ErrSnapshotLostByGC","message":"[CDC:ErrSnapshotLostByGC]fail to create or maintain changefeed due to snapshot loss caused by GC. checkpoint-ts 425381670108266496 is earlier than GC safepoint at 0"},"sync-point-enabled":false,"sync-point-interval":600000000000,"creator-version":"v5.0.0-master-dirty"}`,
"/tidb/cdc/owner/156579d017f84a68": "d563bfc0-f406-4f34-bc7d-6dc2e35a44e5",
})
manager.Tick(state)
c.Assert(manager.ShouldRunning(), check.IsFalse)
tester.MustApplyPatches()

manager.PushAdminJob(&model.AdminJob{
CfID: ctx.ChangefeedVars().ID,
Type: model.AdminRemove,
Opts: &model.AdminJobOption{ForceRemove: true},
})
manager.Tick(state)
c.Assert(manager.ShouldRunning(), check.IsFalse)
tester.MustApplyPatches()
c.Assert(state.Info, check.IsNil)
c.Assert(state.Exist(), check.IsFalse)
}

0 comments on commit 581b86d

Please sign in to comment.