diff --git a/cdc/processor/processor.go b/cdc/processor/processor.go index c29e199221b..de1494ebbef 100644 --- a/cdc/processor/processor.go +++ b/cdc/processor/processor.go @@ -103,16 +103,6 @@ func newProcessor(ctx cdcContext.Context) *processor { return p } -func newProcessor4Test(ctx cdcContext.Context, - createTablePipeline func(ctx cdcContext.Context, tableID model.TableID, replicaInfo *model.TableReplicaInfo) (tablepipeline.TablePipeline, error), -) *processor { - p := newProcessor(ctx) - p.lazyInit = func(ctx cdcContext.Context) error { return nil } - p.createTablePipeline = createTablePipeline - p.sinkManager = &sink.Manager{} - return p -} - // Tick implements the `orchestrator.State` interface // the `state` parameter is sent by the etcd worker, the `state` must be a snapshot of KVs in etcd // The main logic of processor is in this function, including the calculation of many kinds of ts, maintain table pipeline, error handling, etc. diff --git a/cdc/processor/processor_test.go b/cdc/processor/processor_test.go index 0b2d17681b7..7646625f3a1 100644 --- a/cdc/processor/processor_test.go +++ b/cdc/processor/processor_test.go @@ -27,6 +27,7 @@ import ( "github.com/pingcap/tiflow/cdc/entry" "github.com/pingcap/tiflow/cdc/model" tablepipeline "github.com/pingcap/tiflow/cdc/processor/pipeline" + "github.com/pingcap/tiflow/cdc/sink" cdcContext "github.com/pingcap/tiflow/pkg/context" cerror "github.com/pingcap/tiflow/pkg/errors" "github.com/pingcap/tiflow/pkg/etcd" @@ -49,6 +50,7 @@ func newProcessor4Test( p.lazyInit = func(ctx cdcContext.Context) error { return nil } p.createTablePipeline = createTablePipeline p.schemaStorage = &mockSchemaStorage{c: c} + p.sinkManager = &sink.Manager{} return p }