Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix #193] fix cdc lose data by add resolved ts interval #196

Merged
merged 16 commits into from
Aug 8, 2022
4 changes: 2 additions & 2 deletions cdc/cdc/kv/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -410,14 +410,14 @@ func (s *clientSuite) TestConnectOfflineTiKV(c *check.C) {
case <-time.After(time.Second):
c.Fatalf("reconnection not succeed in 1 second")
}
checkEvent(event, 1)
checkEvent(event, GetSafeResolvedTs(1))

select {
case event = <-eventCh:
case <-time.After(time.Second):
c.Fatalf("reconnection not succeed in 1 second")
}
checkEvent(event, ver.Ver)
checkEvent(event, GetSafeResolvedTs(ver.Ver))

// check gRPC connection active counter is updated correctly
bucket, ok := grpcPool.bucketConns[invalidStore]
Expand Down
33 changes: 28 additions & 5 deletions cdc/cdc/kv/region_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -730,31 +730,36 @@ func (w *regionWorker) handleResolvedTs(
return nil
}
regionID := state.sri.verID.GetID()

// In TiKV, hen a leader transfer occurs, the old leader may send the last
// resolved ts, which may be larger than the new leader appends key to ts.
zeminzhou marked this conversation as resolved.
Show resolved Hide resolved
// So we fallback the resolved ts to a safe interval to make sure it's correct.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// resolved ts, which may be larger than the new leader appends key to ts.
// So we fallback the resolved ts to a safe interval to make sure it's correct.
// So we fallback the resolved ts to a safe interval to make sure it's correct.
// See https://github.com/tikv/migration/issues/193.
// TODO: fix the issue completely.

Also suggest to write a full analysis in #193, to help us discuss with others, and completely fix it later.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK

safeResolvedTs := GetSafeResolvedTs(resolvedTs)
zeminzhou marked this conversation as resolved.
Show resolved Hide resolved
zeminzhou marked this conversation as resolved.
Show resolved Hide resolved
// Send resolved ts update in non blocking way, since we can re-query real
// resolved ts from region state even if resolved ts update is discarded.
// NOTICE: We send any regionTsInfo to resolveLock thread to give us a chance to trigger resolveLock logic
// (1) if it is a fallback resolvedTs event, it will be discarded and accumulate penalty on the progress;
// (2) if it is a normal one, update rtsManager and check sinceLastResolvedTs
select {
case w.rtsUpdateCh <- &regionTsInfo{regionID: regionID, ts: newResolvedTsItem(resolvedTs)}:
case w.rtsUpdateCh <- &regionTsInfo{regionID: regionID, ts: newResolvedTsItem(safeResolvedTs)}:
default:
}

if resolvedTs < state.lastResolvedTs {
if safeResolvedTs < state.lastResolvedTs {
log.Warn("The resolvedTs is fallen back in kvclient",
zap.String("Event Type", "RESOLVED"),
zap.Uint64("resolvedTs", resolvedTs),
zap.Uint64("safeResolvedTs", safeResolvedTs),
zap.Uint64("lastResolvedTs", state.lastResolvedTs),
zap.Uint64("regionID", regionID))
return nil
}
state.lastResolvedTs = resolvedTs
state.lastResolvedTs = safeResolvedTs
// emit a checkpointTs
revent := model.RegionFeedEvent{
RegionID: regionID,
Resolved: &model.ResolvedSpan{
Span: state.sri.span,
ResolvedTs: resolvedTs,
ResolvedTs: safeResolvedTs,
},
}

Expand Down Expand Up @@ -828,3 +833,21 @@ func RunWorkerPool(ctx context.Context) error {
})
return errg.Wait()
}

func GetSafeResolvedTs(resolvedTs uint64) uint64 {
zeminzhou marked this conversation as resolved.
Show resolved Hide resolved
cfg := config.GetGlobalServerConfig().KVClient

logicalTs := oracle.ExtractLogical(resolvedTs)
physicalTime := oracle.GetTimeFromTS(resolvedTs)

safeTime := physicalTime.Add(-cfg.ResolvedTsSafeInterval)
physicalTs := oracle.GetPhysical(safeTime)

if physicalTs < 0 {
log.Warn("The resolvedTs is smaller than the ResolvedTsSafeInterval",
zap.Uint64("resolvedTs", resolvedTs))
return resolvedTs
}

return oracle.ComposeTS(physicalTs, logicalTs)
}
27 changes: 27 additions & 0 deletions cdc/cdc/kv/region_worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,3 +135,30 @@ func (s *regionWorkerSuite) TestRegionWorkerPoolSize(c *check.C) {
size = getWorkerPoolSize()
c.Assert(size, check.Equals, maxWorkerPoolSize)
}

