Skip to content

Commit

Permalink
Fix crts maybe less than emit resolved ts in processor (#647)
Browse files Browse the repository at this point in the history
  • Loading branch information
amyangfei authored Jun 12, 2020
1 parent 64cf60d commit 913c9ab
Showing 1 changed file with 28 additions and 17 deletions.
45 changes: 28 additions & 17 deletions cdc/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,18 +109,23 @@ type processor struct {
}

type tableInfo struct {
id int64
resolvedTS uint64
workload model.WorkloadInfo
cancel context.CancelFunc
id int64
resolvedTs uint64
markTableID int64
mResolvedTs uint64
workload model.WorkloadInfo
cancel context.CancelFunc
}

func (t *tableInfo) loadResolvedTS() uint64 {
return atomic.LoadUint64(&t.resolvedTS)
}

func (t *tableInfo) storeResolvedTS(ts uint64) {
atomic.StoreUint64(&t.resolvedTS, ts)
func (t *tableInfo) loadResolvedTs() uint64 {
tableRts := atomic.LoadUint64(&t.resolvedTs)
if t.markTableID != 0 {
mTableRts := atomic.LoadUint64(&t.mResolvedTs)
if mTableRts < tableRts {
return mTableRts
}
}
return tableRts
}

// newProcessor creates and returns a processor for the specified change feed
Expand Down Expand Up @@ -272,7 +277,7 @@ func (p *processor) writeDebugInfo(w io.Writer) {

p.stateMu.Lock()
for _, table := range p.tables {
fmt.Fprintf(w, "\ttable id: %d, resolveTS: %d\n", table.id, table.loadResolvedTS())
fmt.Fprintf(w, "\ttable id: %d, resolveTS: %d\n", table.id, table.loadResolvedTs())
}
p.stateMu.Unlock()

Expand Down Expand Up @@ -330,7 +335,7 @@ func (p *processor) positionWorker(ctx context.Context) error {
minResolvedTs := p.ddlPuller.GetResolvedTs()
p.stateMu.Lock()
for _, table := range p.tables {
ts := table.loadResolvedTS()
ts := table.loadResolvedTs()

if ts < minResolvedTs {
minResolvedTs = ts
Expand Down Expand Up @@ -741,14 +746,14 @@ func (p *processor) addTable(ctx context.Context, tableID int64, replicaInfo *mo
ctx, cancel := context.WithCancel(ctx)
table := &tableInfo{
id: tableID,
resolvedTS: replicaInfo.StartTs,
resolvedTs: replicaInfo.StartTs,
cancel: cancel,
}
// TODO(leoppro) calculate the workload of this table
// We temporarily set the value to constant 1
table.workload = model.WorkloadInfo{Workload: 1}

startPuller := func(tableID model.TableID) {
startPuller := func(tableID model.TableID, pResolvedTs *uint64) {

// start table puller
// The key in DML kv pair returned from TiKV is not memcompariable encoded,
Expand Down Expand Up @@ -806,7 +811,7 @@ func (p *processor) addTable(ctx context.Context, tableID int64, replicaInfo *mo
continue
}
if pEvent.RawKV != nil && pEvent.RawKV.OpType == model.OpTypeResolved {
table.storeResolvedTS(pEvent.CRTs)
atomic.StoreUint64(pResolvedTs, pEvent.CRTs)
p.localResolvedNotifier.Notify()
resolvedTsGauge.Set(float64(oracle.ExtractPhysical(pEvent.CRTs)))
continue
Expand All @@ -824,10 +829,15 @@ func (p *processor) addTable(ctx context.Context, tableID int64, replicaInfo *mo
}()
}

startPuller(tableID)
startPuller(tableID, &table.resolvedTs)

if p.changefeed.Config.Cyclic.IsEnabled() && replicaInfo.MarkTableID != 0 {
startPuller(replicaInfo.MarkTableID)
mTableID := replicaInfo.MarkTableID

startPuller(mTableID, &table.mResolvedTs)

table.markTableID = mTableID
table.mResolvedTs = replicaInfo.StartTs
}

p.tables[tableID] = table
Expand All @@ -846,6 +856,7 @@ func (p *processor) stop(ctx context.Context) error {
for _, tbl := range p.tables {
tbl.cancel()
}
// mark tables share the same context with its original table, don't need to cancel
p.stateMu.Unlock()
atomic.StoreInt32(&p.stopped, 1)
if err := p.etcdCli.DeleteTaskPosition(ctx, p.changefeedID, p.captureID); err != nil {
Expand Down

0 comments on commit 913c9ab

Please sign in to comment.