etcd/client (ticdc): Prevent revision in WatchWithChan fallback. (#3851) #3926

Merged
6 changes: 2 additions & 4 deletions cdc/entry/codec_test.go
@@ -25,8 +25,7 @@ import (

func Test(t *testing.T) { check.TestingT(t) }

type codecSuite struct {
}
type codecSuite struct{}

var _ = check.Suite(&codecSuite{})

@@ -43,8 +42,7 @@ func (s *codecSuite) TestDecodeRecordKey(c *check.C) {
c.Assert(len(key), check.Equals, 0)
}

type decodeMetaKeySuite struct {
}
type decodeMetaKeySuite struct{}

var _ = check.Suite(&decodeMetaKeySuite{})

3 changes: 2 additions & 1 deletion cdc/entry/mounter_test.go
@@ -216,7 +216,8 @@ func testMounterDisableOldValue(c *check.C, tc struct {
tableName string
createTableDDL string
values [][]interface{}
}) {
},
) {
store, err := mockstore.NewMockStore()
c.Assert(err, check.IsNil)
defer store.Close() //nolint:errcheck
3 changes: 1 addition & 2 deletions cdc/kv/client_test.go
@@ -62,8 +62,7 @@ func Test(t *testing.T) {
check.TestingT(t)
}

type clientSuite struct {
}
type clientSuite struct{}

var _ = check.Suite(&clientSuite{})

3 changes: 1 addition & 2 deletions cdc/kv/resolvedts_heap_test.go
@@ -20,8 +20,7 @@ import (
"github.com/pingcap/tiflow/pkg/util/testleak"
)

type rtsHeapSuite struct {
}
type rtsHeapSuite struct{}

var _ = check.Suite(&rtsHeapSuite{})

3 changes: 1 addition & 2 deletions cdc/kv/token_region_test.go
@@ -26,8 +26,7 @@ import (
"golang.org/x/sync/errgroup"
)

type tokenRegionSuite struct {
}
type tokenRegionSuite struct{}

var _ = check.Suite(&tokenRegionSuite{})

1 change: 0 additions & 1 deletion cdc/model/owner_test.go
@@ -162,7 +162,6 @@ var _ = check.Suite(&taskStatusSuite{})
func (s *taskStatusSuite) TestShouldBeDeepCopy(c *check.C) {
defer testleak.AfterTest(c)()
info := TaskStatus{

Tables: map[TableID]*TableReplicaInfo{
1: {StartTs: 100},
2: {StartTs: 100},
3 changes: 1 addition & 2 deletions cdc/owner/barrier_test.go
@@ -27,8 +27,7 @@ func Test(t *testing.T) { check.TestingT(t) }

var _ = check.Suite(&barrierSuite{})

type barrierSuite struct {
}
type barrierSuite struct{}

func (s *barrierSuite) TestBarrier(c *check.C) {
defer testleak.AfterTest(c)()
6 changes: 3 additions & 3 deletions cdc/owner/changefeed_test.go
@@ -112,11 +112,11 @@ func (m *mockDDLSink) Barrier(ctx context.Context) error {

var _ = check.Suite(&changefeedSuite{})

type changefeedSuite struct {
}
type changefeedSuite struct{}

func createChangefeed4Test(ctx cdcContext.Context, c *check.C) (*changefeed, *model.ChangefeedReactorState,
map[model.CaptureID]*model.CaptureInfo, *orchestrator.ReactorStateTester) {
map[model.CaptureID]*model.CaptureInfo, *orchestrator.ReactorStateTester,
) {
ctx.GlobalVars().PDClient = &gc.MockPDClient{
UpdateServiceGCSafePointFunc: func(ctx context.Context, serviceID string, ttl int64, safePoint uint64) (uint64, error) {
return safePoint, nil
3 changes: 1 addition & 2 deletions cdc/owner/ddl_puller_test.go
@@ -31,8 +31,7 @@ import (

var _ = check.Suite(&ddlPullerSuite{})

type ddlPullerSuite struct {
}
type ddlPullerSuite struct{}

type mockPuller struct {
c *check.C
3 changes: 1 addition & 2 deletions cdc/owner/feed_state_manager_test.go
@@ -24,8 +24,7 @@ import (

var _ = check.Suite(&feedStateManagerSuite{})

type feedStateManagerSuite struct {
}
type feedStateManagerSuite struct{}

func (s *feedStateManagerSuite) TestHandleJob(c *check.C) {
defer testleak.AfterTest(c)()
59 changes: 27 additions & 32 deletions cdc/owner/owner_test.go
@@ -34,8 +34,7 @@ import (

var _ = check.Suite(&ownerSuite{})

type ownerSuite struct {
}
type ownerSuite struct{}

type mockManager struct {
gc.Manager
@@ -328,22 +327,20 @@ func (s *ownerSuite) TestUpdateGCSafePoint(c *check.C) {
tester := orchestrator.NewReactorStateTester(c, state, nil)

// no changefeed, the gc safe point should be max uint64
mockPDClient.UpdateServiceGCSafePointFunc =
func(ctx context.Context, serviceID string, ttl int64, safePoint uint64) (uint64, error) {
// Owner will do a snapshot read at (checkpointTs - 1) from TiKV,
// set GC safepoint to (checkpointTs - 1)
c.Assert(safePoint, check.Equals, uint64(math.MaxUint64-1))
return 0, nil
}
mockPDClient.UpdateServiceGCSafePointFunc = func(ctx context.Context, serviceID string, ttl int64, safePoint uint64) (uint64, error) {
// Owner will do a snapshot read at (checkpointTs - 1) from TiKV,
// set GC safepoint to (checkpointTs - 1)
c.Assert(safePoint, check.Equals, uint64(math.MaxUint64-1))
return 0, nil
}
err := o.updateGCSafepoint(ctx, state)
c.Assert(err, check.IsNil)

// add a failed changefeed, it must not trigger update GC safepoint.
mockPDClient.UpdateServiceGCSafePointFunc =
func(ctx context.Context, serviceID string, ttl int64, safePoint uint64) (uint64, error) {
c.Fatal("must not update")
return 0, nil
}
mockPDClient.UpdateServiceGCSafePointFunc = func(ctx context.Context, serviceID string, ttl int64, safePoint uint64) (uint64, error) {
c.Fatal("must not update")
return 0, nil
}
changefeedID1 := "changefeed-test1"
tester.MustUpdate(
fmt.Sprintf("/tidb/cdc/changefeed/info/%s", changefeedID1),
@@ -360,15 +357,14 @@ func (s *ownerSuite) TestUpdateGCSafePoint(c *check.C) {
// switch the state of changefeed to normal, it must update GC safepoint to
// 1 (checkpoint Ts of changefeed-test1).
ch := make(chan struct{}, 1)
mockPDClient.UpdateServiceGCSafePointFunc =
func(ctx context.Context, serviceID string, ttl int64, safePoint uint64) (uint64, error) {
// Owner will do a snapshot read at (checkpointTs - 1) from TiKV,
// set GC safepoint to (checkpointTs - 1)
c.Assert(safePoint, check.Equals, uint64(1))
c.Assert(serviceID, check.Equals, gc.CDCServiceSafePointID)
ch <- struct{}{}
return 0, nil
}
mockPDClient.UpdateServiceGCSafePointFunc = func(ctx context.Context, serviceID string, ttl int64, safePoint uint64) (uint64, error) {
// Owner will do a snapshot read at (checkpointTs - 1) from TiKV,
// set GC safepoint to (checkpointTs - 1)
c.Assert(safePoint, check.Equals, uint64(1))
c.Assert(serviceID, check.Equals, gc.CDCServiceSafePointID)
ch <- struct{}{}
return 0, nil
}
state.Changefeeds[changefeedID1].PatchInfo(
func(info *model.ChangeFeedInfo) (*model.ChangeFeedInfo, bool, error) {
info.State = model.StateNormal
@@ -398,15 +394,14 @@ return &model.ChangeFeedStatus{CheckpointTs: 30}, true, nil
return &model.ChangeFeedStatus{CheckpointTs: 30}, true, nil
})
tester.MustApplyPatches()
mockPDClient.UpdateServiceGCSafePointFunc =
func(ctx context.Context, serviceID string, ttl int64, safePoint uint64) (uint64, error) {
// Owner will do a snapshot read at (checkpointTs - 1) from TiKV,
// set GC safepoint to (checkpointTs - 1)
c.Assert(safePoint, check.Equals, uint64(19))
c.Assert(serviceID, check.Equals, gc.CDCServiceSafePointID)
ch <- struct{}{}
return 0, nil
}
mockPDClient.UpdateServiceGCSafePointFunc = func(ctx context.Context, serviceID string, ttl int64, safePoint uint64) (uint64, error) {
// Owner will do a snapshot read at (checkpointTs - 1) from TiKV,
// set GC safepoint to (checkpointTs - 1)
c.Assert(safePoint, check.Equals, uint64(19))
c.Assert(serviceID, check.Equals, gc.CDCServiceSafePointID)
ch <- struct{}{}
return 0, nil
}
err = o.updateGCSafepoint(ctx, state)
c.Assert(err, check.IsNil)
select {
3 changes: 1 addition & 2 deletions cdc/owner/schema_test.go
@@ -28,8 +28,7 @@ import (

var _ = check.Suite(&schemaSuite{})

type schemaSuite struct {
}
type schemaSuite struct{}

func (s *schemaSuite) TestAllPhysicalTables(c *check.C) {
defer testleak.AfterTest(c)()
3 changes: 2 additions & 1 deletion cdc/processor/pipeline/puller.go
@@ -36,7 +36,8 @@ type pullerNode struct {
}

func newPullerNode(
tableID model.TableID, replicaInfo *model.TableReplicaInfo, tableName string) pipeline.Node {
tableID model.TableID, replicaInfo *model.TableReplicaInfo, tableName string,
) pipeline.Node {
return &pullerNode{
tableID: tableID,
replicaInfo: replicaInfo,
3 changes: 2 additions & 1 deletion cdc/processor/pipeline/sink_test.go
@@ -95,7 +95,8 @@ func (s *mockSink) Barrier(ctx context.Context, tableID model.TableID) error {
func (s *mockSink) Check(c *check.C, expected []struct {
resolvedTs model.Ts
row *model.RowChangedEvent
}) {
},
) {
c.Assert(s.received, check.DeepEquals, expected)
}

3 changes: 2 additions & 1 deletion cdc/processor/pipeline/table.go
@@ -165,7 +165,8 @@ func NewTablePipeline(ctx cdcContext.Context,
tableName string,
replicaInfo *model.TableReplicaInfo,
sink sink.Sink,
targetTs model.Ts) TablePipeline {
targetTs model.Ts,
) TablePipeline {
ctx, cancel := cdcContext.WithCancel(ctx)
tablePipeline := &tablePipelineImpl{
tableID: tableID,
3 changes: 1 addition & 2 deletions cdc/puller/puller_test.go
@@ -35,8 +35,7 @@ import (
pd "github.com/tikv/pd/client"
)

type pullerSuite struct {
}
type pullerSuite struct{}

var _ = check.Suite(&pullerSuite{})

3 changes: 1 addition & 2 deletions cdc/puller/sorter/serde.go
@@ -18,8 +18,7 @@ import (
"github.com/pingcap/tiflow/cdc/model"
)

type msgPackGenSerde struct {
}
type msgPackGenSerde struct{}

func (m *msgPackGenSerde) marshal(event *model.PolymorphicEvent, bytes []byte) ([]byte, error) {
bytes = bytes[:0]
6 changes: 3 additions & 3 deletions cdc/puller/sorter/unified_sorter.go
@@ -52,8 +52,7 @@ type metricsInfo struct {
captureAddr string
}

type ctxKey struct {
}
type ctxKey struct{}

// UnifiedSorterCheckDir checks whether the directory needed exists and is writable.
// If it does not exist, we try to create one.
@@ -89,7 +88,8 @@ func NewUnifiedSorter(
changeFeedID model.ChangeFeedID,
tableName string,
tableID model.TableID,
captureAddr string) (*UnifiedSorter, error) {
captureAddr string,
) (*UnifiedSorter, error) {
poolMu.Lock()
defer poolMu.Unlock()

3 changes: 1 addition & 2 deletions cdc/sink/codec/canal_flat_test.go
@@ -24,8 +24,7 @@ import (
"golang.org/x/text/encoding/charmap"
)

type canalFlatSuite struct {
}
type canalFlatSuite struct{}

var _ = check.Suite(&canalFlatSuite{})

3 changes: 1 addition & 2 deletions cdc/sink/codec/interface_test.go
@@ -22,8 +22,7 @@ import (
"github.com/pingcap/tiflow/pkg/util/testleak"
)

type codecInterfaceSuite struct {
}
type codecInterfaceSuite struct{}

var _ = check.Suite(&codecInterfaceSuite{})

3 changes: 1 addition & 2 deletions cdc/sink/codec/schema_registry_test.go
@@ -30,8 +30,7 @@ import (
"github.com/pingcap/tiflow/pkg/util/testleak"
)

type AvroSchemaRegistrySuite struct {
}
type AvroSchemaRegistrySuite struct{}

var _ = check.Suite(&AvroSchemaRegistrySuite{})

6 changes: 4 additions & 2 deletions cdc/sink/mq.go
@@ -418,7 +418,8 @@ func (k *mqSink) writeToProducer(ctx context.Context, message *codec.MQMessage,

func newKafkaSaramaSink(ctx context.Context, sinkURI *url.URL,
filter *filter.Filter, replicaConfig *config.ReplicaConfig,
opts map[string]string, errCh chan error) (*mqSink, error) {
opts map[string]string, errCh chan error,
) (*mqSink, error) {
producerConfig := kafka.NewConfig()
if err := kafka.CompleteConfigsAndOpts(sinkURI, producerConfig, replicaConfig, opts); err != nil {
return nil, cerror.WrapError(cerror.ErrKafkaInvalidConfig, err)
@@ -442,7 +443,8 @@ func newKafkaSaramaSink(ctx context.Context, sinkURI *url.URL,
}

func newPulsarSink(ctx context.Context, sinkURI *url.URL, filter *filter.Filter,
replicaConfig *config.ReplicaConfig, opts map[string]string, errCh chan error) (*mqSink, error) {
replicaConfig *config.ReplicaConfig, opts map[string]string, errCh chan error,
) (*mqSink, error) {
producer, err := pulsar.NewProducer(sinkURI, errCh)
if err != nil {
return nil, errors.Trace(err)
3 changes: 2 additions & 1 deletion cdc/sink/simple_mysql_tester.go
@@ -36,7 +36,8 @@ import (
func init() {
failpoint.Inject("SimpleMySQLSinkTester", func() {
sinkIniterMap["simple-mysql"] = func(ctx context.Context, changefeedID model.ChangeFeedID, sinkURI *url.URL,
filter *filter.Filter, config *config.ReplicaConfig, opts map[string]string, errCh chan error) (Sink, error) {
filter *filter.Filter, config *config.ReplicaConfig, opts map[string]string, errCh chan error,
) (Sink, error) {
return newSimpleMySQLSink(ctx, sinkURI, config)
}
})
18 changes: 12 additions & 6 deletions cdc/sink/sink.go
@@ -65,13 +65,15 @@ type sinkInitFunc func(context.Context, model.ChangeFeedID, *url.URL, *filter.Fi
func init() {
// register blackhole sink
sinkIniterMap["blackhole"] = func(ctx context.Context, changefeedID model.ChangeFeedID, sinkURI *url.URL,
filter *filter.Filter, config *config.ReplicaConfig, opts map[string]string, errCh chan error) (Sink, error) {
filter *filter.Filter, config *config.ReplicaConfig, opts map[string]string, errCh chan error,
) (Sink, error) {
return newBlackHoleSink(ctx), nil
}

// register mysql sink
sinkIniterMap["mysql"] = func(ctx context.Context, changefeedID model.ChangeFeedID, sinkURI *url.URL,
filter *filter.Filter, config *config.ReplicaConfig, opts map[string]string, errCh chan error) (Sink, error) {
filter *filter.Filter, config *config.ReplicaConfig, opts map[string]string, errCh chan error,
) (Sink, error) {
return newMySQLSink(ctx, changefeedID, sinkURI, filter, config, opts)
}
sinkIniterMap["tidb"] = sinkIniterMap["mysql"]
@@ -80,27 +82,31 @@ func init() {

// register kafka sink
sinkIniterMap["kafka"] = func(ctx context.Context, changefeedID model.ChangeFeedID, sinkURI *url.URL,
filter *filter.Filter, config *config.ReplicaConfig, opts map[string]string, errCh chan error) (Sink, error) {
filter *filter.Filter, config *config.ReplicaConfig, opts map[string]string, errCh chan error,
) (Sink, error) {
return newKafkaSaramaSink(ctx, sinkURI, filter, config, opts, errCh)
}
sinkIniterMap["kafka+ssl"] = sinkIniterMap["kafka"]

// register pulsar sink
sinkIniterMap["pulsar"] = func(ctx context.Context, changefeedID model.ChangeFeedID, sinkURI *url.URL,
filter *filter.Filter, config *config.ReplicaConfig, opts map[string]string, errCh chan error) (Sink, error) {
filter *filter.Filter, config *config.ReplicaConfig, opts map[string]string, errCh chan error,
) (Sink, error) {
return newPulsarSink(ctx, sinkURI, filter, config, opts, errCh)
}
sinkIniterMap["pulsar+ssl"] = sinkIniterMap["pulsar"]

// register local sink
sinkIniterMap["local"] = func(ctx context.Context, changefeedID model.ChangeFeedID, sinkURI *url.URL,
filter *filter.Filter, config *config.ReplicaConfig, opts map[string]string, errCh chan error) (Sink, error) {
filter *filter.Filter, config *config.ReplicaConfig, opts map[string]string, errCh chan error,
) (Sink, error) {
return cdclog.NewLocalFileSink(ctx, sinkURI, errCh)
}

// register s3 sink
sinkIniterMap["s3"] = func(ctx context.Context, changefeedID model.ChangeFeedID, sinkURI *url.URL,
filter *filter.Filter, config *config.ReplicaConfig, opts map[string]string, errCh chan error) (Sink, error) {
filter *filter.Filter, config *config.ReplicaConfig, opts map[string]string, errCh chan error,
) (Sink, error) {
return cdclog.NewS3Sink(ctx, sinkURI, errCh)
}
}
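
The hunks shown above are purely formatting changes: empty struct type declarations are collapsed onto a single line, and when a function's parameter list wraps across multiple lines, the closing parenthesis moves to its own line ahead of the return values. This matches the style enforced by stricter Go formatters such as gofumpt, though the page does not state which tool produced the changes. The sketch below illustrates both patterns with hypothetical names (emptySuite, newGreeting); it is not code from the PR.

package main

import "fmt"

// emptySuite illustrates the single-line empty struct form used throughout
// this diff (previously written with the closing brace on its own line).
type emptySuite struct{}

// newGreeting illustrates the wrapped-signature form: once the parameters
// span multiple lines, the closing parenthesis sits alone on its own line
// before the return types.
func newGreeting(
	name string, excited bool,
) (string, error) {
	if name == "" {
		return "", fmt.Errorf("empty name")
	}
	if excited {
		return fmt.Sprintf("hello, %s!", name), nil
	}
	return "hello, " + name, nil
}

func main() {
	_ = emptySuite{}
	msg, err := newGreeting("ticdc", true)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(msg)
}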