diff --git a/cdc/entry/codec_test.go b/cdc/entry/codec_test.go
index 4b8ceffd4f7..eb04638b218 100644
--- a/cdc/entry/codec_test.go
+++ b/cdc/entry/codec_test.go
@@ -25,8 +25,7 @@ import (
 
 func Test(t *testing.T) { check.TestingT(t) }
 
-type codecSuite struct {
-}
+type codecSuite struct{}
 
 var _ = check.Suite(&codecSuite{})
 
@@ -43,8 +42,7 @@ func (s *codecSuite) TestDecodeRecordKey(c *check.C) {
 	c.Assert(len(key), check.Equals, 0)
 }
 
-type decodeMetaKeySuite struct {
-}
+type decodeMetaKeySuite struct{}
 
 var _ = check.Suite(&decodeMetaKeySuite{})
diff --git a/cdc/entry/mounter_test.go b/cdc/entry/mounter_test.go
index 7fc9d74464f..07b215dc79b 100644
--- a/cdc/entry/mounter_test.go
+++ b/cdc/entry/mounter_test.go
@@ -216,7 +216,8 @@ func testMounterDisableOldValue(c *check.C, tc struct {
 	tableName      string
 	createTableDDL string
 	values         [][]interface{}
-}) {
+},
+) {
 	store, err := mockstore.NewMockStore()
 	c.Assert(err, check.IsNil)
 	defer store.Close() //nolint:errcheck
diff --git a/cdc/kv/client_test.go b/cdc/kv/client_test.go
index fd5094637f3..d2714f5c6fe 100644
--- a/cdc/kv/client_test.go
+++ b/cdc/kv/client_test.go
@@ -62,8 +62,7 @@ func Test(t *testing.T) {
 	check.TestingT(t)
 }
 
-type clientSuite struct {
-}
+type clientSuite struct{}
 
 var _ = check.Suite(&clientSuite{})
 
diff --git a/cdc/kv/resolvedts_heap_test.go b/cdc/kv/resolvedts_heap_test.go
index 1e3c8fb8a47..4fe92e9b5f1 100644
--- a/cdc/kv/resolvedts_heap_test.go
+++ b/cdc/kv/resolvedts_heap_test.go
@@ -20,8 +20,7 @@ import (
 	"github.com/pingcap/tiflow/pkg/util/testleak"
 )
 
-type rtsHeapSuite struct {
-}
+type rtsHeapSuite struct{}
 
 var _ = check.Suite(&rtsHeapSuite{})
 
diff --git a/cdc/kv/token_region_test.go b/cdc/kv/token_region_test.go
index a8d217996ef..a5ad7d5279e 100644
--- a/cdc/kv/token_region_test.go
+++ b/cdc/kv/token_region_test.go
@@ -26,8 +26,7 @@ import (
 	"golang.org/x/sync/errgroup"
 )
 
-type tokenRegionSuite struct {
-}
+type tokenRegionSuite struct{}
 
 var _ = check.Suite(&tokenRegionSuite{})
 
diff --git a/cdc/model/owner_test.go b/cdc/model/owner_test.go
index d960963cdfe..b70053c3f73 100644
--- a/cdc/model/owner_test.go
+++ b/cdc/model/owner_test.go
@@ -162,7 +162,6 @@ var _ = check.Suite(&taskStatusSuite{})
 func (s *taskStatusSuite) TestShouldBeDeepCopy(c *check.C) {
 	defer testleak.AfterTest(c)()
 	info := TaskStatus{
-
 		Tables: map[TableID]*TableReplicaInfo{
 			1: {StartTs: 100},
 			2: {StartTs: 100},
diff --git a/cdc/owner/barrier_test.go b/cdc/owner/barrier_test.go
index 90be548eb0b..5b09cd030ba 100644
--- a/cdc/owner/barrier_test.go
+++ b/cdc/owner/barrier_test.go
@@ -27,8 +27,7 @@ func Test(t *testing.T) { check.TestingT(t) }
 
 var _ = check.Suite(&barrierSuite{})
 
-type barrierSuite struct {
-}
+type barrierSuite struct{}
 
 func (s *barrierSuite) TestBarrier(c *check.C) {
 	defer testleak.AfterTest(c)()
diff --git a/cdc/owner/changefeed_test.go b/cdc/owner/changefeed_test.go
index 4ba09e15202..07283a09b58 100644
--- a/cdc/owner/changefeed_test.go
+++ b/cdc/owner/changefeed_test.go
@@ -112,11 +112,11 @@ func (m *mockDDLSink) Barrier(ctx context.Context) error {
 var _ = check.Suite(&changefeedSuite{})
 
-type changefeedSuite struct {
-}
+type changefeedSuite struct{}
 
 func createChangefeed4Test(ctx cdcContext.Context, c *check.C) (*changefeed, *model.ChangefeedReactorState,
-	map[model.CaptureID]*model.CaptureInfo, *orchestrator.ReactorStateTester) {
+	map[model.CaptureID]*model.CaptureInfo, *orchestrator.ReactorStateTester,
+) {
 	ctx.GlobalVars().PDClient = &gc.MockPDClient{
 		UpdateServiceGCSafePointFunc: func(ctx context.Context,
 			serviceID string, ttl int64, safePoint uint64) (uint64, error) {
 			return safePoint, nil
diff --git a/cdc/owner/ddl_puller_test.go b/cdc/owner/ddl_puller_test.go
index f7ec756b63b..4be256a4c79 100644
--- a/cdc/owner/ddl_puller_test.go
+++ b/cdc/owner/ddl_puller_test.go
@@ -31,8 +31,7 @@ import (
 
 var _ = check.Suite(&ddlPullerSuite{})
 
-type ddlPullerSuite struct {
-}
+type ddlPullerSuite struct{}
 
 type mockPuller struct {
 	c          *check.C
diff --git a/cdc/owner/feed_state_manager_test.go b/cdc/owner/feed_state_manager_test.go
index 4a9ec487745..ae9a8c7671a 100644
--- a/cdc/owner/feed_state_manager_test.go
+++ b/cdc/owner/feed_state_manager_test.go
@@ -24,8 +24,7 @@ import (
 
 var _ = check.Suite(&feedStateManagerSuite{})
 
-type feedStateManagerSuite struct {
-}
+type feedStateManagerSuite struct{}
 
 func (s *feedStateManagerSuite) TestHandleJob(c *check.C) {
 	defer testleak.AfterTest(c)()
diff --git a/cdc/owner/owner_test.go b/cdc/owner/owner_test.go
index b1b429eca4f..1795a0a53b5 100644
--- a/cdc/owner/owner_test.go
+++ b/cdc/owner/owner_test.go
@@ -34,8 +34,7 @@ import (
 
 var _ = check.Suite(&ownerSuite{})
 
-type ownerSuite struct {
-}
+type ownerSuite struct{}
 
 type mockManager struct {
 	gc.Manager
@@ -328,22 +327,20 @@ func (s *ownerSuite) TestUpdateGCSafePoint(c *check.C) {
 	tester := orchestrator.NewReactorStateTester(c, state, nil)
 
 	// no changefeed, the gc safe point should be max uint64
-	mockPDClient.UpdateServiceGCSafePointFunc =
-		func(ctx context.Context, serviceID string, ttl int64, safePoint uint64) (uint64, error) {
-			// Owner will do a snapshot read at (checkpointTs - 1) from TiKV,
-			// set GC safepoint to (checkpointTs - 1)
-			c.Assert(safePoint, check.Equals, uint64(math.MaxUint64-1))
-			return 0, nil
-		}
+	mockPDClient.UpdateServiceGCSafePointFunc = func(ctx context.Context, serviceID string, ttl int64, safePoint uint64) (uint64, error) {
+		// Owner will do a snapshot read at (checkpointTs - 1) from TiKV,
+		// set GC safepoint to (checkpointTs - 1)
+		c.Assert(safePoint, check.Equals, uint64(math.MaxUint64-1))
+		return 0, nil
+	}
 	err := o.updateGCSafepoint(ctx, state)
 	c.Assert(err, check.IsNil)
 
 	// add a failed changefeed, it must not trigger update GC safepoint.
-	mockPDClient.UpdateServiceGCSafePointFunc =
-		func(ctx context.Context, serviceID string, ttl int64, safePoint uint64) (uint64, error) {
-			c.Fatal("must not update")
-			return 0, nil
-		}
+	mockPDClient.UpdateServiceGCSafePointFunc = func(ctx context.Context, serviceID string, ttl int64, safePoint uint64) (uint64, error) {
+		c.Fatal("must not update")
+		return 0, nil
+	}
 	changefeedID1 := "changefeed-test1"
 	tester.MustUpdate(
 		fmt.Sprintf("/tidb/cdc/changefeed/info/%s", changefeedID1),
@@ -360,15 +357,14 @@ func (s *ownerSuite) TestUpdateGCSafePoint(c *check.C) {
 	// switch the state of changefeed to normal, it must update GC safepoint to
 	// 1 (checkpoint Ts of changefeed-test1).
 	ch := make(chan struct{}, 1)
-	mockPDClient.UpdateServiceGCSafePointFunc =
-		func(ctx context.Context, serviceID string, ttl int64, safePoint uint64) (uint64, error) {
-			// Owner will do a snapshot read at (checkpointTs - 1) from TiKV,
-			// set GC safepoint to (checkpointTs - 1)
-			c.Assert(safePoint, check.Equals, uint64(1))
-			c.Assert(serviceID, check.Equals, gc.CDCServiceSafePointID)
-			ch <- struct{}{}
-			return 0, nil
-		}
+	mockPDClient.UpdateServiceGCSafePointFunc = func(ctx context.Context, serviceID string, ttl int64, safePoint uint64) (uint64, error) {
+		// Owner will do a snapshot read at (checkpointTs - 1) from TiKV,
+		// set GC safepoint to (checkpointTs - 1)
+		c.Assert(safePoint, check.Equals, uint64(1))
+		c.Assert(serviceID, check.Equals, gc.CDCServiceSafePointID)
+		ch <- struct{}{}
+		return 0, nil
+	}
 	state.Changefeeds[changefeedID1].PatchInfo(
 		func(info *model.ChangeFeedInfo) (*model.ChangeFeedInfo, bool, error) {
 			info.State = model.StateNormal
@@ -398,15 +394,14 @@ func (s *ownerSuite) TestUpdateGCSafePoint(c *check.C) {
 			return &model.ChangeFeedStatus{CheckpointTs: 30}, true, nil
 		})
 	tester.MustApplyPatches()
-	mockPDClient.UpdateServiceGCSafePointFunc =
-		func(ctx context.Context, serviceID string, ttl int64, safePoint uint64) (uint64, error) {
-			// Owner will do a snapshot read at (checkpointTs - 1) from TiKV,
-			// set GC safepoint to (checkpointTs - 1)
-			c.Assert(safePoint, check.Equals, uint64(19))
-			c.Assert(serviceID, check.Equals, gc.CDCServiceSafePointID)
-			ch <- struct{}{}
-			return 0, nil
-		}
+	mockPDClient.UpdateServiceGCSafePointFunc = func(ctx context.Context, serviceID string, ttl int64, safePoint uint64) (uint64, error) {
+		// Owner will do a snapshot read at (checkpointTs - 1) from TiKV,
+		// set GC safepoint to (checkpointTs - 1)
+		c.Assert(safePoint, check.Equals, uint64(19))
+		c.Assert(serviceID, check.Equals, gc.CDCServiceSafePointID)
+		ch <- struct{}{}
+		return 0, nil
+	}
 	err = o.updateGCSafepoint(ctx, state)
 	c.Assert(err, check.IsNil)
 	select {
diff --git a/cdc/owner/schema_test.go b/cdc/owner/schema_test.go
index e7386c19d2a..3bb993e6dd8 100644
--- a/cdc/owner/schema_test.go
+++ b/cdc/owner/schema_test.go
@@ -28,8 +28,7 @@ import (
 
 var _ = check.Suite(&schemaSuite{})
 
-type schemaSuite struct {
-}
+type schemaSuite struct{}
 
 func (s *schemaSuite) TestAllPhysicalTables(c *check.C) {
 	defer testleak.AfterTest(c)()
diff --git a/cdc/processor/pipeline/puller.go b/cdc/processor/pipeline/puller.go
index 751900afadd..5aab744b6c0 100644
--- a/cdc/processor/pipeline/puller.go
+++ b/cdc/processor/pipeline/puller.go
@@ -36,7 +36,8 @@ type pullerNode struct {
 }
 
 func newPullerNode(
-	tableID model.TableID, replicaInfo *model.TableReplicaInfo, tableName string) pipeline.Node {
+	tableID model.TableID, replicaInfo *model.TableReplicaInfo, tableName string,
+) pipeline.Node {
 	return &pullerNode{
 		tableID:     tableID,
 		replicaInfo: replicaInfo,
diff --git a/cdc/processor/pipeline/sink_test.go b/cdc/processor/pipeline/sink_test.go
index 75b12a5ba4c..b9c367a08c4 100644
--- a/cdc/processor/pipeline/sink_test.go
+++ b/cdc/processor/pipeline/sink_test.go
@@ -95,7 +95,8 @@ func (s *mockSink) Barrier(ctx context.Context, tableID model.TableID) error {
 func (s *mockSink) Check(c *check.C, expected []struct {
 	resolvedTs model.Ts
 	row        *model.RowChangedEvent
-}) {
+},
+) {
 	c.Assert(s.received, check.DeepEquals, expected)
 }
 
diff --git a/cdc/processor/pipeline/table.go b/cdc/processor/pipeline/table.go
index 296051b37f1..d0c208643a4 100644
--- a/cdc/processor/pipeline/table.go
+++ b/cdc/processor/pipeline/table.go
@@ -165,7 +165,8 @@ func NewTablePipeline(ctx cdcContext.Context,
 	tableName string,
 	replicaInfo *model.TableReplicaInfo,
 	sink sink.Sink,
-	targetTs model.Ts) TablePipeline {
+	targetTs model.Ts,
+) TablePipeline {
 	ctx, cancel := cdcContext.WithCancel(ctx)
 	tablePipeline := &tablePipelineImpl{
 		tableID: tableID,
diff --git a/cdc/puller/puller_test.go b/cdc/puller/puller_test.go
index b9fceed79b7..b374b7fe30a 100644
--- a/cdc/puller/puller_test.go
+++ b/cdc/puller/puller_test.go
@@ -35,8 +35,7 @@ import (
 	pd "github.com/tikv/pd/client"
 )
 
-type pullerSuite struct {
-}
+type pullerSuite struct{}
 
 var _ = check.Suite(&pullerSuite{})
 
diff --git a/cdc/puller/sorter/serde.go b/cdc/puller/sorter/serde.go
index 990a2a2f8c6..48d0f348dd5 100644
--- a/cdc/puller/sorter/serde.go
+++ b/cdc/puller/sorter/serde.go
@@ -18,8 +18,7 @@ import (
 	"github.com/pingcap/tiflow/cdc/model"
 )
 
-type msgPackGenSerde struct {
-}
+type msgPackGenSerde struct{}
 
 func (m *msgPackGenSerde) marshal(event *model.PolymorphicEvent, bytes []byte) ([]byte, error) {
 	bytes = bytes[:0]
diff --git a/cdc/puller/sorter/unified_sorter.go b/cdc/puller/sorter/unified_sorter.go
index 19dff63c1d0..66973a15c7c 100644
--- a/cdc/puller/sorter/unified_sorter.go
+++ b/cdc/puller/sorter/unified_sorter.go
@@ -52,8 +52,7 @@ type metricsInfo struct {
 	captureAddr string
 }
 
-type ctxKey struct {
-}
+type ctxKey struct{}
 
 // UnifiedSorterCheckDir checks whether the directory needed exists and is writable.
 // If it does not exist, we try to create one.
@@ -89,7 +88,8 @@ func NewUnifiedSorter(
 	changeFeedID model.ChangeFeedID,
 	tableName string,
 	tableID model.TableID,
-	captureAddr string) (*UnifiedSorter, error) {
+	captureAddr string,
+) (*UnifiedSorter, error) {
 	poolMu.Lock()
 	defer poolMu.Unlock()
 
diff --git a/cdc/sink/codec/canal_flat_test.go b/cdc/sink/codec/canal_flat_test.go
index 6813a231983..7fb90a06dca 100644
--- a/cdc/sink/codec/canal_flat_test.go
+++ b/cdc/sink/codec/canal_flat_test.go
@@ -24,8 +24,7 @@ import (
 	"golang.org/x/text/encoding/charmap"
 )
 
-type canalFlatSuite struct {
-}
+type canalFlatSuite struct{}
 
 var _ = check.Suite(&canalFlatSuite{})
 
diff --git a/cdc/sink/codec/interface_test.go b/cdc/sink/codec/interface_test.go
index 21314cc378b..0afbe63065f 100644
--- a/cdc/sink/codec/interface_test.go
+++ b/cdc/sink/codec/interface_test.go
@@ -22,8 +22,7 @@ import (
 	"github.com/pingcap/tiflow/pkg/util/testleak"
 )
 
-type codecInterfaceSuite struct {
-}
+type codecInterfaceSuite struct{}
 
 var _ = check.Suite(&codecInterfaceSuite{})
 
diff --git a/cdc/sink/codec/schema_registry_test.go b/cdc/sink/codec/schema_registry_test.go
index 967f43c58a9..99c5fe6b890 100644
--- a/cdc/sink/codec/schema_registry_test.go
+++ b/cdc/sink/codec/schema_registry_test.go
@@ -30,8 +30,7 @@ import (
 	"github.com/pingcap/tiflow/pkg/util/testleak"
 )
 
-type AvroSchemaRegistrySuite struct {
-}
+type AvroSchemaRegistrySuite struct{}
 
 var _ = check.Suite(&AvroSchemaRegistrySuite{})
 
diff --git a/cdc/sink/mq.go b/cdc/sink/mq.go
index 35732438455..6edaa965eb2 100644
--- a/cdc/sink/mq.go
+++ b/cdc/sink/mq.go
@@ -418,7 +418,8 @@ func (k *mqSink) writeToProducer(ctx context.Context, message *codec.MQMessage,
 func newKafkaSaramaSink(ctx context.Context, sinkURI *url.URL,
 	filter *filter.Filter, replicaConfig *config.ReplicaConfig,
-	opts map[string]string, errCh chan error) (*mqSink, error) {
+	opts map[string]string, errCh chan error,
+) (*mqSink, error) {
 	producerConfig := kafka.NewConfig()
 	if err := kafka.CompleteConfigsAndOpts(sinkURI,
 		producerConfig, replicaConfig, opts); err != nil {
 		return nil, cerror.WrapError(cerror.ErrKafkaInvalidConfig, err)
@@ -442,7 +443,8 @@ func newKafkaSaramaSink(ctx context.Context, sinkURI *url.URL,
 }
 
 func newPulsarSink(ctx context.Context, sinkURI *url.URL, filter *filter.Filter,
-	replicaConfig *config.ReplicaConfig, opts map[string]string, errCh chan error) (*mqSink, error) {
+	replicaConfig *config.ReplicaConfig, opts map[string]string, errCh chan error,
+) (*mqSink, error) {
 	producer, err := pulsar.NewProducer(sinkURI, errCh)
 	if err != nil {
 		return nil, errors.Trace(err)
diff --git a/cdc/sink/simple_mysql_tester.go b/cdc/sink/simple_mysql_tester.go
index d8ef81f9f1f..f79c2edf4ca 100644
--- a/cdc/sink/simple_mysql_tester.go
+++ b/cdc/sink/simple_mysql_tester.go
@@ -36,7 +36,8 @@ import (
 func init() {
 	failpoint.Inject("SimpleMySQLSinkTester", func() {
 		sinkIniterMap["simple-mysql"] = func(ctx context.Context, changefeedID model.ChangeFeedID, sinkURI *url.URL,
-			filter *filter.Filter, config *config.ReplicaConfig, opts map[string]string, errCh chan error) (Sink, error) {
+			filter *filter.Filter, config *config.ReplicaConfig, opts map[string]string, errCh chan error,
+		) (Sink, error) {
 			return newSimpleMySQLSink(ctx, sinkURI, config)
 		}
 	})
diff --git a/cdc/sink/sink.go b/cdc/sink/sink.go
index 5fd1366cec7..256e1ea8468 100644
--- a/cdc/sink/sink.go
+++ b/cdc/sink/sink.go
@@ -65,13 +65,15 @@ type sinkInitFunc func(context.Context, model.ChangeFeedID, *url.URL, *filter.Fi
 func init() {
 	// register blackhole sink
 	sinkIniterMap["blackhole"] = func(ctx context.Context, changefeedID model.ChangeFeedID, sinkURI *url.URL,
-		filter *filter.Filter, config *config.ReplicaConfig, opts map[string]string, errCh chan error) (Sink, error) {
+		filter *filter.Filter, config *config.ReplicaConfig, opts map[string]string, errCh chan error,
+	) (Sink, error) {
 		return newBlackHoleSink(ctx), nil
 	}
 
 	// register mysql sink
 	sinkIniterMap["mysql"] = func(ctx context.Context, changefeedID model.ChangeFeedID, sinkURI *url.URL,
-		filter *filter.Filter, config *config.ReplicaConfig, opts map[string]string, errCh chan error) (Sink, error) {
+		filter *filter.Filter, config *config.ReplicaConfig, opts map[string]string, errCh chan error,
+	) (Sink, error) {
 		return newMySQLSink(ctx, changefeedID, sinkURI, filter, config, opts)
 	}
 	sinkIniterMap["tidb"] = sinkIniterMap["mysql"]
@@ -80,27 +82,31 @@ func init() {
 
 	// register kafka sink
 	sinkIniterMap["kafka"] = func(ctx context.Context, changefeedID model.ChangeFeedID, sinkURI *url.URL,
-		filter *filter.Filter, config *config.ReplicaConfig, opts map[string]string, errCh chan error) (Sink, error) {
+		filter *filter.Filter, config *config.ReplicaConfig, opts map[string]string, errCh chan error,
+	) (Sink, error) {
 		return newKafkaSaramaSink(ctx, sinkURI, filter, config, opts, errCh)
 	}
 	sinkIniterMap["kafka+ssl"] = sinkIniterMap["kafka"]
 
 	// register pulsar sink
 	sinkIniterMap["pulsar"] = func(ctx context.Context, changefeedID model.ChangeFeedID, sinkURI *url.URL,
-		filter *filter.Filter, config *config.ReplicaConfig, opts map[string]string, errCh chan error) (Sink, error) {
+		filter *filter.Filter, config *config.ReplicaConfig, opts map[string]string, errCh chan error,
+	) (Sink, error) {
 		return newPulsarSink(ctx, sinkURI, filter, config, opts, errCh)
 	}
 	sinkIniterMap["pulsar+ssl"] = sinkIniterMap["pulsar"]
 
 	// register local sink
 	sinkIniterMap["local"] = func(ctx context.Context, changefeedID model.ChangeFeedID, sinkURI *url.URL,
-		filter *filter.Filter, config *config.ReplicaConfig, opts map[string]string, errCh chan error) (Sink, error) {
+		filter *filter.Filter, config *config.ReplicaConfig, opts map[string]string, errCh chan error,
+	) (Sink, error) {
 		return cdclog.NewLocalFileSink(ctx, sinkURI, errCh)
 	}
 
 	// register s3 sink
 	sinkIniterMap["s3"] = func(ctx context.Context, changefeedID model.ChangeFeedID, sinkURI *url.URL,
-		filter *filter.Filter, config *config.ReplicaConfig, opts map[string]string, errCh chan error) (Sink, error) {
+		filter *filter.Filter, config *config.ReplicaConfig, opts map[string]string, errCh chan error,
+	) (Sink, error) {
 		return cdclog.NewS3Sink(ctx, sinkURI, errCh)
 	}
 }
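Most hunks in this patch apply one mechanical rule: when a function's parameter list wraps across lines, the closing parenthesis moves onto its own line, and empty struct bodies collapse to a one-line `struct{}`. This matches gofumpt's multi-line signature rule, though no formatter is named in the patch itself. A minimal before/after sketch, using a hypothetical newSink function and a placeholder Sink type:

```go
package sink

import (
	"context"
	"net/url"
)

// Sink stands in for the real interface; newSinkOld/newSinkNew are hypothetical.
type Sink interface{}

// Before: the closing parenthesis shares a line with the last parameter, so the
// wrapped parameter list and the function body sit at the same indent level.
func newSinkOld(ctx context.Context, sinkURI *url.URL,
	opts map[string]string, errCh chan error) (Sink, error) {
	return nil, nil
}

// After: the closing parenthesis gets its own line, visually separating the
// parameter block from the body.
func newSinkNew(ctx context.Context, sinkURI *url.URL,
	opts map[string]string, errCh chan error,
) (Sink, error) {
	return nil, nil
}
```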
diff --git a/cmd/kafka-consumer/main.go b/cmd/kafka-consumer/main.go
index 45676e6e815..cd12e1868cf 100644
--- a/cmd/kafka-consumer/main.go
+++ b/cmd/kafka-consumer/main.go
@@ -438,8 +438,7 @@ func (c *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim saram
 			if row.Table.IsPartition {
 				partitionID = row.Table.TableID
 			}
-			row.Table.TableID =
-				c.fakeTableIDGenerator.generateFakeTableID(row.Table.Schema, row.Table.Table, partitionID)
+			row.Table.TableID = c.fakeTableIDGenerator.generateFakeTableID(row.Table.Schema, row.Table.Table, partitionID)
 			err = sink.EmitRowChangedEvents(ctx, row)
 			if err != nil {
 				log.Panic("emit row changed event failed", zap.Error(err))
diff --git a/pkg/cmd/cli/cli_changefeed_cyclic_create_marktables.go b/pkg/cmd/cli/cli_changefeed_cyclic_create_marktables.go
index 04d68db8392..646d6441318 100644
--- a/pkg/cmd/cli/cli_changefeed_cyclic_create_marktables.go
+++ b/pkg/cmd/cli/cli_changefeed_cyclic_create_marktables.go
@@ -119,19 +119,18 @@ func (o *cyclicCreateMarktablesOptions) run(cmd *cobra.Command) error {
 func newCmdCyclicCreateMarktables(f factory.Factory) *cobra.Command {
 	o := newCyclicCreateMarktablesOptions()
 
-	command :=
-		&cobra.Command{
-			Use:   "create-marktables",
-			Short: "Create cyclic replication mark tables",
-			RunE: func(cmd *cobra.Command, args []string) error {
-				err := o.complete(f)
-				if err != nil {
-					return err
-				}
-
-				return o.run(cmd)
-			},
-		}
+	command := &cobra.Command{
+		Use:   "create-marktables",
+		Short: "Create cyclic replication mark tables",
+		RunE: func(cmd *cobra.Command, args []string) error {
+			err := o.complete(f)
+			if err != nil {
+				return err
+			}
+
+			return o.run(cmd)
+		},
+	}
 
 	o.addFlags(command)
 
diff --git a/pkg/etcd/client.go b/pkg/etcd/client.go
index ac4e0003f2c..9c0642778de 100644
--- a/pkg/etcd/client.go
+++ b/pkg/etcd/client.go
@@ -220,7 +220,8 @@ func (c *Client) WatchWithChan(ctx context.Context, outCh chan<- clientv3.WatchR
 		close(outCh)
 		log.Info("WatchWithChan exited", zap.String("role", role))
 	}()
-	var lastRevision int64
+	// get initial revision from opts to avoid revision fall back
+	lastRevision := getRevisionFromWatchOpts(opts...)
 	watchCtx, cancel := context.WithCancel(ctx)
 	defer func() {
 		// Using closures to handle changes to the cancel function
@@ -275,7 +276,7 @@ func (c *Client) WatchWithChan(ctx context.Context, outCh chan<- clientv3.WatchR
 			watchCtx, cancel = context.WithCancel(ctx)
 			// to avoid possible context leak warning from govet
 			_ = cancel
-			watchCh = c.cli.Watch(watchCtx, key, clientv3.WithPrefix(), clientv3.WithRev(lastRevision+1))
+			watchCh = c.cli.Watch(watchCtx, key, clientv3.WithPrefix(), clientv3.WithRev(lastRevision))
 			// we need to reset lastReceivedResponseTime after reset Watch
 			lastReceivedResponseTime = c.clock.Now()
 		}
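The pkg/etcd/client.go hunks above are the functional core of the patch: WatchWithChan previously started from `lastRevision = 0`, so an internal watch reset that fired before the first response arrived re-created the inner watch at revision 1 (`0+1`) and silently discarded the revision the caller had requested. A caller-side sketch of the contract the fix preserves; `Wrap`, `Client`, and `WatchWithChan` come from this repository, while the key, role string, and channel buffer size below are made up for illustration:

```go
package main

import (
	"context"

	cdcetcd "github.com/pingcap/tiflow/pkg/etcd"
	"go.etcd.io/etcd/clientv3"
)

// watchFrom hands a start revision to the wrapped client. Before this fix, a
// reset of the inner watch channel that happened before any response arrived
// restarted the watch at revision 1 (lastRevision 0 + 1); with the fix, the
// initial revision is recovered from the options, so a reset resumes at startRev.
func watchFrom(ctx context.Context, cli *cdcetcd.Client, startRev int64) <-chan clientv3.WatchResponse {
	outCh := make(chan clientv3.WatchResponse, 16) // buffer size chosen arbitrarily
	go cli.WatchWithChan(ctx, outCh, "/tidb/cdc/example", "example-role",
		clientv3.WithPrefix(), clientv3.WithRev(startRev))
	return outCh
}

func main() {
	rawCli, err := clientv3.New(clientv3.Config{Endpoints: []string{"127.0.0.1:2379"}})
	if err != nil {
		panic(err)
	}
	defer rawCli.Close()
	wrapped := cdcetcd.Wrap(rawCli, nil) // nil metrics, as in the tests below
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	for range watchFrom(ctx, wrapped, 2) {
		// consume responses; resets inside WatchWithChan keep the revision >= 2
	}
}
```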
diff --git a/pkg/etcd/client_test.go b/pkg/etcd/client_test.go
index 72a295affb3..57030df3f4e 100644
--- a/pkg/etcd/client_test.go
+++ b/pkg/etcd/client_test.go
@@ -15,6 +15,7 @@ package etcd
 
 import (
 	"context"
+	"sync/atomic"
 	"time"
 
 	"github.com/benbjohnson/clock"
@@ -25,8 +26,7 @@ import (
 	"go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes"
 )
 
-type clientSuite struct {
-}
+type clientSuite struct{}
 
 var _ = check.Suite(&clientSuite{})
 
@@ -54,17 +54,23 @@ func (m *mockClient) Txn(ctx context.Context) clientv3.Txn {
 
 type mockWatcher struct {
 	clientv3.Watcher
 	watchCh      chan clientv3.WatchResponse
-	resetCount   *int
-	requestCount *int
+	resetCount   *int32
+	requestCount *int32
+	rev          *int64
 }
 
 func (m mockWatcher) Watch(ctx context.Context, key string, opts ...clientv3.OpOption) clientv3.WatchChan {
-	*m.resetCount++
+	atomic.AddInt32(m.resetCount, 1)
+	op := &clientv3.Op{}
+	for _, opt := range opts {
+		opt(op)
+	}
+	atomic.StoreInt64(m.rev, op.Rev())
 	return m.watchCh
 }
 
 func (m mockWatcher) RequestProgress(ctx context.Context) error {
-	*m.requestCount++
+	atomic.AddInt32(m.requestCount, 1)
 	return nil
 }
 
@@ -146,10 +152,11 @@ func (s *etcdSuite) TestWatchChBlocked(c *check.C) {
 	defer testleak.AfterTest(c)()
 	defer s.TearDownTest(c)
 	cli := clientv3.NewCtxClient(context.TODO())
-	resetCount := 0
-	requestCount := 0
+	resetCount := int32(0)
+	requestCount := int32(0)
+	rev := int64(0)
 	watchCh := make(chan clientv3.WatchResponse, 1)
-	watcher := mockWatcher{watchCh: watchCh, resetCount: &resetCount, requestCount: &requestCount}
+	watcher := mockWatcher{watchCh: watchCh, resetCount: &resetCount, requestCount: &requestCount, rev: &rev}
 	cli.Watcher = watcher
 
 	sentRes := []clientv3.WatchResponse{
@@ -193,9 +200,9 @@ func (s *etcdSuite) TestWatchChBlocked(c *check.C) {
 	c.Check(sentRes, check.DeepEquals, receivedRes)
 
 	// make sure watchCh has been reset since timeout
-	c.Assert(*watcher.resetCount > 1, check.IsTrue)
+	c.Assert(atomic.LoadInt32(watcher.resetCount) > 1, check.IsTrue)
 	// make sure RequestProgress has been call since timeout
-	c.Assert(*watcher.requestCount > 1, check.IsTrue)
+	c.Assert(atomic.LoadInt32(watcher.requestCount) > 1, check.IsTrue)
 	// make sure etcdRequestProgressDuration is less than etcdWatchChTimeoutDuration
 	c.Assert(etcdRequestProgressDuration, check.Less, etcdWatchChTimeoutDuration)
 }
@@ -206,10 +213,11 @@ func (s *etcdSuite) TestOutChBlocked(c *check.C) {
 	defer testleak.AfterTest(c)()
 	defer s.TearDownTest(c)
 
 	cli := clientv3.NewCtxClient(context.TODO())
-	resetCount := 0
-	requestCount := 0
+	resetCount := int32(0)
+	requestCount := int32(0)
+	rev := int64(0)
 	watchCh := make(chan clientv3.WatchResponse, 1)
-	watcher := mockWatcher{watchCh: watchCh, resetCount: &resetCount, requestCount: &requestCount}
+	watcher := mockWatcher{watchCh: watchCh, resetCount: &resetCount, requestCount: &requestCount, rev: &rev}
 	cli.Watcher = watcher
 	mockClock := clock.NewMock()
@@ -251,6 +259,52 @@ func (s *etcdSuite) TestOutChBlocked(c *check.C) {
 	c.Check(sentRes, check.DeepEquals, receivedRes)
 }
 
+func (s *clientSuite) TestRevisionNotFallBack(c *check.C) {
+	defer testleak.AfterTest(c)()
+	cli := clientv3.NewCtxClient(context.TODO())
+
+	resetCount := int32(0)
+	requestCount := int32(0)
+	rev := int64(0)
+	watchCh := make(chan clientv3.WatchResponse, 1)
+	watcher := mockWatcher{watchCh: watchCh, resetCount: &resetCount, requestCount: &requestCount, rev: &rev}
+	cli.Watcher = watcher
+	mockClock := clock.NewMock()
+	watchCli := Wrap(cli, nil)
+	watchCli.clock = mockClock
+
+	key := "testRevisionNotFallBack"
+	outCh := make(chan clientv3.WatchResponse, 1)
+	// watch from revision = 2
+	revision := int64(2)
+
+	sentRes := []clientv3.WatchResponse{
+		{CompactRevision: 1},
+	}
+
+	go func() {
+		for _, r := range sentRes {
+			watchCh <- r
+		}
+	}()
+
+	ctx, cancel := context.WithTimeout(context.Background(), time.Second*2)
+	defer cancel()
+	go func() {
+		watchCli.WatchWithChan(ctx, outCh, key, "test", clientv3.WithPrefix(), clientv3.WithRev(revision))
+	}()
+	// wait for WatchWithChan set up
+	<-outCh
+	// move time forward
+	mockClock.Add(time.Second * 30)
+	// make sure watchCh has been reset since timeout
+	c.Assert(atomic.LoadInt32(watcher.resetCount) > 1, check.IsTrue)
+	// make sure the revision in WatchWithChan does not fall back,
+	// even if no response has been received from watchCh
+	// while watchCh was being reset
+	c.Assert(atomic.LoadInt64(watcher.rev), check.Equals, revision)
+}
+
 type mockTxn struct {
 	ctx  context.Context
 	mode int
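The mockWatcher changes above are a data-race fix rather than a behavioral one: WatchWithChan bumps resetCount and requestCount on its own goroutine while the test goroutine reads them, so the old plain `*int` increments raced under `go test -race`. A standalone sketch of the pattern, not taken from the patch:

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

func main() {
	var count int32
	var wg sync.WaitGroup
	for i := 0; i < 100; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			// atomic.AddInt32 makes the concurrent increment race-free;
			// a plain count++ here would be flagged by the race detector.
			atomic.AddInt32(&count, 1)
		}()
	}
	wg.Wait()
	fmt.Println(atomic.LoadInt32(&count)) // race-free read; prints 100
}
```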
diff --git a/pkg/etcd/util.go b/pkg/etcd/util.go
new file mode 100644
index 00000000000..e70ec080edf
--- /dev/null
+++ b/pkg/etcd/util.go
@@ -0,0 +1,24 @@
+// Copyright 2021 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package etcd
+
+import "go.etcd.io/etcd/clientv3"
+
+func getRevisionFromWatchOpts(opts ...clientv3.OpOption) int64 {
+	op := &clientv3.Op{}
+	for _, opt := range opts {
+		opt(op)
+	}
+	return op.Rev()
+}
diff --git a/pkg/etcd/util_test.go b/pkg/etcd/util_test.go
new file mode 100644
index 00000000000..afc6d0d7e3c
--- /dev/null
+++ b/pkg/etcd/util_test.go
@@ -0,0 +1,35 @@
+// Copyright 2021 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package etcd
+
+import (
+	"math"
+	"math/rand"
+
+	"github.com/pingcap/check"
+	"github.com/pingcap/tiflow/pkg/util/testleak"
+	"go.etcd.io/etcd/clientv3"
+)
+
+type utilSuite struct{}
+
+var _ = check.Suite(&utilSuite{})
+
+func (s utilSuite) TestGetRevisionFromWatchOpts(c *check.C) {
+	defer testleak.AfterTest(c)()
+	for i := 0; i < 100; i++ {
+		rev := rand.Int63n(math.MaxInt64)
+		opt := clientv3.WithRev(rev)
+		c.Assert(getRevisionFromWatchOpts(opt), check.Equals, rev)
+	}
+}
diff --git a/pkg/orchestrator/etcd_worker_test.go b/pkg/orchestrator/etcd_worker_test.go
index d51de27bfe3..2364254fa0d 100644
--- a/pkg/orchestrator/etcd_worker_test.go
+++ b/pkg/orchestrator/etcd_worker_test.go
@@ -48,8 +48,7 @@ func Test(t *testing.T) { check.TestingT(t) }
 
 var _ = check.Suite(&etcdWorkerSuite{})
 
-type etcdWorkerSuite struct {
-}
+type etcdWorkerSuite struct{}
 
 type simpleReactor struct {
 	state *simpleReactorState
diff --git a/pkg/orchestrator/util/key_utils_test.go b/pkg/orchestrator/util/key_utils_test.go
index 5fc6fbc08dd..d84e3304382 100644
--- a/pkg/orchestrator/util/key_utils_test.go
+++ b/pkg/orchestrator/util/key_utils_test.go
@@ -24,8 +24,7 @@ func Test(t *testing.T) { check.TestingT(t) }
 
 var _ = check.Suite(&keyUtilsSuite{})
 
-type keyUtilsSuite struct {
-}
+type keyUtilsSuite struct{}
 
 func (s *keyUtilsSuite) TestEtcdKey(c *check.C) {
 	defer testleak.AfterTest(c)()
diff --git a/pkg/pipeline/node.go b/pkg/pipeline/node.go
index 3f2dd7faa66..a33967c5b1f 100644
--- a/pkg/pipeline/node.go
+++ b/pkg/pipeline/node.go
@@ -17,7 +17,6 @@ package pipeline
 // The following functions in this interface will be called in one goroutine.
 // It's NO NEED to consider concurrency issues
 type Node interface {
-
 	// Init initializes the node
 	// when the pipeline is started, this function will be called in order
 	// you can call `ctx.SendToNextNode(msg)` to send the message to the next node
diff --git a/pkg/pipeline/pipeline_test.go b/pkg/pipeline/pipeline_test.go
index 4d868585dba..0a4e8039dd0 100644
--- a/pkg/pipeline/pipeline_test.go
+++ b/pkg/pipeline/pipeline_test.go
@@ -35,8 +35,7 @@ type pipelineSuite struct{}
 
 var _ = check.Suite(&pipelineSuite{})
 
-type echoNode struct {
-}
+type echoNode struct{}
 
 func (e echoNode) Init(ctx NodeContext) error {
 	ctx.SendToNextNode(PolymorphicEventMessage(&model.PolymorphicEvent{
@@ -435,8 +434,7 @@ func (s *pipelineSuite) TestPipelineAppendNode(c *check.C) {
 	p.Wait()
 }
 
-type panicNode struct {
-}
+type panicNode struct{}
 
 func (e panicNode) Init(ctx NodeContext) error {
 	panic("panic in panicNode")
diff --git a/pkg/scheduler/table_number.go b/pkg/scheduler/table_number.go
index 7bfe59b9ace..637eb7aff4d 100644
--- a/pkg/scheduler/table_number.go
+++ b/pkg/scheduler/table_number.go
@@ -44,7 +44,8 @@ func (t *TableNumberScheduler) Skewness() float64 {
 
 // CalRebalanceOperates implements the Scheduler interface
 func (t *TableNumberScheduler) CalRebalanceOperates(targetSkewness float64) (
-	skewness float64, moveTableJobs map[model.TableID]*model.MoveTableJob) {
+	skewness float64, moveTableJobs map[model.TableID]*model.MoveTableJob,
+) {
 	var totalTableNumber uint64
 	for _, captureWorkloads := range t.workloads {
 		totalTableNumber += uint64(len(captureWorkloads))
diff --git a/pkg/txnutil/gc/gc_manager_test.go b/pkg/txnutil/gc/gc_manager_test.go
index c415ebf6b17..368f879cd39 100644
--- a/pkg/txnutil/gc/gc_manager_test.go
+++ b/pkg/txnutil/gc/gc_manager_test.go
@@ -34,8 +34,7 @@ func Test(t *testing.T) {
 
 var _ = check.Suite(&gcManagerSuite{})
 
-type gcManagerSuite struct {
-}
+type gcManagerSuite struct{}
 
 func (s *gcManagerSuite) TestUpdateGCSafePoint(c *check.C) {
 	defer testleak.AfterTest(c)()
diff --git a/pkg/workerpool/hash.go b/pkg/workerpool/hash.go
index 9af534ca8a5..8c04f74a383 100644
--- a/pkg/workerpool/hash.go
+++ b/pkg/workerpool/hash.go
@@ -24,8 +24,7 @@ type Hashable interface {
 	HashCode() int64
 }
 
-type defaultHasher struct {
-}
+type defaultHasher struct{}
 
 // Hash returns the hash value.
 func (m *defaultHasher) Hash(object Hashable) int64 {