Skip to content

Commit

Permalink
cdc/redo: redo applier flush resolvedTs per table (#3638)
Browse files Browse the repository at this point in the history
  • Loading branch information
sdojjy authored Nov 26, 2021
1 parent 56ea821 commit fc6c443
Showing 1 changed file with 24 additions and 13 deletions.
37 changes: 24 additions & 13 deletions pkg/applier/redo.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ func (ra *RedoApplier) consumeLogs(ctx context.Context) error {
// lastResolvedTs records the max resolved ts we have seen from redo logs.
lastResolvedTs := checkpointTs
cachedRows := make([]*model.RowChangedEvent, 0, emitBatch)
tableResolvedTsMap := make(map[model.TableID]model.Ts)
for {
redoLogs, err := ra.rd.ReadNextLog(ctx, readBatch)
if err != nil {
Expand All @@ -139,6 +140,10 @@ func (ra *RedoApplier) consumeLogs(ctx context.Context) error {
}

for _, redoLog := range redoLogs {
tableID := redoLog.Row.Table.TableID
if _, ok := tableResolvedTsMap[redoLog.Row.Table.TableID]; !ok {
tableResolvedTsMap[tableID] = lastSafeResolvedTs
}
if len(cachedRows) >= emitBatch {
err := s.EmitRowChangedEvents(ctx, cachedRows...)
if err != nil {
Expand All @@ -147,27 +152,33 @@ func (ra *RedoApplier) consumeLogs(ctx context.Context) error {
cachedRows = make([]*model.RowChangedEvent, 0, emitBatch)
}
cachedRows = append(cachedRows, redo.LogToRow(redoLog))
if redoLog.Row.CommitTs > lastResolvedTs {
lastSafeResolvedTs, lastResolvedTs = lastResolvedTs, redoLog.Row.CommitTs

if redoLog.Row.CommitTs > tableResolvedTsMap[tableID] {
tableResolvedTsMap[tableID], lastResolvedTs = lastResolvedTs, redoLog.Row.CommitTs
}
}
// todo: use real table ID
_, err = s.FlushRowChangedEvents(ctx, 0, lastSafeResolvedTs)
if err != nil {
return err

for tableID, tableLastResolvedTs := range tableResolvedTsMap {
_, err = s.FlushRowChangedEvents(ctx, tableID, tableLastResolvedTs)
if err != nil {
return err
}
}
}
err = s.EmitRowChangedEvents(ctx, cachedRows...)
if err != nil {
return err
}
_, err = s.FlushRowChangedEvents(ctx, 0, resolvedTs)
if err != nil {
return err
}
err = s.Barrier(ctx, 0)
if err != nil {
return err

for tableID := range tableResolvedTsMap {
_, err = s.FlushRowChangedEvents(ctx, tableID, resolvedTs)
if err != nil {
return err
}
err = s.Barrier(ctx, tableID)
if err != nil {
return err
}
}
return errApplyFinished
}
Expand Down

0 comments on commit fc6c443

Please sign in to comment.