Skip to content

Commit

Permalink
ddl: fix resuming to wrong checkpoint when failed during adding index (
Browse files Browse the repository at this point in the history
  • Loading branch information
tangenta authored Aug 20, 2024
1 parent c1c74b1 commit 6d95459
Show file tree
Hide file tree
Showing 4 changed files with 48 additions and 15 deletions.
18 changes: 12 additions & 6 deletions pkg/ddl/backfilling.go
Original file line number Diff line number Diff line change
Expand Up @@ -452,6 +452,11 @@ func loadTableRanges(
zap.Int64("physicalTableID", t.GetPhysicalID()))
return []kv.KeyRange{{StartKey: startKey, EndKey: endKey}}, nil
}
failpoint.Inject("setLimitForLoadTableRanges", func(val failpoint.Value) {
if v, ok := val.(int); ok {
limit = v
}
})

rc := s.GetRegionCache()
maxSleep := 10000 // ms
Expand All @@ -466,6 +471,12 @@ func loadTableRanges(
if err != nil {
return false, errors.Trace(err)
}
var mockErr bool
failpoint.InjectCall("beforeLoadRangeFromPD", &mockErr)
if mockErr {
return false, kv.ErrTxnRetryable
}

ranges = make([]kv.KeyRange, 0, len(rs))
for _, r := range rs {
ranges = append(ranges, kv.KeyRange{StartKey: r.StartKey(), EndKey: r.EndKey()})
Expand Down Expand Up @@ -636,12 +647,7 @@ func makeupDecodeColMap(dbName model.CIStr, t table.Table) (map[int64]decoder.Co
return decodeColMap, nil
}

var backfillTaskChanSize = 128

// SetBackfillTaskChanSizeForTest is only used for test.
func SetBackfillTaskChanSizeForTest(n int) {
backfillTaskChanSize = n
}
const backfillTaskChanSize = 128

func (dc *ddlCtx) runAddIndexInLocalIngestMode(
ctx context.Context,
Expand Down
4 changes: 2 additions & 2 deletions pkg/ddl/backfilling_operators.go
Original file line number Diff line number Diff line change
Expand Up @@ -346,7 +346,7 @@ func (src *TableScanTaskSource) adjustStartKey(start, end kv.Key) (adjusted kv.K
if src.cpMgr == nil {
return start, false
}
cpKey := src.cpMgr.LastProcessedKey()
cpKey := src.cpMgr.NextKeyToProcess()
if len(cpKey) == 0 {
return start, false
}
Expand All @@ -364,7 +364,7 @@ func (src *TableScanTaskSource) adjustStartKey(start, end kv.Key) (adjusted kv.K
if cpKey.Cmp(end) == 0 {
return cpKey, true
}
return cpKey.Next(), false
return cpKey, false
}

func (src *TableScanTaskSource) generateTasks() error {
Expand Down
6 changes: 3 additions & 3 deletions pkg/ddl/ingest/checkpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,9 +156,9 @@ func (s *CheckpointManager) IsKeyProcessed(end kv.Key) bool {
return s.localDataIsValid && len(s.flushedKeyLowWatermark) > 0 && end.Cmp(s.flushedKeyLowWatermark) <= 0
}

// LastProcessedKey finds the last processed key in checkpoint.
// If there is no processed key, it returns nil.
func (s *CheckpointManager) LastProcessedKey() kv.Key {
// NextKeyToProcess finds the next unprocessed key in checkpoint.
// If there is no such key, it returns nil.
func (s *CheckpointManager) NextKeyToProcess() kv.Key {
s.mu.Lock()
defer s.mu.Unlock()

Expand Down
35 changes: 31 additions & 4 deletions tests/realtikvtest/addindextest3/ingest_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -348,10 +348,10 @@ func TestAddIndexSplitTableRanges(t *testing.T) {
}
tk.MustQuery("split table t between (0) and (80000) regions 7;").Check(testkit.Rows("6 1"))

ddl.SetBackfillTaskChanSizeForTest(4)
testfailpoint.Enable(t, "github.com/pingcap/tidb/pkg/ddl/setLimitForLoadTableRanges", "return(4)")
tk.MustExec("alter table t add index idx(b);")
tk.MustExec("admin check table t;")
ddl.SetBackfillTaskChanSizeForTest(7)
testfailpoint.Enable(t, "github.com/pingcap/tidb/pkg/ddl/setLimitForLoadTableRanges", "return(7)")
tk.MustExec("alter table t add index idx_2(b);")
tk.MustExec("admin check table t;")

Expand All @@ -361,10 +361,37 @@ func TestAddIndexSplitTableRanges(t *testing.T) {
tk.MustExec(fmt.Sprintf("insert into t values (%d, %d);", i*10000, i*10000))
}
tk.MustQuery("split table t by (10000),(20000),(30000),(40000),(50000),(60000);").Check(testkit.Rows("6 1"))
ddl.SetBackfillTaskChanSizeForTest(4)
testfailpoint.Enable(t, "github.com/pingcap/tidb/pkg/ddl/setLimitForLoadTableRanges", "return(4)")
tk.MustExec("alter table t add unique index idx(b);")
tk.MustExec("admin check table t;")
}

func TestAddIndexLoadTableRangeError(t *testing.T) {
store := realtikvtest.CreateMockStoreAndSetup(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("drop database if exists addindexlit;")
tk.MustExec("create database addindexlit;")
tk.MustExec("use addindexlit;")
tk.MustExec(`set global tidb_ddl_enable_fast_reorg=on;`)
tk.MustExec(`set global tidb_enable_dist_task=off;`) // Use checkpoint manager.

tk.MustExec("create table t (a int primary key, b int);")
for i := 0; i < 8; i++ {
tk.MustExec(fmt.Sprintf("insert into t values (%d, %d);", i*10000, i*10000))
}
tk.MustQuery("split table t by (10000),(20000),(30000),(40000),(50000),(60000);").Check(testkit.Rows("6 1"))

testfailpoint.Enable(t, "github.com/pingcap/tidb/pkg/ddl/setLimitForLoadTableRanges", "return(3)")
var batchCnt int
testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/ddl/beforeLoadRangeFromPD", func(mockErr *bool) {
batchCnt++
if batchCnt == 2 {
*mockErr = true
}
})
testfailpoint.Enable(t, "github.com/pingcap/tidb/pkg/ddl/ingest/forceSyncFlagForTest", "return")
tk.MustExec("alter table t add unique index idx(b);")
tk.MustExec("admin check table t;")
ddl.SetBackfillTaskChanSizeForTest(1024)
}

func TestAddIndexMockFlushError(t *testing.T) {
Expand Down

0 comments on commit 6d95459

Please sign in to comment.