func (s *regionWorkerSuite) TestGetSafeResolvedTs(c *check.C) {
defer testleak.AfterTest(c)()

testCases := []struct {
resolvedTs uint64
expected uint64
}{
{
resolvedTs: 10,
expected: 10,
},
{
resolvedTs: (1<<18)*3*1000 + 1,
expected: 1,
},
{
resolvedTs: (1<<18)*4*1000 + 1,
expected: (1<<18)*1000 + 1,
},
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggest to add boundary cases, e.g:

  1. resolvedTs = 0
  2. Physical clock less than 3s.
  3. Physical clock equals to 3s.


for _, testCase := range testCases {
safeResolvedTs := GetSafeResolvedTs(testCase.resolvedTs)
c.Assert(testCase.expected, check.Equals, safeResolvedTs)
}
}
21 changes: 12 additions & 9 deletions cdc/pkg/cmd/server/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,9 +170,10 @@ func TestParseCfg(t *testing.T) {
},
PerKeySpanMemoryQuota: 10 * 1024 * 1024, // 10M
KVClient: &config.KVClientConfig{
WorkerConcurrent: 8,
WorkerPoolSize: 0,
RegionScanLimit: 40,
WorkerConcurrent: 8,
WorkerPoolSize: 0,
RegionScanLimit: 40,
ResolvedTsSafeInterval: 3 * time.Second,
},
Debug: &config.DebugConfig{
EnableKeySpanActor: false,
Expand Down Expand Up @@ -299,9 +300,10 @@ server-worker-pool-size = 16
Security: &config.SecurityConfig{},
PerKeySpanMemoryQuota: 10 * 1024 * 1024, // 10M
KVClient: &config.KVClientConfig{
WorkerConcurrent: 8,
WorkerPoolSize: 0,
RegionScanLimit: 40,
WorkerConcurrent: 8,
WorkerPoolSize: 0,
RegionScanLimit: 40,
ResolvedTsSafeInterval: 3 * time.Second,
},
Debug: &config.DebugConfig{
EnableKeySpanActor: false,
Expand Down Expand Up @@ -427,9 +429,10 @@ cert-allowed-cn = ["dd","ee"]
},
PerKeySpanMemoryQuota: 10 * 1024 * 1024, // 10M
KVClient: &config.KVClientConfig{
WorkerConcurrent: 8,
WorkerPoolSize: 0,
RegionScanLimit: 40,
WorkerConcurrent: 8,
WorkerPoolSize: 0,
RegionScanLimit: 40,
ResolvedTsSafeInterval: 3 * time.Second,
},
Debug: &config.DebugConfig{
EnableKeySpanActor: false,
Expand Down
1 change: 1 addition & 0 deletions cdc/pkg/config/config_test_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ const (
"worker-concurrent": 8,
"worker-pool-size": 0,
"region-scan-limit": 40
"resolvedts-safe-interval": 3000000000,
haojinming marked this conversation as resolved.
Show resolved Hide resolved
},
"debug": {
"enable-keyspan-actor": false,
Expand Down
4 changes: 4 additions & 0 deletions cdc/pkg/config/kvclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@

package config

import "time"

// KVClientConfig represents config for kv client
type KVClientConfig struct {
// how many workers will be used for a single region worker
Expand All @@ -21,4 +23,6 @@ type KVClientConfig struct {
WorkerPoolSize int `toml:"worker-pool-size" json:"worker-pool-size"`
// region incremental scan limit for one table in a single store
RegionScanLimit int `toml:"region-scan-limit" json:"region-scan-limit"`
// the safe interval to move forward resolved ts
zeminzhou marked this conversation as resolved.
Show resolved Hide resolved
ResolvedTsSafeInterval time.Duration `toml:"reoslved-ts-safe-interval" json:"reoslvedts-safe-interval"`
zeminzhou marked this conversation as resolved.
Show resolved Hide resolved
}
7 changes: 4 additions & 3 deletions cdc/pkg/config/server_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,9 +93,10 @@ var defaultServerConfig = &ServerConfig{
Security: &SecurityConfig{},
PerKeySpanMemoryQuota: 10 * 1024 * 1024, // 10MB
KVClient: &KVClientConfig{
WorkerConcurrent: 8,
WorkerPoolSize: 0, // 0 will use NumCPU() * 2
RegionScanLimit: 40,
WorkerConcurrent: 8,
WorkerPoolSize: 0, // 0 will use NumCPU() * 2
RegionScanLimit: 40,
ResolvedTsSafeInterval: 3 * time.Second,
},
Debug: &DebugConfig{
EnableKeySpanActor: false,
Expand Down