Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

etcd (ticdc): fix a data race in unit test #4551

Merged
merged 9 commits into from
Feb 13, 2022
31 changes: 16 additions & 15 deletions pkg/etcd/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"context"
"io/ioutil"
"os"
"sync/atomic"
"testing"
"time"

Expand Down Expand Up @@ -51,23 +52,23 @@ func (m *mockClient) Txn(ctx context.Context) clientv3.Txn {
type mockWatcher struct {
clientv3.Watcher
watchCh chan clientv3.WatchResponse
resetCount *int
requestCount *int
resetCount *int32
requestCount *int32
rev *int64
}

func (m mockWatcher) Watch(ctx context.Context, key string, opts ...clientv3.OpOption) clientv3.WatchChan {
*m.resetCount++
atomic.AddInt32(m.resetCount, 1)
op := &clientv3.Op{}
for _, opt := range opts {
opt(op)
}
*m.rev = op.Rev()
atomic.StoreInt64(m.rev, op.Rev())
return m.watchCh
}

func (m mockWatcher) RequestProgress(ctx context.Context) error {
*m.requestCount++
atomic.AddInt32(m.requestCount, 1)
return nil
}

Expand Down Expand Up @@ -153,8 +154,8 @@ func TestDelegateLease(t *testing.T) {
// test no data lost when WatchCh blocked
func TestWatchChBlocked(t *testing.T) {
cli := clientv3.NewCtxClient(context.TODO())
resetCount := 0
requestCount := 0
resetCount := int32(0)
requestCount := int32(0)
rev := int64(0)
watchCh := make(chan clientv3.WatchResponse, 1)
watcher := mockWatcher{watchCh: watchCh, resetCount: &resetCount, requestCount: &requestCount, rev: &rev}
Expand Down Expand Up @@ -204,18 +205,18 @@ func TestWatchChBlocked(t *testing.T) {

require.Equal(t, sentRes, receivedRes)
// make sure watchCh has been reset since timeout
require.True(t, *watcher.resetCount > 1)
require.True(t, atomic.LoadInt32(watcher.resetCount) > 1)
// make sure RequestProgress has been call since timeout
require.True(t, *watcher.requestCount > 1)
require.True(t, atomic.LoadInt32(watcher.requestCount) > 1)
// make sure etcdRequestProgressDuration is less than etcdWatchChTimeoutDuration
require.Less(t, etcdRequestProgressDuration, etcdWatchChTimeoutDuration)
}

// test no data lost when OutCh blocked
func TestOutChBlocked(t *testing.T) {
cli := clientv3.NewCtxClient(context.TODO())
resetCount := 0
requestCount := 0
resetCount := int32(0)
requestCount := int32(0)
rev := int64(0)
watchCh := make(chan clientv3.WatchResponse, 1)
watcher := mockWatcher{watchCh: watchCh, resetCount: &resetCount, requestCount: &requestCount, rev: &rev}
Expand Down Expand Up @@ -266,8 +267,8 @@ func TestOutChBlocked(t *testing.T) {
func TestRevisionNotFallBack(t *testing.T) {
cli := clientv3.NewCtxClient(context.TODO())

resetCount := 0
requestCount := 0
resetCount := int32(0)
requestCount := int32(0)
rev := int64(0)
watchCh := make(chan clientv3.WatchResponse, 1)
watcher := mockWatcher{watchCh: watchCh, resetCount: &resetCount, requestCount: &requestCount, rev: &rev}
Expand Down Expand Up @@ -301,11 +302,11 @@ func TestRevisionNotFallBack(t *testing.T) {
// move time forward
mockClock.Add(time.Second * 30)
// make sure watchCh has been reset since timeout
require.True(t, *watcher.resetCount > 1)
require.True(t, atomic.LoadInt32(watcher.resetCount) > 1)
// make suer revision in WatchWitchChan does not fall back
// even if there has not any response been received from WatchCh
// while WatchCh was reset
require.Equal(t, *watcher.rev, revision)
require.Equal(t, atomic.LoadInt64(watcher.rev), revision)
}

type mockTxn struct {
Expand Down
5 changes: 2 additions & 3 deletions pkg/etcd/etcd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,13 @@ import (
cerror "github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/util"
"github.com/stretchr/testify/require"
"go.etcd.io/etcd/clientv3"
"go.etcd.io/etcd/clientv3/concurrency"
"go.etcd.io/etcd/embed"
"go.etcd.io/etcd/pkg/logutil"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"golang.org/x/sync/errgroup"

"go.etcd.io/etcd/clientv3"
"go.etcd.io/etcd/embed"
)

type Captures []*model.CaptureInfo
Expand Down