diff --git a/cdc/task_test.go b/cdc/task_test.go
index caba59d7fa1..d2b1f84e8e2 100644
--- a/cdc/task_test.go
+++ b/cdc/task_test.go
@@ -179,7 +179,7 @@ func (s *taskSuite) TestWatch(c *check.C) {
 	}
 
 	// Watch with a normal context
-	ch := s.w.Watch(context.Background())
+	ch := s.w.Watch(ctx)
 
 	// Trigger the ErrCompacted error
 	c.Assert(failpoint.Enable("github.com/pingcap/tiflow/cdc.restart_task_watch", "50%off"), check.IsNil)
diff --git a/go.mod b/go.mod
index 0ffa6d75cba..cbb62e0fa7a 100644
--- a/go.mod
+++ b/go.mod
@@ -7,6 +7,7 @@ require (
 	github.com/DATA-DOG/go-sqlmock v1.3.3
 	github.com/Shopify/sarama v1.27.2
 	github.com/apache/pulsar-client-go v0.1.1
+	github.com/benbjohnson/clock v1.0.3
 	github.com/bradleyjkemp/grpc-tools v0.2.5
 	github.com/cenkalti/backoff v2.2.1+incompatible
 	github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e
diff --git a/go.sum b/go.sum
index 0325ded66d1..e4f88b54209 100644
--- a/go.sum
+++ b/go.sum
@@ -60,6 +60,8 @@ github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5
 github.com/aws/aws-sdk-go v1.30.24 h1:y3JPD51VuEmVqN3BEDVm4amGpDma2cKJcDPuAU1OR58=
 github.com/aws/aws-sdk-go v1.30.24/go.mod h1:5zCpMtNQVjRREroY7sYe8lOMRSxkhG6MZveU8YkpAk0=
 github.com/beefsack/go-rate v0.0.0-20180408011153-efa7637bb9b6/go.mod h1:6YNgTHLutezwnBvyneBbwvB8C82y3dcoOj5EQJIdGXA=
+github.com/benbjohnson/clock v1.0.3 h1:vkLuvpK4fmtSCuo60+yC63p7y0BmQ8gm5ZXGuBCJyXg=
+github.com/benbjohnson/clock v1.0.3/go.mod h1:bGMdMPoPVvcYyt1gHDf4J2KE153Yf9BuiUKYMaxlTDM=
 github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=
 github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8=
 github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
diff --git a/pkg/etcd/client.go b/pkg/etcd/client.go
index 23a0e219a58..e1e286decb5 100644
--- a/pkg/etcd/client.go
+++ b/pkg/etcd/client.go
@@ -15,7 +15,9 @@ package etcd
 
 import (
 	"context"
+	"time"
 
+	"github.com/benbjohnson/clock"
 	"github.com/pingcap/errors"
 	"github.com/pingcap/log"
 	cerrors "github.com/pingcap/tiflow/pkg/errors"
@@ -41,6 +43,14 @@ const (
 	backoffBaseDelayInMs = 500
 	// in previous/backoff retry pkg, the DefaultMaxInterval = 60 * time.Second
 	backoffMaxDelayInMs = 60 * 1000
+	// If no msg comes from an etcd watchCh for etcdWatchChTimeoutDuration long,
+	// we should cancel the watchCh and request a new watchCh from etcd client
+	etcdWatchChTimeoutDuration = 10 * time.Second
+	// If no msg comes from an etcd watchCh for etcdRequestProgressDuration long,
+	// we should call RequestProgress of etcd client
+	etcdRequestProgressDuration = 1 * time.Second
+	// etcdWatchChBufferSize is arbitrarily specified, it will be modified in the future
+	etcdWatchChBufferSize = 16
 )
 
 // set to var instead of const for mocking the value to speedup test
@@ -50,11 +60,13 @@ var maxTries int64 = 8
 type Client struct {
 	cli     *clientv3.Client
 	metrics map[string]prometheus.Counter
+	// clock is for making it easier to mock time-related data structures in unit tests
+	clock clock.Clock
 }
 
 // Wrap warps a clientv3.Client that provides etcd APIs required by TiCDC.
 func Wrap(cli *clientv3.Client, metrics map[string]prometheus.Counter) *Client {
-	return &Client{cli: cli, metrics: metrics}
+	return &Client{cli: cli, metrics: metrics, clock: clock.New()}
 }
 
 // Unwrap returns a clientv3.Client
@@ -165,7 +177,84 @@ func (c *Client) TimeToLive(ctx context.Context, lease clientv3.LeaseID, opts ..
 
-// Watch delegates request to clientv3.Watcher.Watch
+// Watch wraps clientv3.Watcher.Watch: it starts WatchWithChan in a background
+// goroutine and returns a buffered channel that is closed once ctx is done.
 func (c *Client) Watch(ctx context.Context, key string, opts ...clientv3.OpOption) clientv3.WatchChan {
-	return c.cli.Watch(ctx, key, opts...)
+	watchCh := make(chan clientv3.WatchResponse, etcdWatchChBufferSize)
+	go c.WatchWithChan(ctx, watchCh, key, opts...)
+	return watchCh
+}
+
+// WatchWithChan maintains a watchCh and sends all msg from the watchCh to outCh.
+// It calls RequestProgress on every tick of etcdRequestProgressDuration and
+// re-creates the watchCh when no response has been received for
+// etcdWatchChTimeoutDuration. outCh is closed when ctx is done.
+func (c *Client) WatchWithChan(ctx context.Context, outCh chan<- clientv3.WatchResponse, key string, opts ...clientv3.OpOption) {
+	defer func() {
+		close(outCh)
+		log.Info("WatchWithChan exited")
+	}()
+
+	// Seed the resume revision from the caller's options (e.g. WithRev), so
+	// that a reset before the first event cannot fall back to an older
+	// revision. Zero (no WithRev given) leaves the start revision unset.
+	watchOp := &clientv3.Op{}
+	for _, opt := range opts {
+		opt(watchOp)
+	}
+	nextRevision := watchOp.Rev()
+
+	watchCtx, cancel := context.WithCancel(ctx)
+	defer cancel()
+	watchCh := c.cli.Watch(watchCtx, key, opts...)
+
+	ticker := c.clock.Ticker(etcdRequestProgressDuration)
+	defer ticker.Stop()
+	lastReceivedResponseTime := c.clock.Now()
+
+	for {
+		select {
+		case <-ctx.Done():
+			cancel()
+			return
+		case response := <-watchCh:
+			lastReceivedResponseTime = c.clock.Now()
+			if response.Err() == nil && !response.IsProgressNotify() {
+				nextRevision = response.Header.Revision + 1
+			}
+
+		Loop:
+			// we must loop here until the response is sent to outCh
+			// or otherwise the response will be lost
+			for {
+				select {
+				case <-ctx.Done():
+					cancel()
+					return
+				case outCh <- response: // it may block here
+					break Loop
+				case <-ticker.C:
+					if c.clock.Since(lastReceivedResponseTime) >= etcdWatchChTimeoutDuration {
+						log.Warn("etcd client outCh blocking too long, the etcdWorker may be stuck", zap.Duration("duration", c.clock.Since(lastReceivedResponseTime)))
+					}
+				}
+			}
+		case <-ticker.C:
+			if err := c.RequestProgress(ctx); err != nil {
+				log.Warn("failed to request progress for etcd watcher", zap.Error(err))
+			}
+			if c.clock.Since(lastReceivedResponseTime) >= etcdWatchChTimeoutDuration {
+				// cancel the last cancel func to reset it
+				log.Warn("etcd client watchCh blocking too long, reset the watchCh", zap.Duration("duration", c.clock.Since(lastReceivedResponseTime)), zap.Stack("stack"))
+				cancel()
+				watchCtx, cancel = context.WithCancel(ctx)
+				// Keep the caller's options and only override the start
+				// revision; the appended WithRev is applied last.
+				resetOpts := append(opts[:len(opts):len(opts)], clientv3.WithRev(nextRevision))
+				watchCh = c.cli.Watch(watchCtx, key, resetOpts...)
+				// we need to reset lastReceivedResponseTime after reset Watch
+				lastReceivedResponseTime = c.clock.Now()
+			}
+		}
+	}
 }
 
// RequestProgress requests a progress notify response be sent in all watch channels.
diff --git a/pkg/etcd/client_test.go b/pkg/etcd/client_test.go
index fbc9b3268b9..ba14f9e5ebe 100644
--- a/pkg/etcd/client_test.go
+++ b/pkg/etcd/client_test.go
@@ -17,6 +17,7 @@ import (
 	"context"
 	"time"
 
+	"github.com/benbjohnson/clock"
 	"github.com/pingcap/check"
 	"github.com/pingcap/errors"
 	"github.com/pingcap/tiflow/pkg/util/testleak"
@@ -44,6 +45,27 @@ func (m *mockClient) Put(ctx context.Context, key, val string, opts ...clientv3.
 	return nil, errors.New("mock error")
 }
 
+// mockWatcher is a clientv3.Watcher stub that always hands out the same
+// watchCh and counts the Watch and RequestProgress calls it receives.
+type mockWatcher struct {
+	clientv3.Watcher
+	watchCh      chan clientv3.WatchResponse
+	resetCount   *int
+	requestCount *int
+}
+
+// Watch counts the call and returns the shared watchCh.
+func (m mockWatcher) Watch(ctx context.Context, key string, opts ...clientv3.OpOption) clientv3.WatchChan {
+	*m.resetCount++
+	return m.watchCh
+}
+
+// RequestProgress counts the call and always succeeds.
+func (m mockWatcher) RequestProgress(ctx context.Context) error {
+	*m.requestCount++
+	return nil
+}
+
 func (s *clientSuite) TestRetry(c *check.C) {
 	defer testleak.AfterTest(c)()
 	originValue := maxTries
@@ -90,3 +112,113 @@ func (s *etcdSuite) TestDelegateLease(c *check.C) {
 	c.Assert(err, check.IsNil)
 	c.Assert(ttlResp.TTL, check.Equals, int64(-1))
 }
+
+// test no data lost when WatchCh blocked
+func (s *etcdSuite) TestWatchChBlocked(c *check.C) {
+	defer testleak.AfterTest(c)()
+	defer s.TearDownTest(c)
+	cli := clientv3.NewCtxClient(context.TODO())
+	resetCount := 0
+	requestCount := 0
+	watchCh := make(chan clientv3.WatchResponse, 1)
+	watcher := mockWatcher{watchCh: watchCh, resetCount: &resetCount, requestCount: &requestCount}
+	cli.Watcher = watcher
+
+	sentRes := []clientv3.WatchResponse{
+		{CompactRevision: 1},
+		{CompactRevision: 2},
+		{CompactRevision: 3},
+		{CompactRevision: 4},
+		{CompactRevision: 5},
+		{CompactRevision: 6},
+	}
+
+	go func() {
+		for _, r := range sentRes {
+			watchCh <- r
+		}
+	}()
+
+	mockClock := clock.NewMock()
+	watchCli := Wrap(cli, nil)
+	watchCli.clock = mockClock
+
+	key := "testWatchChBlocked"
+	outCh := make(chan clientv3.WatchResponse, 6)
+	revision := int64(1)
+	ctx, cancel := context.WithTimeout(context.Background(), time.Second*2)
+	defer cancel()
+
+	go func() {
+		watchCli.WatchWithChan(ctx, outCh, key, clientv3.WithPrefix(), clientv3.WithRev(revision))
+	}()
+	receivedRes := make([]clientv3.WatchResponse, 0)
+	// wait for WatchWithChan set up
+	r := <-outCh
+	receivedRes = append(receivedRes, r)
+	// move time forward
+	mockClock.Add(time.Second * 30)
+
+	for r := range outCh {
+		receivedRes = append(receivedRes, r)
+	}
+
+	c.Check(receivedRes, check.DeepEquals, sentRes)
+	// make sure watchCh has been reset since timeout
+	c.Assert(*watcher.resetCount > 1, check.IsTrue)
+	// make sure RequestProgress has been called since timeout
+	c.Assert(*watcher.requestCount > 1, check.IsTrue)
+	// make sure etcdRequestProgressDuration is less than etcdWatchChTimeoutDuration
+	c.Assert(etcdRequestProgressDuration, check.Less, etcdWatchChTimeoutDuration)
+}
+
+// test no data lost when OutCh blocked
+func (s *etcdSuite) TestOutChBlocked(c *check.C) {
+	defer testleak.AfterTest(c)()
+	defer s.TearDownTest(c)
+
+	cli := clientv3.NewCtxClient(context.TODO())
+	resetCount := 0
+	requestCount := 0
+	watchCh := make(chan clientv3.WatchResponse, 1)
+	watcher := mockWatcher{watchCh: watchCh, resetCount: &resetCount, requestCount: &requestCount}
+	cli.Watcher = watcher
+
+	mockClock := clock.NewMock()
+	watchCli := Wrap(cli, nil)
+	watchCli.clock = mockClock
+
+	sentRes := []clientv3.WatchResponse{
+		{CompactRevision: 1},
+		{CompactRevision: 2},
+		{CompactRevision: 3},
+	}
+
+	go func() {
+		for _, r := range sentRes {
+			watchCh <- r
+		}
+	}()
+
+	key := "testOutChBlocked"
+	outCh := make(chan clientv3.WatchResponse, 1)
+	revision := int64(1)
+
+	ctx, cancel := context.WithTimeout(context.Background(), time.Second*2)
+	defer cancel()
+	go func() {
+		watchCli.WatchWithChan(ctx, outCh, key, clientv3.WithPrefix(), clientv3.WithRev(revision))
+	}()
+	receivedRes := make([]clientv3.WatchResponse, 0)
+	// wait for WatchWithChan set up
+	r := <-outCh
+	receivedRes = append(receivedRes, r)
+	// move time forward
+	mockClock.Add(time.Second * 30)
+
+	for r := range outCh {
+		receivedRes = append(receivedRes, r)
+	}
+
+	c.Check(receivedRes, check.DeepEquals, sentRes)
+}
diff --git a/pkg/orchestrator/etcd_worker.go b/pkg/orchestrator/etcd_worker.go
index cb721f94041..cb402edac0c 100644
--- a/pkg/orchestrator/etcd_worker.go
+++ b/pkg/orchestrator/etcd_worker.go
@@ -20,6 +20,7 @@ import (
 	"time"
 
 	"github.com/pingcap/errors"
+	"github.com/pingcap/failpoint"
 	"github.com/pingcap/log"
 	cerrors "github.com/pingcap/tiflow/pkg/errors"
 	"github.com/pingcap/tiflow/pkg/etcd"
@@ -27,6 +28,7 @@ import (
 	"github.com/prometheus/client_golang/prometheus"
 	"go.etcd.io/etcd/clientv3"
 	"go.etcd.io/etcd/clientv3/concurrency"
+	"go.etcd.io/etcd/etcdserver/etcdserverpb"
 	"go.etcd.io/etcd/mvcc/mvccpb"
 	"go.uber.org/zap"
 	"go.uber.org/zap/zapcore"
@@ -34,8 +36,13 @@ import (
 )
 
 const (
-	etcdRequestProgressDuration = 2 * time.Second
-	deletionCounterKey          = "/meta/ticdc-delete-etcd-key-count"
+	// etcdTxnTimeoutDuration represents the timeout duration for committing a
+	// transaction to Etcd
+	etcdTxnTimeoutDuration = 30 * time.Second
+	// etcdWorkerLogsWarnDuration: when EtcdWorker commits a txn to etcd or ticks
+	// its reactor and that takes more than etcdWorkerLogsWarnDuration, it prints a log
+	etcdWorkerLogsWarnDuration = 1 * time.Second
+	deletionCounterKey         = "/meta/ticdc-delete-etcd-key-count"
 )
 
 // EtcdWorker handles all interactions with Etcd
@@ -64,7 +71,8 @@ type EtcdWorker struct {
 	// a `compare-and-swap` semantics, which is essential for implementing
 	// snapshot isolation for Reactor ticks.
deleteCounter int64
-	metrics       *etcdWorkerMetrics
+
+	metrics *etcdWorkerMetrics
 }
 
 type etcdWorkerMetrics struct {
@@ -121,13 +129,13 @@ func (worker *EtcdWorker) Run(ctx context.Context, session *concurrency.Session,
 		return errors.Trace(err)
 	}
 
-	ctx1, cancel := context.WithCancel(ctx)
-	defer cancel()
-
 	ticker := time.NewTicker(timerInterval)
 	defer ticker.Stop()
 
-	watchCh := worker.client.Watch(ctx1, worker.prefix.String(), clientv3.WithPrefix(), clientv3.WithRev(worker.revision+1))
+	watchCtx, cancel := context.WithCancel(ctx)
+	defer cancel()
+	watchCh := worker.client.Watch(watchCtx, worker.prefix.String(), clientv3.WithPrefix(), clientv3.WithRev(worker.revision+1))
+
 	var (
 		pendingPatches [][]DataPatch
 		exiting        bool
@@ -139,7 +147,6 @@ func (worker *EtcdWorker) Run(ctx context.Context, session *concurrency.Session,
 		// should never be closed
 		sessionDone = make(chan struct{})
 	}
-	lastReceivedEventTime := time.Now()
 
 	// tickRate represents the number of times EtcdWorker can tick
 	// the reactor per second
@@ -153,26 +160,40 @@ func (worker *EtcdWorker) Run(ctx context.Context, session *concurrency.Session,
 			return cerrors.ErrEtcdSessionDone.GenWithStackByArgs()
 		case <-ticker.C:
 			// There is no new event to handle on timer ticks, so we have nothing here.
-			if time.Since(lastReceivedEventTime) > etcdRequestProgressDuration {
-				if err := worker.client.RequestProgress(ctx); err != nil {
-					log.Warn("failed to request progress for etcd watcher", zap.Error(err))
-				}
-			}
 		case response := <-watchCh:
 			// In this select case, we receive new events from Etcd, and call handleEvent if appropriate.
 			if err := response.Err(); err != nil {
 				return errors.Trace(err)
 			}
-			lastReceivedEventTime = time.Now()
+
+			// ProgressNotify implies no new events.
+			if response.IsProgressNotify() {
+				log.Debug("Etcd progress notification",
+					zap.Int64("revision", response.Header.GetRevision()))
+				// Note that we don't need to update the revision here, and we
+				// should not do so, because the revision of the progress notification
+				// may not satisfy the strict monotonicity we have expected.
+				//
+				// Updating `worker.revision` can cause a useful event with the
+				// same revision to be dropped erroneously.
+				//
+				// Refer to https://etcd.io/docs/v3.3/dev-guide/interacting_v3/#watch-progress
+				// "Note: The revision number in the progress notify response is the revision
+				// from the local etcd server node that the watch stream is connected to. [...]"
+				// This implies that the progress notification will NOT go through the raft
+				// consensus, thereby NOT affecting the revision (index).
+				continue
+			}
+
 			// Check whether the response is stale.
 			if worker.revision >= response.Header.GetRevision() {
+				log.Info("Stale Etcd event dropped",
+					zap.Int64("event-revision", response.Header.GetRevision()),
+					zap.Int64("previous-revision", worker.revision),
+					zap.Any("events", response.Events))
 				continue
 			}
 			worker.revision = response.Header.GetRevision()
-			// ProgressNotify implies no new events.
-			if response.IsProgressNotify() {
-				continue
-			}
 
 			for _, event := range response.Events {
 				// handleEvent will apply the event to our internal `rawState`.
@@ -216,11 +237,11 @@ func (worker *EtcdWorker) Run(ctx context.Context, session *concurrency.Session,
 			startTime := time.Now()
 			// it is safe that a batch of updates has been applied to worker.state before worker.reactor.Tick
 			nextState, err := worker.reactor.Tick(ctx, worker.state)
-			costTime := time.Since(startTime).Seconds()
-			if costTime > time.Second.Seconds()*1 {
-				log.Warn("etcdWorker ticks reactor cost time more than 1 second")
+			costTime := time.Since(startTime)
+			if costTime > etcdWorkerLogsWarnDuration {
+				log.Warn("EtcdWorker reactor tick took too long", zap.Duration("duration", costTime))
 			}
-			worker.metrics.metricEtcdWorkerTickDuration.Observe(costTime)
+			worker.metrics.metricEtcdWorkerTickDuration.Observe(costTime.Seconds())
 			if err != nil {
 				if !cerrors.ErrReactorFinished.Equal(errors.Cause(err)) {
 					return errors.Trace(err)
@@ -323,6 +344,10 @@ func (worker *EtcdWorker) applyPatchGroups(ctx context.Context, patchGroups [][]
 }
 
 func (worker *EtcdWorker) commitChangedState(ctx context.Context, changedState map[util.EtcdKey][]byte, size int) error {
+	if len(changedState) == 0 {
+		return nil
+	}
+
 	cmps := make([]clientv3.Cmp, 0, len(changedState))
 	ops := make([]clientv3.Op, 0, len(changedState))
 	hasDelete := false
@@ -362,12 +387,24 @@ func (worker *EtcdWorker) commitChangedState(ctx context.Context, changedState m
 
 	worker.metrics.metricEtcdTxnSize.Observe(float64(size))
 	startTime := time.Now()
-	resp, err := worker.client.Txn(ctx).If(cmps...).Then(ops...).Commit()
-	costTime := time.Since(startTime).Seconds()
-	if costTime > time.Second.Seconds()*1 {
-		log.Warn("etcdWorker commit etcd txn cost time more than 1 second")
+
+	txnCtx, cancel := context.WithTimeout(ctx, etcdTxnTimeoutDuration)
+	resp, err := worker.client.Txn(txnCtx).If(cmps...).Then(ops...).Commit()
+	cancel()
+
+	// For testing the situation where we have a progress notification that
+	// has the same revision as the committed Etcd transaction.
+	failpoint.Inject("InjectProgressRequestAfterCommit", func() {
+		if err := worker.client.RequestProgress(ctx); err != nil {
+			failpoint.Return(errors.Trace(err))
+		}
+	})
+
+	costTime := time.Since(startTime)
+	if costTime > etcdWorkerLogsWarnDuration {
+		log.Warn("Etcd transaction took too long", zap.Duration("duration", costTime))
 	}
-	worker.metrics.metricEtcdTxnDuration.Observe(costTime)
+	worker.metrics.metricEtcdTxnDuration.Observe(costTime.Seconds())
 	if err != nil {
 		return errors.Trace(err)
 	}
@@ -378,6 +415,8 @@ func (worker *EtcdWorker) commitChangedState(ctx context.Context, changedState m
 		return nil
 	}
 
+	// Logs the conditions for the failed Etcd transaction.
+	worker.logEtcdCmps(cmps)
 	return cerrors.ErrEtcdTryAgain.GenWithStackByArgs()
 }
 
@@ -393,19 +432,34 @@ func (worker *EtcdWorker) applyUpdates() error {
 	return nil
 }
 
-func logEtcdOps(ops []clientv3.Op, commited bool) {
-	if log.GetLevel() != zapcore.DebugLevel || len(ops) == 0 {
+func logEtcdOps(ops []clientv3.Op, committed bool) {
+	if committed && (log.GetLevel() != zapcore.DebugLevel || len(ops) == 0) {
 		return
 	}
-	log.Debug("[etcd worker] ==========Update State to ETCD==========")
+	logFn := log.Debug
+	if !committed {
+		logFn = log.Info
+	}
+
+	logFn("[etcd worker] ==========Update State to ETCD==========")
 	for _, op := range ops {
 		if op.IsDelete() {
-			log.Debug("[etcd worker] delete key", zap.ByteString("key", op.KeyBytes()))
+			logFn("[etcd worker] delete key", zap.ByteString("key", op.KeyBytes()))
 		} else {
-			log.Debug("[etcd worker] put key", zap.ByteString("key", op.KeyBytes()), zap.ByteString("value", op.ValueBytes()))
+			logFn("[etcd worker] put key", zap.ByteString("key", op.KeyBytes()), zap.ByteString("value", op.ValueBytes()))
 		}
 	}
-	log.Debug("[etcd worker] ============State Commit=============", zap.Bool("committed", commited))
+	logFn("[etcd worker] ============State Commit=============", zap.Bool("committed", committed))
+}
+
+func (worker *EtcdWorker) logEtcdCmps(cmps []clientv3.Cmp) {
+	log.Info("[etcd worker] ==========Failed Etcd Txn Cmps==========")
+	for _, cmp := range cmps {
+		cmp := etcdserverpb.Compare(cmp)
+		log.Info("[etcd worker] compare",
+			zap.String("cmp", cmp.String()))
+	}
+	log.Info("[etcd worker] ============End Failed Etcd Txn Cmps=============")
 }
 
 func (worker *EtcdWorker) cleanUp() {
diff --git a/pkg/orchestrator/etcd_worker_bank_test.go b/pkg/orchestrator/etcd_worker_bank_test.go
index 5f4c766138f..7321ce6033c 100644
--- a/pkg/orchestrator/etcd_worker_bank_test.go
+++ b/pkg/orchestrator/etcd_worker_bank_test.go
@@ -23,6 +23,7 @@ import (
 	"time"
 
 	"github.com/pingcap/check"
+	"github.com/pingcap/failpoint"
 	"github.com/pingcap/log"
 	cerror "github.com/pingcap/tiflow/pkg/errors"
 	"github.com/pingcap/tiflow/pkg/orchestrator/util"
@@ -121,6 +122,14 @@ func (b *bankReactor) Tick(ctx context.Context, state ReactorState) (nextState R
 
 func (s *etcdWorkerSuite) TestEtcdBank(c *check.C) {
 	defer testleak.AfterTest(c)()
+
+	// The failpoint name must use this package's import path (the imports
+	// here use github.com/pingcap/tiflow); presumably a ticdc path never fires.
+	c.Assert(failpoint.Enable("github.com/pingcap/tiflow/pkg/orchestrator/InjectProgressRequestAfterCommit", "10%return(true)"), check.IsNil)
+	defer func() {
+		_ = failpoint.Disable("github.com/pingcap/tiflow/pkg/orchestrator/InjectProgressRequestAfterCommit")
+	}()
+
 	totalAccountNumber := 25
 	workerNumber := 10
 	var wg sync.WaitGroup