Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix #193] fix cdc lose data by add resolved ts interval #196

Merged
merged 16 commits into from
Aug 8, 2022
33 changes: 28 additions & 5 deletions cdc/cdc/kv/region_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -730,31 +730,39 @@ func (w *regionWorker) handleResolvedTs(
return nil
}
regionID := state.sri.verID.GetID()

safeResolvedTs := GetSafeResolvedTs(resolvedTs)
zeminzhou marked this conversation as resolved.
Show resolved Hide resolved
zeminzhou marked this conversation as resolved.
Show resolved Hide resolved
if safeResolvedTs == 0 {
zeminzhou marked this conversation as resolved.
Show resolved Hide resolved
log.Warn("The resolvedTs is smaller than the ResolvedTsSafeInterval",
zap.Uint64("resolvedTs", resolvedTs),
zap.Uint64("regionID", regionID))
return nil
}
// Send resolved ts update in non blocking way, since we can re-query real
// resolved ts from region state even if resolved ts update is discarded.
// NOTICE: We send any regionTsInfo to resolveLock thread to give us a chance to trigger resolveLock logic
// (1) if it is a fallback resolvedTs event, it will be discarded and accumulate penalty on the progress;
// (2) if it is a normal one, update rtsManager and check sinceLastResolvedTs
select {
case w.rtsUpdateCh <- &regionTsInfo{regionID: regionID, ts: newResolvedTsItem(resolvedTs)}:
case w.rtsUpdateCh <- &regionTsInfo{regionID: regionID, ts: newResolvedTsItem(safeResolvedTs)}:
default:
}

if resolvedTs < state.lastResolvedTs {
if safeResolvedTs < state.lastResolvedTs {
log.Warn("The resolvedTs is fallen back in kvclient",
zap.String("Event Type", "RESOLVED"),
zap.Uint64("resolvedTs", resolvedTs),
zap.Uint64("safeResolvedTs", safeResolvedTs),
zap.Uint64("lastResolvedTs", state.lastResolvedTs),
zap.Uint64("regionID", regionID))
return nil
}
state.lastResolvedTs = resolvedTs
state.lastResolvedTs = safeResolvedTs
// emit a checkpointTs
revent := model.RegionFeedEvent{
RegionID: regionID,
Resolved: &model.ResolvedSpan{
Span: state.sri.span,
ResolvedTs: resolvedTs,
ResolvedTs: safeResolvedTs,
},
}

Expand Down Expand Up @@ -828,3 +836,18 @@ func RunWorkerPool(ctx context.Context) error {
})
return errg.Wait()
}

func GetSafeResolvedTs(resolvedTs uint64) uint64 {
zeminzhou marked this conversation as resolved.
Show resolved Hide resolved
cfg := config.GetGlobalServerConfig().KVClient

logicalTs := oracle.ExtractLogical(resolvedTs)
time := oracle.GetTimeFromTS(resolvedTs)
zeminzhou marked this conversation as resolved.
Show resolved Hide resolved

safeTime := time.Add(-cfg.ResolvedTsSafeInterval)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If time is less than cfg.ResolvedTsSafeInterval, suggest to just return the time.
A resolved-ts = 0 would be a legal one ?

physicalTs := oracle.GetPhysical(safeTime)
if physicalTs < 0 {
return 0
}

return oracle.ComposeTS(physicalTs, logicalTs)
}
4 changes: 4 additions & 0 deletions cdc/pkg/config/kvclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@

package config

import "time"

// KVClientConfig represents config for kv client
type KVClientConfig struct {
// how many workers will be used for a single region worker
Expand All @@ -21,4 +23,6 @@ type KVClientConfig struct {
WorkerPoolSize int `toml:"worker-pool-size" json:"worker-pool-size"`
// region incremental scan limit for one table in a single store
RegionScanLimit int `toml:"region-scan-limit" json:"region-scan-limit"`
// the safe interval to move forward resolved ts
zeminzhou marked this conversation as resolved.
Show resolved Hide resolved
ResolvedTsSafeInterval time.Duration `toml:"reoslvedts-safe-interval" json:"reoslvedts-safe-interval"`
zeminzhou marked this conversation as resolved.
Show resolved Hide resolved
}
7 changes: 4 additions & 3 deletions cdc/pkg/config/server_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,9 +93,10 @@ var defaultServerConfig = &ServerConfig{
Security: &SecurityConfig{},
PerKeySpanMemoryQuota: 10 * 1024 * 1024, // 10MB
KVClient: &KVClientConfig{
WorkerConcurrent: 8,
WorkerPoolSize: 0, // 0 will use NumCPU() * 2
RegionScanLimit: 40,
WorkerConcurrent: 8,
WorkerPoolSize: 0, // 0 will use NumCPU() * 2
RegionScanLimit: 40,
ResolvedTsSafeInterval: 3 * time.Second,
},
Debug: &DebugConfig{
EnableKeySpanActor: false,
Expand Down