diff --git a/pkg/etcd/client_test.go b/pkg/etcd/client_test.go index 42220d48e9d..7119f6c9cad 100644 --- a/pkg/etcd/client_test.go +++ b/pkg/etcd/client_test.go @@ -17,6 +17,7 @@ import ( "context" "io/ioutil" "os" + "sync/atomic" "testing" "time" @@ -51,23 +52,23 @@ func (m *mockClient) Txn(ctx context.Context) clientv3.Txn { type mockWatcher struct { clientv3.Watcher watchCh chan clientv3.WatchResponse - resetCount *int - requestCount *int + resetCount *int32 + requestCount *int32 rev *int64 } func (m mockWatcher) Watch(ctx context.Context, key string, opts ...clientv3.OpOption) clientv3.WatchChan { - *m.resetCount++ + atomic.AddInt32(m.resetCount, 1) op := &clientv3.Op{} for _, opt := range opts { opt(op) } - *m.rev = op.Rev() + atomic.StoreInt64(m.rev, op.Rev()) return m.watchCh } func (m mockWatcher) RequestProgress(ctx context.Context) error { - *m.requestCount++ + atomic.AddInt32(m.requestCount, 1) return nil } @@ -153,8 +154,8 @@ func TestDelegateLease(t *testing.T) { // test no data lost when WatchCh blocked func TestWatchChBlocked(t *testing.T) { cli := clientv3.NewCtxClient(context.TODO()) - resetCount := 0 - requestCount := 0 + resetCount := int32(0) + requestCount := int32(0) rev := int64(0) watchCh := make(chan clientv3.WatchResponse, 1) watcher := mockWatcher{watchCh: watchCh, resetCount: &resetCount, requestCount: &requestCount, rev: &rev} @@ -204,9 +205,9 @@ func TestWatchChBlocked(t *testing.T) { require.Equal(t, sentRes, receivedRes) // make sure watchCh has been reset since timeout - require.True(t, *watcher.resetCount > 1) + require.True(t, atomic.LoadInt32(watcher.resetCount) > 1) // make sure RequestProgress has been call since timeout - require.True(t, *watcher.requestCount > 1) + require.True(t, atomic.LoadInt32(watcher.requestCount) > 1) // make sure etcdRequestProgressDuration is less than etcdWatchChTimeoutDuration require.Less(t, etcdRequestProgressDuration, etcdWatchChTimeoutDuration) } @@ -214,8 +215,8 @@ func TestWatchChBlocked(t *testing.T) { // test no data lost when OutCh blocked func TestOutChBlocked(t *testing.T) { cli := clientv3.NewCtxClient(context.TODO()) - resetCount := 0 - requestCount := 0 + resetCount := int32(0) + requestCount := int32(0) rev := int64(0) watchCh := make(chan clientv3.WatchResponse, 1) watcher := mockWatcher{watchCh: watchCh, resetCount: &resetCount, requestCount: &requestCount, rev: &rev} @@ -266,8 +267,8 @@ func TestOutChBlocked(t *testing.T) { func TestRevisionNotFallBack(t *testing.T) { cli := clientv3.NewCtxClient(context.TODO()) - resetCount := 0 - requestCount := 0 + resetCount := int32(0) + requestCount := int32(0) rev := int64(0) watchCh := make(chan clientv3.WatchResponse, 1) watcher := mockWatcher{watchCh: watchCh, resetCount: &resetCount, requestCount: &requestCount, rev: &rev} @@ -301,11 +302,11 @@ func TestRevisionNotFallBack(t *testing.T) { // move time forward mockClock.Add(time.Second * 30) // make sure watchCh has been reset since timeout - require.True(t, *watcher.resetCount > 1) - // make suer revision in WatchWitchChan does not fall back + require.True(t, atomic.LoadInt32(watcher.resetCount) > 1) + // make sure revision in WatchWitchChan does not fall back // even if there has not any response been received from WatchCh // while WatchCh was reset - require.Equal(t, *watcher.rev, revision) + require.Equal(t, atomic.LoadInt64(watcher.rev), revision) } type mockTxn struct { diff --git a/pkg/etcd/etcd_test.go b/pkg/etcd/etcd_test.go index 1ccc678117d..36b604d44cf 100644 --- a/pkg/etcd/etcd_test.go +++ b/pkg/etcd/etcd_test.go @@ -29,14 +29,13 @@ import ( cerror "github.com/pingcap/tiflow/pkg/errors" "github.com/pingcap/tiflow/pkg/util" "github.com/stretchr/testify/require" + "go.etcd.io/etcd/clientv3" "go.etcd.io/etcd/clientv3/concurrency" + "go.etcd.io/etcd/embed" "go.etcd.io/etcd/pkg/logutil" "go.uber.org/zap" "go.uber.org/zap/zapcore" "golang.org/x/sync/errgroup" - - "go.etcd.io/etcd/clientv3" - "go.etcd.io/etcd/embed" ) type Captures []*model.CaptureInfo