diff --git a/cdc/model/reactor_state.go b/cdc/model/reactor_state.go index eb7dc661f0e..3dc6c703e0b 100644 --- a/cdc/model/reactor_state.go +++ b/cdc/model/reactor_state.go @@ -262,7 +262,7 @@ func (s *ChangefeedReactorState) GetPatches() []orchestrator.DataPatch { func (s *ChangefeedReactorState) CheckChangefeedNormal() { s.skipPatchesInThisTick = false s.PatchInfo(func(info *ChangeFeedInfo) (*ChangeFeedInfo, bool, error) { - if info.AdminJobType.IsStopState() { + if info == nil || info.AdminJobType.IsStopState() { s.skipPatchesInThisTick = true return info, false, cerrors.ErrEtcdTryAgain.GenWithStackByArgs() } diff --git a/cdc/model/reactor_state_test.go b/cdc/model/reactor_state_test.go index a8bda8c07b2..9ce7540e236 100644 --- a/cdc/model/reactor_state_test.go +++ b/cdc/model/reactor_state_test.go @@ -652,6 +652,8 @@ func (s *stateSuite) TestCheckChangefeedNormal(c *check.C) { defer testleak.AfterTest(c)() state := NewChangefeedReactorState("test1") stateTester := orchestrator.NewReactorStateTester(c, state, nil) + state.CheckChangefeedNormal() + stateTester.MustApplyPatches() state.PatchInfo(func(info *ChangeFeedInfo) (*ChangeFeedInfo, bool, error) { return &ChangeFeedInfo{SinkURI: "123", AdminJobType: AdminNone, Config: &config.ReplicaConfig{}}, true, nil })