[fix #193] Fix CDC losing data by adding a resolved ts safe interval #196

Merged 16 commits on Aug 8, 2022
45 changes: 19 additions & 26 deletions cdc/cdc/kv/client_test.go
@@ -332,14 +332,14 @@ func waitRequestID(c *check.C, allocatedID uint64) {
func (s *clientSuite) TestConnectOfflineTiKV(c *check.C) {
defer testleak.AfterTest(c)()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
wg := &sync.WaitGroup{}
ch2 := make(chan *cdcpb.ChangeDataEvent, 10)
srv := newMockChangeDataService(c, ch2)
server2, addr := newMockService(ctx, c, srv, wg)
defer func() {
close(ch2)
server2.Stop()
cancel()
wg.Wait()
}()

@@ -410,22 +410,20 @@ func (s *clientSuite) TestConnectOfflineTiKV(c *check.C) {
case <-time.After(time.Second):
c.Fatalf("reconnection not succeed in 1 second")
}
checkEvent(event, 1)
checkEvent(event, GetSafeResolvedTs(1))

select {
case event = <-eventCh:
case <-time.After(time.Second):
c.Fatalf("reconnection not succeed in 1 second")
}
checkEvent(event, ver.Ver)
checkEvent(event, GetSafeResolvedTs(ver.Ver))

// check gRPC connection active counter is updated correctly
bucket, ok := grpcPool.bucketConns[invalidStore]
c.Assert(ok, check.IsTrue)
empty := bucket.recycle()
c.Assert(empty, check.IsTrue)

cancel()
}
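
A pattern repeated throughout these tests: cancel() moves into the deferred cleanup ahead of wg.Wait(). A minimal sketch of why that ordering matters (the worker goroutine is hypothetical, not from this diff):

package main

import (
	"context"
	"sync"
)

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	wg := &sync.WaitGroup{}
	wg.Add(1)
	go func() {
		defer wg.Done()
		<-ctx.Done() // a worker that exits only when the context is cancelled
	}()
	defer func() {
		cancel()  // unblock the worker first...
		wg.Wait() // ...then wait for it; waiting before cancelling would hang
	}()
}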

func (s *clientSuite) TestRecvLargeMessageSize(c *check.C) {
@@ -438,10 +436,9 @@ func (s *clientSuite) TestRecvLargeMessageSize(c *check.C) {
defer func() {
close(ch2)
server2.Stop()
cancel()
wg.Wait()
}()
// Cancel first, and then close the server.
defer cancel()

rpcClient, cluster, pdClient, err := testutils.NewMockTiKV("", mockcopr.NewCoprRPCHandler())
c.Assert(err, check.IsNil)
@@ -510,7 +507,6 @@ func (s *clientSuite) TestRecvLargeMessageSize(c *check.C) {
c.Fatalf("receiving message takes too long")
}
c.Assert(len(event.Val.Value), check.Equals, largeValSize)
cancel()
}

func (s *clientSuite) TestHandleError(c *check.C) {
@@ -531,6 +527,7 @@ func (s *clientSuite) TestHandleError(c *check.C) {
server1.Stop()
close(ch2)
server2.Stop()
cancel()
wg.Wait()
}()

@@ -678,8 +675,6 @@ consumePreResolvedTs:
}
c.Assert(event.Resolved, check.NotNil)
c.Assert(event.Resolved.ResolvedTs, check.Equals, uint64(120))

cancel()
}

// TestCompatibilityWithSameConn tests kv client returns an error when TiKV returns
@@ -696,6 +691,7 @@ func (s *clientSuite) TestCompatibilityWithSameConn(c *check.C) {
defer func() {
close(ch1)
server1.Stop()
cancel()
wg.Wait()
}()

@@ -744,7 +740,6 @@ func (s *clientSuite) TestCompatibilityWithSameConn(c *check.C) {
}}
ch1 <- incompatibility
wg2.Wait()
cancel()
}

// TestClusterIDMismatch tests kv client returns an error when TiKV returns
@@ -761,6 +756,7 @@ func (s *clientSuite) TestClusterIDMismatch(c *check.C) {
defer func() {
close(changeDataCh)
mockService.Stop()
cancel()
wg.Wait()
}()

@@ -816,7 +812,6 @@ func (s *clientSuite) TestClusterIDMismatch(c *check.C) {
changeDataCh <- clusterIDMismatchEvent

wg2.Wait()
cancel()
}

func (s *clientSuite) testHandleFeedEvent(c *check.C) {
@@ -830,6 +825,7 @@ func (s *clientSuite) testHandleFeedEvent(c *check.C) {
defer func() {
close(ch1)
server1.Stop()
cancel()
wg.Wait()
}()

@@ -1230,8 +1226,6 @@ func (s *clientSuite) testHandleFeedEvent(c *check.C) {
c.Errorf("expected event %v not received", multipleExpected)
}
}

cancel()
}

func (s *clientSuite) TestHandleFeedEvent(c *check.C) {
@@ -1259,8 +1253,10 @@ func (s *clientSuite) TestStreamSendWithError(c *check.C) {
defer testleak.AfterTest(c)()
ctx, cancel := context.WithCancel(context.Background())
wg := &sync.WaitGroup{}
defer wg.Wait()
defer cancel()
defer func() {
cancel()
wg.Wait()
}()

server1Stopped := make(chan struct{})
ch1 := make(chan *cdcpb.ChangeDataEvent, 10)
@@ -1383,6 +1379,7 @@ func (s *clientSuite) testStreamRecvWithError(c *check.C, failpointStr string) {
defer func() {
close(ch1)
server1.Stop()
cancel()
wg.Wait()
}()

@@ -1486,7 +1483,6 @@ eventLoop:
}
}
c.Assert(events, check.DeepEquals, expected)
cancel()
}

// TestStreamRecvWithErrorAndResolvedGoBack mainly tests the scenario that the `Recv` call of a gRPC
@@ -1722,6 +1718,7 @@ func (s *clientSuite) TestIncompatibleTiKV(c *check.C) {
defer func() {
close(ch1)
server1.Stop()
cancel()
wg.Wait()
}()

@@ -1785,8 +1782,6 @@ func (s *clientSuite) TestIncompatibleTiKV(c *check.C) {
case <-time.After(time.Second):
c.Errorf("expected events are not receive")
}

cancel()
}

// TestPendingRegionError tests kv client should return an error when receiving
@@ -1803,6 +1798,7 @@ func (s *clientSuite) TestNoPendingRegionError(c *check.C) {
defer func() {
close(ch1)
server1.Stop()
cancel()
wg.Wait()
}()

@@ -1862,8 +1858,6 @@ func (s *clientSuite) TestNoPendingRegionError(c *check.C) {
ev = <-eventCh
c.Assert(ev.Resolved, check.NotNil)
c.Assert(ev.Resolved.ResolvedTs, check.Equals, uint64(200))

cancel()
}

// TestDropStaleRequest tests kv client should drop an event if its request id is outdated.
@@ -1879,6 +1873,7 @@ func (s *clientSuite) TestDropStaleRequest(c *check.C) {
defer func() {
close(ch1)
server1.Stop()
cancel()
wg.Wait()
}()

@@ -1967,7 +1962,6 @@ func (s *clientSuite) TestDropStaleRequest(c *check.C) {
c.Errorf("expected event %v not received", expectedEv)
}
}
cancel()
}

// TestResolveLock tests the resolve lock logic in kv client
@@ -1983,6 +1977,7 @@ func (s *clientSuite) TestResolveLock(c *check.C) {
defer func() {
close(ch1)
server1.Stop()
cancel()
wg.Wait()
}()

@@ -2044,7 +2039,7 @@ func (s *clientSuite) TestResolveLock(c *check.C) {
{
Resolved: &model.ResolvedSpan{
Span: regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")},
ResolvedTs: tso,
ResolvedTs: GetSafeResolvedTs(tso),
},
RegionID: regionID,
},
@@ -2062,8 +2057,6 @@ func (s *clientSuite) TestResolveLock(c *check.C) {
// sleep 10s to simulate no resolved event longer than ResolveLockInterval
// resolve lock check ticker is 5s.
time.Sleep(10 * time.Second)

cancel()
}

func (s *clientSuite) testEventCommitTsFallback(c *check.C, events []*cdcpb.ChangeDataEvent) {
@@ -2077,6 +2070,7 @@ func (s *clientSuite) testEventCommitTsFallback(c *check.C, events []*cdcpb.ChangeDataEvent) {
defer func() {
close(ch1)
server1.Stop()
cancel()
wg.Wait()
}()

@@ -2128,7 +2122,6 @@ func (s *clientSuite) testEventCommitTsFallback(c *check.C, events []*cdcpb.ChangeDataEvent) {
ch1 <- event
}
clientWg.Wait()
cancel()
}

// TODO(resolved-ts): should panic. Just logging as error now.
27 changes: 27 additions & 0 deletions cdc/cdc/kv/region_worker.go
@@ -730,6 +730,15 @@ func (w *regionWorker) handleResolvedTs(
return nil
}
regionID := state.sri.verID.GetID()

// In TiKV, when a leader transfer occurs, the old leader may send one last
// resolved ts, which may be larger than the new leader's first causal
// timestamp after the transfer. So we move the resolved ts back by a safe
// interval to make sure it stays correct.
// See https://github.com/tikv/migration/issues/193.
// TODO: fix the issue completely.
resolvedTs = GetSafeResolvedTs(resolvedTs)

// Send resolved ts update in non blocking way, since we can re-query real
// resolved ts from region state even if resolved ts update is discarded.
// NOTICE: We send any regionTsInfo to resolveLock thread to give us a chance to trigger resolveLock logic
@@ -828,3 +837,21 @@ func RunWorkerPool(ctx context.Context) error {
})
return errg.Wait()
}

func GetSafeResolvedTs(resolvedTs uint64) uint64 {
cfg := config.GetGlobalServerConfig().KVClient

logicalTs := oracle.ExtractLogical(resolvedTs)
physicalTime := oracle.GetTimeFromTS(resolvedTs)

safeTime := physicalTime.Add(-cfg.ResolvedTsSafeInterval)
physicalTs := oracle.GetPhysical(safeTime)

if physicalTs < 0 {
log.Warn("The resolvedTs is smaller than the ResolvedTsSafeInterval",
zap.Uint64("resolvedTs", resolvedTs))
return resolvedTs
}

return oracle.ComposeTS(physicalTs, logicalTs)
}
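
For reference, a TSO packs the physical time in milliseconds above an 18-bit logical counter, which is where the (1<<18) constants in the test file below come from. A minimal sketch of the arithmetic behind GetSafeResolvedTs, assuming the oracle helpers come from tikv/client-go/v2 (the import path is an assumption; the helper names match the code above):

package main

import (
	"fmt"

	"github.com/tikv/client-go/v2/oracle" // assumed import path for the oracle helpers
)

func main() {
	// physical = 4000ms, logical = 1, mirroring a case from the tests below
	resolvedTs := uint64((1<<18)*4*1000 + 1)

	physical := oracle.ExtractPhysical(resolvedTs) // 4000 (milliseconds)
	logical := oracle.ExtractLogical(resolvedTs)   // 1

	// Moving back by the 3s default interval touches only the physical part.
	safe := oracle.ComposeTS(physical-3000, logical)
	fmt.Println(safe == (1<<18)*1000+1) // true
}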
35 changes: 35 additions & 0 deletions cdc/cdc/kv/region_worker_test.go
@@ -135,3 +135,38 @@ func (s *regionWorkerSuite) TestRegionWorkerPoolSize(c *check.C) {
size = getWorkerPoolSize()
c.Assert(size, check.Equals, maxWorkerPoolSize)
}

func (s *regionWorkerSuite) TestGetSafeResolvedTs(c *check.C) {
defer testleak.AfterTest(c)()

testCases := []struct {
resolvedTs uint64
expected uint64
}{
{
resolvedTs: 0,
expected: 0,
},
{
resolvedTs: 10,
expected: 10,
},
{
resolvedTs: (1 << 18) * 3 * 1000,
expected: 0,
},
{
resolvedTs: (1<<18)*3*1000 + 1,
expected: 1,
},
{
resolvedTs: (1<<18)*4*1000 + 1,
expected: (1<<18)*1000 + 1,
},
}
Collaborator review comment:

Suggest adding boundary cases, e.g.:

  1. resolvedTs = 0
  2. Physical clock less than 3s.
  3. Physical clock equal to 3s.

for _, testCase := range testCases {
safeResolvedTs := GetSafeResolvedTs(testCase.resolvedTs)
c.Assert(safeResolvedTs, check.Equals, testCase.expected)
}
}
21 changes: 12 additions & 9 deletions cdc/pkg/cmd/server/server_test.go
@@ -170,9 +170,10 @@ func TestParseCfg(t *testing.T) {
},
PerKeySpanMemoryQuota: 10 * 1024 * 1024, // 10M
KVClient: &config.KVClientConfig{
WorkerConcurrent: 8,
WorkerPoolSize: 0,
RegionScanLimit: 40,
WorkerConcurrent: 8,
WorkerPoolSize: 0,
RegionScanLimit: 40,
ResolvedTsSafeInterval: 3 * time.Second,
},
Debug: &config.DebugConfig{
EnableKeySpanActor: false,
@@ -299,9 +300,10 @@ server-worker-pool-size = 16
Security: &config.SecurityConfig{},
PerKeySpanMemoryQuota: 10 * 1024 * 1024, // 10M
KVClient: &config.KVClientConfig{
WorkerConcurrent: 8,
WorkerPoolSize: 0,
RegionScanLimit: 40,
WorkerConcurrent: 8,
WorkerPoolSize: 0,
RegionScanLimit: 40,
ResolvedTsSafeInterval: 3 * time.Second,
},
Debug: &config.DebugConfig{
EnableKeySpanActor: false,
@@ -427,9 +429,10 @@ cert-allowed-cn = ["dd","ee"]
},
PerKeySpanMemoryQuota: 10 * 1024 * 1024, // 10M
KVClient: &config.KVClientConfig{
WorkerConcurrent: 8,
WorkerPoolSize: 0,
RegionScanLimit: 40,
WorkerConcurrent: 8,
WorkerPoolSize: 0,
RegionScanLimit: 40,
ResolvedTsSafeInterval: 3 * time.Second,
},
Debug: &config.DebugConfig{
EnableKeySpanActor: false,
3 changes: 2 additions & 1 deletion cdc/pkg/config/config_test_data.go
@@ -80,7 +80,8 @@ const (
"kv-client": {
"worker-concurrent": 8,
"worker-pool-size": 0,
"region-scan-limit": 40
"region-scan-limit": 40,
"resolved-ts-safe-interval": 3000000000
},
"debug": {
"enable-keyspan-actor": false,
4 changes: 4 additions & 0 deletions cdc/pkg/config/kvclient.go
@@ -13,6 +13,8 @@

package config

import "time"

// KVClientConfig represents config for kv client
type KVClientConfig struct {
// how many workers will be used for a single region worker
@@ -21,4 +23,6 @@ type KVClientConfig struct {
WorkerPoolSize int `toml:"worker-pool-size" json:"worker-pool-size"`
// region incremental scan limit for one table in a single store
RegionScanLimit int `toml:"region-scan-limit" json:"region-scan-limit"`
// the safe interval by which the resolved ts is moved backward
ResolvedTsSafeInterval time.Duration `toml:"resolved-ts-safe-interval" json:"resolved-ts-safe-interval"`
}
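
ResolvedTsSafeInterval is a time.Duration, and Go's encoding/json marshals durations as integer nanoseconds, which is why the expected JSON in config_test_data.go above reads 3000000000 for the 3s default. A minimal sketch (the one-field mirror struct is hypothetical; the field tag is copied from the diff):

package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// kvClientConfig is a hypothetical one-field mirror of KVClientConfig,
// kept only to show how the new duration field serializes.
type kvClientConfig struct {
	ResolvedTsSafeInterval time.Duration `toml:"resolved-ts-safe-interval" json:"resolved-ts-safe-interval"`
}

func main() {
	out, err := json.Marshal(kvClientConfig{ResolvedTsSafeInterval: 3 * time.Second})
	if err != nil {
		panic(err)
	}
	fmt.Println(string(out)) // {"resolved-ts-safe-interval":3000000000}
}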