From 67011f4db16fe1d8f2e88424a8331d6dc7a206e5 Mon Sep 17 00:00:00 2001
From: zzm <zhouzemin@pingcap.com>
Date: Mon, 8 Aug 2022 19:54:47 +0800
Subject: [PATCH] [fix #193] fix cdc losing data by adding a resolved ts safe
 interval (#196)

* resolvedts - x

Signed-off-by: zeminzhou <zhouzemin@pingcap.com>

* resolvedts - x

Signed-off-by: zeminzhou <zhouzemin@pingcap.com>

* resolvedts - x

Signed-off-by: zeminzhou <zhouzemin@pingcap.com>

* fix comment & ut

Signed-off-by: zeminzhou <zhouzemin@pingcap.com>

* add ut

Signed-off-by: zeminzhou <zhouzemin@pingcap.com>

* fix ut

Signed-off-by: zeminzhou <zhouzemin@pingcap.com>

* fix ut

Signed-off-by: zeminzhou <zhouzemin@pingcap.com>

* fix comment

Signed-off-by: zeminzhou <zhouzemin@pingcap.com>

* fix ut

Signed-off-by: zeminzhou <zhouzemin@pingcap.com>

* remove tmp

Signed-off-by: zeminzhou <zhouzemin@pingcap.com>

* fix comment

Signed-off-by: zeminzhou <zhouzemin@pingcap.com>

* fix ut

Signed-off-by: zeminzhou <zhouzemin@pingcap.com>

* fix kv ut timeout

Signed-off-by: zeminzhou <zhouzemin@pingcap.com>

* fix ut

Signed-off-by: zeminzhou <zhouzemin@pingcap.com>

* fix check

Signed-off-by: zeminzhou <zhouzemin@pingcap.com>

* fix ut

Signed-off-by: zeminzhou <zhouzemin@pingcap.com>
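
The change makes the kv client move every resolved ts received from TiKV
backward by a configurable safe interval (kv-client.resolved-ts-safe-interval,
default 3s) before forwarding it downstream. After a leader transfer the old
leader may emit a final resolved ts that is ahead of the new leader's first
causal timestamp, and forwarding that value unadjusted can cause events below
the advertised resolved ts to be lost.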
---
 cdc/cdc/kv/client_test.go          | 45 +++++++++++++-----------------
 cdc/cdc/kv/region_worker.go        | 27 ++++++++++++++++++
 cdc/cdc/kv/region_worker_test.go   | 35 +++++++++++++++++++++++
 cdc/pkg/cmd/server/server_test.go  | 21 ++++++++------
 cdc/pkg/config/config_test_data.go |  3 +-
 cdc/pkg/config/kvclient.go         |  4 +++
 cdc/pkg/config/server_config.go    |  7 +++--
 7 files changed, 103 insertions(+), 39 deletions(-)

diff --git a/cdc/cdc/kv/client_test.go b/cdc/cdc/kv/client_test.go
index 5a2f6400..5ea37c30 100644
--- a/cdc/cdc/kv/client_test.go
+++ b/cdc/cdc/kv/client_test.go
@@ -332,7 +332,6 @@ func waitRequestID(c *check.C, allocatedID uint64) {
 func (s *clientSuite) TestConnectOfflineTiKV(c *check.C) {
 	defer testleak.AfterTest(c)()
 	ctx, cancel := context.WithCancel(context.Background())
-	defer cancel()
 	wg := &sync.WaitGroup{}
 	ch2 := make(chan *cdcpb.ChangeDataEvent, 10)
 	srv := newMockChangeDataService(c, ch2)
@@ -340,6 +339,7 @@ func (s *clientSuite) TestConnectOfflineTiKV(c *check.C) {
 	defer func() {
 		close(ch2)
 		server2.Stop()
+		cancel()
 		wg.Wait()
 	}()
 
@@ -410,22 +410,20 @@ func (s *clientSuite) TestConnectOfflineTiKV(c *check.C) {
 	case <-time.After(time.Second):
 		c.Fatalf("reconnection not succeed in 1 second")
 	}
-	checkEvent(event, 1)
+	checkEvent(event, GetSafeResolvedTs(1))
 
 	select {
 	case event = <-eventCh:
 	case <-time.After(time.Second):
 		c.Fatalf("reconnection not succeed in 1 second")
 	}
-	checkEvent(event, ver.Ver)
+	checkEvent(event, GetSafeResolvedTs(ver.Ver))
 
 	// check gRPC connection active counter is updated correctly
 	bucket, ok := grpcPool.bucketConns[invalidStore]
 	c.Assert(ok, check.IsTrue)
 	empty := bucket.recycle()
 	c.Assert(empty, check.IsTrue)
-
-	cancel()
 }
 
 func (s *clientSuite) TestRecvLargeMessageSize(c *check.C) {
@@ -438,10 +436,9 @@ func (s *clientSuite) TestRecvLargeMessageSize(c *check.C) {
 	defer func() {
 		close(ch2)
 		server2.Stop()
+		cancel()
 		wg.Wait()
 	}()
-	// Cancel first, and then close the server.
-	defer cancel()
 
 	rpcClient, cluster, pdClient, err := testutils.NewMockTiKV("", mockcopr.NewCoprRPCHandler())
 	c.Assert(err, check.IsNil)
@@ -510,7 +507,6 @@ func (s *clientSuite) TestRecvLargeMessageSize(c *check.C) {
 		c.Fatalf("receiving message takes too long")
 	}
 	c.Assert(len(event.Val.Value), check.Equals, largeValSize)
-	cancel()
 }
 
 func (s *clientSuite) TestHandleError(c *check.C) {
@@ -531,6 +527,7 @@ func (s *clientSuite) TestHandleError(c *check.C) {
 		server1.Stop()
 		close(ch2)
 		server2.Stop()
+		cancel()
 		wg.Wait()
 	}()
 
@@ -678,8 +675,6 @@ consumePreResolvedTs:
 	}
 	c.Assert(event.Resolved, check.NotNil)
 	c.Assert(event.Resolved.ResolvedTs, check.Equals, uint64(120))
-
-	cancel()
 }
 
 // TestCompatibilityWithSameConn tests kv client returns an error when TiKV returns
@@ -696,6 +691,7 @@ func (s *clientSuite) TestCompatibilityWithSameConn(c *check.C) {
 	defer func() {
 		close(ch1)
 		server1.Stop()
+		cancel()
 		wg.Wait()
 	}()
 
@@ -744,7 +740,6 @@ func (s *clientSuite) TestCompatibilityWithSameConn(c *check.C) {
 	}}
 	ch1 <- incompatibility
 	wg2.Wait()
-	cancel()
 }
 
 // TestClusterIDMismatch tests kv client returns an error when TiKV returns
@@ -761,6 +756,7 @@ func (s *clientSuite) TestClusterIDMismatch(c *check.C) {
 	defer func() {
 		close(changeDataCh)
 		mockService.Stop()
+		cancel()
 		wg.Wait()
 	}()
 
@@ -816,7 +812,6 @@ func (s *clientSuite) TestClusterIDMismatch(c *check.C) {
 	changeDataCh <- clusterIDMismatchEvent
 
 	wg2.Wait()
-	cancel()
 }
 
 func (s *clientSuite) testHandleFeedEvent(c *check.C) {
@@ -830,6 +825,7 @@ func (s *clientSuite) testHandleFeedEvent(c *check.C) {
 	defer func() {
 		close(ch1)
 		server1.Stop()
+		cancel()
 		wg.Wait()
 	}()
 
@@ -1230,8 +1226,6 @@ func (s *clientSuite) testHandleFeedEvent(c *check.C) {
 			c.Errorf("expected event %v not received", multipleExpected)
 		}
 	}
-
-	cancel()
 }
 
 func (s *clientSuite) TestHandleFeedEvent(c *check.C) {
@@ -1259,8 +1253,10 @@ func (s *clientSuite) TestStreamSendWithError(c *check.C) {
 	defer testleak.AfterTest(c)()
 	ctx, cancel := context.WithCancel(context.Background())
 	wg := &sync.WaitGroup{}
-	defer wg.Wait()
-	defer cancel()
+	defer func() {
+		cancel()
+		wg.Wait()
+	}()
 
 	server1Stopped := make(chan struct{})
 	ch1 := make(chan *cdcpb.ChangeDataEvent, 10)
@@ -1383,6 +1379,7 @@ func (s *clientSuite) testStreamRecvWithError(c *check.C, failpointStr string) {
 	defer func() {
 		close(ch1)
 		server1.Stop()
+		cancel()
 		wg.Wait()
 	}()
 
@@ -1486,7 +1483,6 @@ eventLoop:
 		}
 	}
 	c.Assert(events, check.DeepEquals, expected)
-	cancel()
 }
 
 // TestStreamRecvWithErrorAndResolvedGoBack mainly tests the scenario that the `Recv` call of a gPRC
@@ -1722,6 +1718,7 @@ func (s *clientSuite) TestIncompatibleTiKV(c *check.C) {
 	defer func() {
 		close(ch1)
 		server1.Stop()
+		cancel()
 		wg.Wait()
 	}()
 
@@ -1785,8 +1782,6 @@ func (s *clientSuite) TestIncompatibleTiKV(c *check.C) {
 	case <-time.After(time.Second):
 		c.Errorf("expected events are not receive")
 	}
-
-	cancel()
 }
 
 // TestPendingRegionError tests kv client should return an error when receiving
@@ -1803,6 +1798,7 @@ func (s *clientSuite) TestNoPendingRegionError(c *check.C) {
 	defer func() {
 		close(ch1)
 		server1.Stop()
+		cancel()
 		wg.Wait()
 	}()
 
@@ -1862,8 +1858,6 @@ func (s *clientSuite) TestNoPendingRegionError(c *check.C) {
 	ev = <-eventCh
 	c.Assert(ev.Resolved, check.NotNil)
 	c.Assert(ev.Resolved.ResolvedTs, check.Equals, uint64(200))
-
-	cancel()
 }
 
 // TestDropStaleRequest tests kv client should drop an event if its request id is outdated.
@@ -1879,6 +1873,7 @@ func (s *clientSuite) TestDropStaleRequest(c *check.C) {
 	defer func() {
 		close(ch1)
 		server1.Stop()
+		cancel()
 		wg.Wait()
 	}()
 
@@ -1967,7 +1962,6 @@ func (s *clientSuite) TestDropStaleRequest(c *check.C) {
 			c.Errorf("expected event %v not received", expectedEv)
 		}
 	}
-	cancel()
 }
 
 // TestResolveLock tests the resolve lock logic in kv client
@@ -1983,6 +1977,7 @@ func (s *clientSuite) TestResolveLock(c *check.C) {
 	defer func() {
 		close(ch1)
 		server1.Stop()
+		cancel()
 		wg.Wait()
 	}()
 
@@ -2044,7 +2039,7 @@ func (s *clientSuite) TestResolveLock(c *check.C) {
 		{
 			Resolved: &model.ResolvedSpan{
 				Span:       regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")},
-				ResolvedTs: tso,
+				ResolvedTs: GetSafeResolvedTs(tso),
 			},
 			RegionID: regionID,
 		},
@@ -2062,8 +2057,6 @@ func (s *clientSuite) TestResolveLock(c *check.C) {
 	// sleep 10s to simulate no resolved event longer than ResolveLockInterval
 	// resolve lock check ticker is 5s.
 	time.Sleep(10 * time.Second)
-
-	cancel()
 }
 
 func (s *clientSuite) testEventCommitTsFallback(c *check.C, events []*cdcpb.ChangeDataEvent) {
@@ -2077,6 +2070,7 @@ func (s *clientSuite) testEventCommitTsFallback(c *check.C, events []*cdcpb.Chan
 	defer func() {
 		close(ch1)
 		server1.Stop()
+		cancel()
 		wg.Wait()
 	}()
 
@@ -2128,7 +2122,6 @@ func (s *clientSuite) testEventCommitTsFallback(c *check.C, events []*cdcpb.Chan
 		ch1 <- event
 	}
 	clientWg.Wait()
-	cancel()
 }
 
 // TODO(resolved-ts): should panic. Just logging as error now.
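
A recurring change in this file is worth calling out: the tests used to
declare `defer cancel()` before the cleanup closure, and since deferred calls
run last-in-first-out, `wg.Wait()` executed before `cancel()`, so any worker
goroutine that exits only on context cancellation kept the test hanging (the
"fix kv ut timeout" commits above). A minimal sketch of the corrected
ordering, with a hypothetical worker goroutine standing in for the test's
consumers:

    package main

    import (
    	"context"
    	"sync"
    )

    func main() {
    	ctx, cancel := context.WithCancel(context.Background())
    	var wg sync.WaitGroup
    	wg.Add(1)
    	go func() {
    		defer wg.Done()
    		<-ctx.Done() // the worker exits only when the context is cancelled
    	}()
    	// Deferred functions run LIFO. Calling cancel() inside the same closure,
    	// strictly before wg.Wait(), unblocks the worker; a separate
    	// `defer cancel()` declared earlier would only run after Wait has
    	// already deadlocked.
    	defer func() {
    		cancel()
    		wg.Wait()
    	}()
    }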
diff --git a/cdc/cdc/kv/region_worker.go b/cdc/cdc/kv/region_worker.go
index 360709e5..dc2f6df9 100644
--- a/cdc/cdc/kv/region_worker.go
+++ b/cdc/cdc/kv/region_worker.go
@@ -730,6 +730,15 @@ func (w *regionWorker) handleResolvedTs(
 		return nil
 	}
 	regionID := state.sri.verID.GetID()
+
+	// In TiKV, when a leader transfer occurs, the old leader may still send
+	// a final resolved ts, which can be larger than the new leader's first
+	// causal timestamp after the transfer. So we move the resolved ts
+	// backward by a safe interval to make sure it remains correct.
+	// See https://github.com/tikv/migration/issues/193.
+	// TODO: fix the issue completely.
+	resolvedTs = GetSafeResolvedTs(resolvedTs)
+
 	// Send resolved ts update in non blocking way, since we can re-query real
 	// resolved ts from region state even if resolved ts update is discarded.
 	// NOTICE: We send any regionTsInfo to resolveLock thread to give us a chance to trigger resolveLock logic
@@ -828,3 +837,21 @@ func RunWorkerPool(ctx context.Context) error {
 	})
 	return errg.Wait()
 }
+
+func GetSafeResolvedTs(resolvedTs uint64) uint64 {
+	cfg := config.GetGlobalServerConfig().KVClient
+
+	logicalTs := oracle.ExtractLogical(resolvedTs)
+	physicalTime := oracle.GetTimeFromTS(resolvedTs)
+
+	safeTime := physicalTime.Add(-cfg.ResolvedTsSafeInterval)
+	physicalTs := oracle.GetPhysical(safeTime)
+
+	if physicalTs < 0 {
+		log.Warn("the resolved ts is too small to move backward by the safe interval",
+			zap.Uint64("resolvedTs", resolvedTs))
+		return resolvedTs
+	}
+
+	return oracle.ComposeTS(physicalTs, logicalTs)
+}
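
For review, here is a dependency-free sketch of the arithmetic that
GetSafeResolvedTs performs, assuming the standard TiDB/TiKV TSO layout
(physical milliseconds in the high bits, an 18-bit logical counter in the low
bits); `safeResolvedTs` is a hypothetical stand-in for the oracle-based
implementation above, not part of this patch:

    package main

    import "fmt"

    const logicalBits = 18

    func safeResolvedTs(resolvedTs uint64, safeIntervalMs int64) uint64 {
    	physicalMs := int64(resolvedTs >> logicalBits)   // wall time in ms
    	logical := resolvedTs & ((1 << logicalBits) - 1) // logical counter
    	safeMs := physicalMs - safeIntervalMs
    	if safeMs < 0 {
    		// Too close to the epoch to move backward; keep it unchanged.
    		return resolvedTs
    	}
    	return uint64(safeMs)<<logicalBits | logical
    }

    func main() {
    	// Default 3s interval: physical 4000ms -> 1000ms, logical part kept.
    	fmt.Println(safeResolvedTs((1<<18)*4*1000+1, 3000)) // (1<<18)*1000 + 1
    }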
diff --git a/cdc/cdc/kv/region_worker_test.go b/cdc/cdc/kv/region_worker_test.go
index 8bd4f28e..383aaf64 100644
--- a/cdc/cdc/kv/region_worker_test.go
+++ b/cdc/cdc/kv/region_worker_test.go
@@ -135,3 +135,38 @@ func (s *regionWorkerSuite) TestRegionWorkerPoolSize(c *check.C) {
 	size = getWorkerPoolSize()
 	c.Assert(size, check.Equals, maxWorkerPoolSize)
 }
+
+func (s *regionWorkerSuite) TestGetSafeResolvedTs(c *check.C) {
+	defer testleak.AfterTest(c)()
+
+	testCases := []struct {
+		resolvedTs uint64
+		expected   uint64
+	}{
+		{
+			resolvedTs: 0,
+			expected:   0,
+		},
+		{
+			resolvedTs: 10,
+			expected:   10,
+		},
+		{
+			resolvedTs: (1 << 18) * 3 * 1000,
+			expected:   0,
+		},
+		{
+			resolvedTs: (1<<18)*3*1000 + 1,
+			expected:   1,
+		},
+		{
+			resolvedTs: (1<<18)*4*1000 + 1,
+			expected:   (1<<18)*1000 + 1,
+		},
+	}
+
+	for _, testCase := range testCases {
+		safeResolvedTs := GetSafeResolvedTs(testCase.resolvedTs)
+		c.Assert(safeResolvedTs, check.Equals, testCase.expected)
+	}
+}
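
The test constants follow directly from that layout: (1<<18)*3*1000 is exactly
3s of physical time, so with the default interval the safe ts collapses to
physical 0 (expected 0, or 1 once a logical component is present), while 0 and
10 have physical parts smaller than the interval and are returned unchanged.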
diff --git a/cdc/pkg/cmd/server/server_test.go b/cdc/pkg/cmd/server/server_test.go
index e4b2ab7f..e7beb1f1 100644
--- a/cdc/pkg/cmd/server/server_test.go
+++ b/cdc/pkg/cmd/server/server_test.go
@@ -170,9 +170,10 @@ func TestParseCfg(t *testing.T) {
 		},
 		PerKeySpanMemoryQuota: 10 * 1024 * 1024, // 10M
 		KVClient: &config.KVClientConfig{
-			WorkerConcurrent: 8,
-			WorkerPoolSize:   0,
-			RegionScanLimit:  40,
+			WorkerConcurrent:       8,
+			WorkerPoolSize:         0,
+			RegionScanLimit:        40,
+			ResolvedTsSafeInterval: 3 * time.Second,
 		},
 		Debug: &config.DebugConfig{
 			EnableKeySpanActor: false,
@@ -299,9 +300,10 @@ server-worker-pool-size = 16
 		Security:              &config.SecurityConfig{},
 		PerKeySpanMemoryQuota: 10 * 1024 * 1024, // 10M
 		KVClient: &config.KVClientConfig{
-			WorkerConcurrent: 8,
-			WorkerPoolSize:   0,
-			RegionScanLimit:  40,
+			WorkerConcurrent:       8,
+			WorkerPoolSize:         0,
+			RegionScanLimit:        40,
+			ResolvedTsSafeInterval: 3 * time.Second,
 		},
 		Debug: &config.DebugConfig{
 			EnableKeySpanActor: false,
@@ -427,9 +429,10 @@ cert-allowed-cn = ["dd","ee"]
 		},
 		PerKeySpanMemoryQuota: 10 * 1024 * 1024, // 10M
 		KVClient: &config.KVClientConfig{
-			WorkerConcurrent: 8,
-			WorkerPoolSize:   0,
-			RegionScanLimit:  40,
+			WorkerConcurrent:       8,
+			WorkerPoolSize:         0,
+			RegionScanLimit:        40,
+			ResolvedTsSafeInterval: 3 * time.Second,
 		},
 		Debug: &config.DebugConfig{
 			EnableKeySpanActor: false,
diff --git a/cdc/pkg/config/config_test_data.go b/cdc/pkg/config/config_test_data.go
index f82c485e..6abd9e74 100644
--- a/cdc/pkg/config/config_test_data.go
+++ b/cdc/pkg/config/config_test_data.go
@@ -80,7 +80,8 @@ const (
   "kv-client": {
     "worker-concurrent": 8,
     "worker-pool-size": 0,
-    "region-scan-limit": 40
+    "region-scan-limit": 40,
+    "resolved-ts-safe-interval": 3000000000
   },
   "debug": {
     "enable-keyspan-actor": false,
diff --git a/cdc/pkg/config/kvclient.go b/cdc/pkg/config/kvclient.go
index 0df0261c..1a984196 100644
--- a/cdc/pkg/config/kvclient.go
+++ b/cdc/pkg/config/kvclient.go
@@ -13,6 +13,8 @@
 
 package config
 
+import "time"
+
 // KVClientConfig represents config for kv client
 type KVClientConfig struct {
 	// how many workers will be used for a single region worker
@@ -21,4 +23,6 @@ type KVClientConfig struct {
 	WorkerPoolSize int `toml:"worker-pool-size" json:"worker-pool-size"`
 	// region incremental scan limit for one table in a single store
 	RegionScanLimit int `toml:"region-scan-limit" json:"region-scan-limit"`
+	// the safe interval by which the resolved ts is moved backward
+	ResolvedTsSafeInterval time.Duration `toml:"resolved-ts-safe-interval" json:"resolved-ts-safe-interval"`
 }
diff --git a/cdc/pkg/config/server_config.go b/cdc/pkg/config/server_config.go
index 7a4254e7..3634501a 100644
--- a/cdc/pkg/config/server_config.go
+++ b/cdc/pkg/config/server_config.go
@@ -93,9 +93,10 @@ var defaultServerConfig = &ServerConfig{
 	Security:              &SecurityConfig{},
 	PerKeySpanMemoryQuota: 10 * 1024 * 1024, // 10MB
 	KVClient: &KVClientConfig{
-		WorkerConcurrent: 8,
-		WorkerPoolSize:   0, // 0 will use NumCPU() * 2
-		RegionScanLimit:  40,
+		WorkerConcurrent:       8,
+		WorkerPoolSize:         0, // 0 will use NumCPU() * 2
+		RegionScanLimit:        40,
+		ResolvedTsSafeInterval: 3 * time.Second,
 	},
 	Debug: &DebugConfig{
 		EnableKeySpanActor: false,