Fixed test for TiFlash and reorg partition
mjonss committed Jan 18, 2023
1 parent 5a31388 commit 1af113a
Showing 4 changed files with 129 additions and 56 deletions.
17 changes: 11 additions & 6 deletions ddl/partition.go
@@ -2405,14 +2405,19 @@ func (w *worker) onReorganizePartition(d *ddlCtx, t *meta.Meta, job *model.Job)
// partitions before!
return convertAddTablePartitionJob2RollbackJob(d, t, job, err, tblInfo)
}
// Try for 10 rounds (in case of transient TiFlash issues)
if needRetry {
// The newly added partition hasn't been replicated yet.
// Do nothing to the job this time; wait for the next worker round.
time.Sleep(tiflashCheckTiDBHTTPAPIHalfInterval)
// Set the error here, which will make the job exit once its retry count exceeds the limit.
return ver, errors.Errorf("[ddl] add partition wait for tiflash replica to complete")
}
if job.ErrorCount < 10 {
// The newly added partition hasn't been replicated yet.
// Do nothing to the job this time; wait for the next worker round.
time.Sleep(tiflashCheckTiDBHTTPAPIHalfInterval)
// Set the error here, which will make the job exit once its retry count exceeds the limit.
return ver, errors.Errorf("[ddl] reorganize partition wait for tiflash replica to complete")
}

logutil.Logger(w.logCtx).Error("[ddl] reorganize partition could not find all new regions in TiFlash replicas, skipping syncing to TiFlash!", zap.String("table", tblInfo.Name.O))
return convertAddTablePartitionJob2RollbackJob(d, t, job, err, tblInfo)
}
// When TiFlash Replica is ready, we must move them into `AvailablePartitionIDs`.
// Since onUpdateFlashReplicaStatus cannot see the partitions yet (not public)
for _, d := range addingDefinitions {
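The hunk above turns the TiFlash wait in onReorganizePartition into a bounded retry: every worker round that still cannot see the replicas returns an error, job.ErrorCount grows by one, and once it reaches 10 the job is converted into a rollback job instead of waiting forever. A minimal, self-contained illustration of that decision (the function name, string results, and main below are illustrative, not part of the patch):

package main

import "fmt"

// waitOrRollback sketches the bounded retry above: every worker round that
// returns an error increments the job's error count, so the `< 10` check gives
// TiFlash roughly ten polling rounds to replicate the new partitions before
// the reorganize-partition job is rolled back.
func waitOrRollback(needRetry bool, errorCount, maxRetries int64) string {
	switch {
	case !needRetry:
		return "continue" // all new partitions are reported available by TiFlash
	case errorCount < maxRetries:
		return "retry" // return an error; the DDL worker picks the job up again next round
	default:
		return "rollback" // give up and convert the job into a rollback job
	}
}

func main() {
	fmt.Println(waitOrRollback(true, 3, 10))  // retry
	fmt.Println(waitOrRollback(true, 12, 10)) // rollback
	fmt.Println(waitOrRollback(false, 0, 10)) // continue
}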
121 changes: 95 additions & 26 deletions ddl/tiflashtest/ddl_tiflash_test.go
@@ -41,13 +41,15 @@ import (
"github.com/pingcap/tidb/store/mockstore"
"github.com/pingcap/tidb/store/mockstore/unistore"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/testkit"
"github.com/pingcap/tidb/testkit/external"
"github.com/pingcap/tidb/util"
"github.com/pingcap/tidb/util/logutil"
"github.com/stretchr/testify/require"
"github.com/tikv/client-go/v2/oracle"
"github.com/tikv/client-go/v2/testutils"
"github.com/tikv/client-go/v2/tikv"
"go.uber.org/zap"
)

@@ -298,7 +300,7 @@ func TestTiFlashReplicaPartitionTableNormal(t *testing.T) {
for _, p := range pi.Definitions {
require.True(t, tb2.Meta().TiFlashReplica.IsPartitionAvailable(p.ID))
if len(p.LessThan) == 1 && p.LessThan[0] == lessThan {
table, ok := s.tiflash.GetTableSyncStatus(int(p.ID))
table, ok := s.tiflash.GetTableSyncStatus(p.ID)
require.True(t, ok)
require.True(t, table.Accel)
}
@@ -338,7 +340,7 @@ func TestTiFlashReplicaPartitionTableBlock(t *testing.T) {
for _, p := range pi.Definitions {
require.True(t, tb.Meta().TiFlashReplica.IsPartitionAvailable(p.ID))
if len(p.LessThan) == 1 && p.LessThan[0] == lessThan {
table, ok := s.tiflash.GetTableSyncStatus(int(p.ID))
table, ok := s.tiflash.GetTableSyncStatus(p.ID)
require.True(t, ok)
require.True(t, table.Accel)
}
@@ -1105,13 +1107,13 @@ func TestTiFlashProgressAfterAvailable(t *testing.T) {
require.NoError(t, err)
require.NotNil(t, tb)
// after becoming available, the progress should still be updatable.
s.tiflash.ResetSyncStatus(int(tb.Meta().ID), false)
s.tiflash.ResetSyncStatus(tb.Meta().ID, false)
time.Sleep(ddl.PollTiFlashInterval * RoundToBeAvailable * 3)
progress, isExist := infosync.GetTiFlashProgressFromCache(tb.Meta().ID)
require.True(t, isExist)
require.True(t, progress == 0)

s.tiflash.ResetSyncStatus(int(tb.Meta().ID), true)
s.tiflash.ResetSyncStatus(tb.Meta().ID, true)
time.Sleep(ddl.PollTiFlashInterval * RoundToBeAvailable * 3)
progress, isExist = infosync.GetTiFlashProgressFromCache(tb.Meta().ID)
require.True(t, isExist)
@@ -1134,13 +1136,13 @@ func TestTiFlashProgressAfterAvailableForPartitionTable(t *testing.T) {
require.NoError(t, err)
require.NotNil(t, tb)
// after becoming available, the progress should still be updatable.
s.tiflash.ResetSyncStatus(int(tb.Meta().Partition.Definitions[0].ID), false)
s.tiflash.ResetSyncStatus(tb.Meta().Partition.Definitions[0].ID, false)
time.Sleep(ddl.PollTiFlashInterval * RoundToBeAvailable * 3)
progress, isExist := infosync.GetTiFlashProgressFromCache(tb.Meta().Partition.Definitions[0].ID)
require.True(t, isExist)
require.True(t, progress == 0)

s.tiflash.ResetSyncStatus(int(tb.Meta().Partition.Definitions[0].ID), true)
s.tiflash.ResetSyncStatus(tb.Meta().Partition.Definitions[0].ID, true)
time.Sleep(ddl.PollTiFlashInterval * RoundToBeAvailable * 3)
progress, isExist = infosync.GetTiFlashProgressFromCache(tb.Meta().Partition.Definitions[0].ID)
require.True(t, isExist)
@@ -1197,7 +1199,7 @@ func TestTiFlashProgressAvailableList(t *testing.T) {
tbls[i], err = s.dom.InfoSchema().TableByName(model.NewCIStr("test"), model.NewCIStr(tableNames[i]))
require.NoError(t, err)
require.NotNil(t, tbls[i])
s.tiflash.ResetSyncStatus(int(tbls[i].Meta().ID), false)
s.tiflash.ResetSyncStatus(tbls[i].Meta().ID, false)
}
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/PollAvailableTableProgressMaxCount", `return(2)`))
defer func() {
@@ -1272,7 +1274,7 @@ func TestTiFlashPartitionNotAvailable(t *testing.T) {
require.NotNil(t, tb)

tk.MustExec("alter table ddltiflash set tiflash replica 1")
s.tiflash.ResetSyncStatus(int(tb.Meta().Partition.Definitions[0].ID), false)
s.tiflash.ResetSyncStatus(tb.Meta().Partition.Definitions[0].ID, false)
time.Sleep(ddl.PollTiFlashInterval * RoundToBeAvailable * 3)

tb, err = s.dom.InfoSchema().TableByName(model.NewCIStr("test"), model.NewCIStr("ddltiflash"))
@@ -1282,7 +1284,7 @@ func TestTiFlashPartitionNotAvailable(t *testing.T) {
require.NotNil(t, replica)
require.False(t, replica.Available)

s.tiflash.ResetSyncStatus(int(tb.Meta().Partition.Definitions[0].ID), true)
s.tiflash.ResetSyncStatus(tb.Meta().Partition.Definitions[0].ID, true)
time.Sleep(ddl.PollTiFlashInterval * RoundToBeAvailable * 3)

tb, err = s.dom.InfoSchema().TableByName(model.NewCIStr("test"), model.NewCIStr("ddltiflash"))
@@ -1292,7 +1294,7 @@ func TestTiFlashPartitionNotAvailable(t *testing.T) {
require.NotNil(t, replica)
require.True(t, replica.Available)

s.tiflash.ResetSyncStatus(int(tb.Meta().Partition.Definitions[0].ID), false)
s.tiflash.ResetSyncStatus(tb.Meta().Partition.Definitions[0].ID, false)
time.Sleep(ddl.PollTiFlashInterval * RoundToBeAvailable * 3)
require.NoError(t, err)
require.NotNil(t, tb)
@@ -1336,7 +1338,29 @@ func TestTiFlashAvailableAfterAddPartition(t *testing.T) {
require.Equal(t, len(pi.Definitions), 2)
}

func TestTiflashReorgPartition(t *testing.T) {
// TestDDLCallback is copied from ddl.TestDDLCallback, but smaller
type TestDDLCallback struct {
*ddl.BaseCallback
// We recommend passing the domain parameter to the test DDL callback; it ensures the
// domain reloads the schema before the DDL steps into the next state change.
Do ddl.DomainReloader

// Only need this for now
OnJobRunBeforeExported func(*model.Job)
}

// OnJobRunBefore is used to run the user customized logic of `onJobRunBefore` first.
func (tc *TestDDLCallback) OnJobRunBefore(job *model.Job) {
logutil.BgLogger().Info("on job run before", zap.String("job", job.String()))
if tc.OnJobRunBeforeExported != nil {
tc.OnJobRunBeforeExported(job)
return
}

tc.BaseCallback.OnJobRunBefore(job)
}

func TestTiFlashReorgPartition(t *testing.T) {
s, teardown := createTiFlashContext(t)
defer teardown()
fCancel := TempDisableEmulatorGC()
@@ -1345,32 +1369,77 @@ func TestTiflashReorgPartition(t *testing.T) {
require.NoError(t, err)
tk := testkit.NewTestKit(t, s.store)
tk.MustExec("use test")
tk.MustExec("drop table if exists reorgPartTiFlash")
tk.MustExec("drop table if exists ddltiflash")

tk.MustExec(`create table reorgPartTiFlash (id int, vc varchar(255), i int, key (vc), key(i,vc))` +
tk.MustExec(`create table ddltiflash (id int, vc varchar(255), i int, key (vc), key(i,vc))` +
` partition by range (id)` +
` (partition p0 values less than (1000000), partition p1 values less than (2000000))`)
tk.MustExec(`alter table reorgPartTiFlash set tiflash replica 1`)
tb := external.GetTableByName(t, tk, "test", "reorgPartTiFlash")
err = domain.GetDomain(tk.Session()).DDL().UpdateTableReplicaInfo(tk.Session(), tb.Meta().ID, true)
require.NoError(t, err)
tk.MustExec(`alter table ddltiflash set tiflash replica 1`)
time.Sleep(ddl.PollTiFlashInterval * RoundToBeAvailable * 3)
CheckTableAvailable(s.dom, t, 1, []string{})
tb := external.GetTableByName(t, tk, "test", "ddltiflash")
firstPartitionID := tb.Meta().Partition.Definitions[0].ID
ruleName := fmt.Sprintf("table-%v-r", firstPartitionID)
_, ok := s.tiflash.GetPlacementRule(ruleName)
require.True(t, ok)

tk.MustExec(`insert into reorgPartTiFlash values (1,"1",1), (500500, "500500", 2), (1000001, "1000001", 3)`)
tk.MustQuery("select /*+ read_from_storage(tiflash[reorgPartTiFlash]) */ count(*) from reorgPartTiFlash partition(p0, p1)").Check(testkit.Rows("3"))
tk.MustQuery("select /*+ read_from_storage(tiflash[reorgPartTiFlash]) */ count(*) from reorgPartTiFlash partition(p0)").Check(testkit.Rows("2"))
tk.MustExec(`alter table reorgPartTiFlash reorganize partition p0 into (partition p0 values less than (500000), partition p500k values less than (1000000))`)
tk.MustExec(`admin check table reorgPartTiFlash`)
// Note that the mock TiFlash does not have any data or regions, so the wait for regions being available will fail
dom := domain.GetDomain(tk.Session())
originHook := dom.DDL().GetHook()
defer dom.DDL().SetHook(originHook)
hook := &TestDDLCallback{Do: dom}
dom.DDL().SetHook(hook)
done := false

hook.OnJobRunBeforeExported = func(job *model.Job) {
if !done && job.Type == model.ActionReorganizePartition && job.SchemaState == model.StateDeleteOnly {
// Let it fail once (to check that code path) then increase the count to skip retry
if job.ErrorCount > 0 {
job.ErrorCount = 1000
done = true
}
}
}
tk.MustContainErrMsg(`alter table ddltiflash reorganize partition p0 into (partition p0 values less than (500000), partition p500k values less than (1000000))`, "[ddl] reorganize partition wait for tiflash replica to complete")

done = false
hook.OnJobRunBeforeExported = func(job *model.Job) {
if !done && job.Type == model.ActionReorganizePartition && job.SchemaState == model.StateDeleteOnly {
// Let it fail once (to check that code path) then mock the regions into the partitions
if job.ErrorCount > 0 {
// Add the TiFlash stores as peers for the new regions, to fulfill the check
// in checkPartitionReplica
pdCli := s.store.(tikv.Storage).GetRegionCache().PDClient()
var dummy []model.CIStr
partInfo := &model.PartitionInfo{}
_ = job.DecodeArgs(&dummy, &partInfo)
ctx := context.Background()
stores, _ := pdCli.GetAllStores(ctx)
for _, pd := range partInfo.Definitions {
startKey, endKey := tablecodec.GetTableHandleKeyRange(pd.ID)
regions, _ := pdCli.ScanRegions(ctx, startKey, endKey, -1)
for i := range regions {
// similar to storeHasEngineTiFlashLabel
for _, store := range stores {
for _, label := range store.Labels {
if label.Key == placement.EngineLabelKey && label.Value == placement.EngineLabelTiFlash {
s.cluster.MockRegionManager.AddPeer(regions[i].Meta.Id, store.Id, 1)
break
}
}
}
}
}
done = true
}
}
}
tk.MustExec(`alter table ddltiflash reorganize partition p0 into (partition p0 values less than (500000), partition p500k values less than (1000000))`)
tk.MustExec(`admin check table ddltiflash`)
_, ok = s.tiflash.GetPlacementRule(ruleName)
require.True(t, ok)
require.Nil(t, gcWorker.DeleteRanges(context.TODO(), math.MaxInt64))
_, ok = s.tiflash.GetPlacementRule(ruleName)
require.False(t, ok)
tk.MustQuery("select /*+ read_from_storage(tiflash[reorgPartTiFlash]) */ count(*) from reorgPartTiFlash").Check(testkit.Rows("3"))
tk.MustQuery("select /*+ read_from_storage(tiflash[reorgPartTiFlash]) */ count(*) from reorgPartTiFlash partition(p0, p1)").Check(testkit.Rows("2"))
tk.MustQuery("select /*+ read_from_storage(tiflash[reorgPartTiFlash]) */ count(*) from reorgPartTiFlash partition(p0)").Check(testkit.Rows("1"))
tk.MustExec(`drop table reorgPartTiFlash`)
tk.MustExec(`drop table ddltiflash`)
}
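The second hook in the test walks all stores returned by PD and adds every TiFlash-labelled store as a peer of the new partitions' regions, mirroring the check done in checkPartitionReplica. A stand-alone sketch of that label test follows; the Label type is a stand-in, and the literal strings are what TiDB's placement.EngineLabelKey and placement.EngineLabelTiFlash resolve to:

package main

import "fmt"

// Label stands in for the store label type returned by PD.
type Label struct {
	Key   string
	Value string
}

// isTiFlashStore mirrors the check the hook performs above ("similar to
// storeHasEngineTiFlashLabel"): a store counts as a TiFlash store when it
// carries the engine=tiflash label.
func isTiFlashStore(labels []Label) bool {
	for _, l := range labels {
		if l.Key == "engine" && l.Value == "tiflash" {
			return true
		}
	}
	return false
}

func main() {
	fmt.Println(isTiFlashStore([]Label{{Key: "engine", Value: "tiflash"}})) // true
	fmt.Println(isTiFlashStore([]Label{{Key: "zone", Value: "z1"}}))        // false
}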
42 changes: 21 additions & 21 deletions domain/infosync/tiflash_manager.go
@@ -384,15 +384,15 @@ func MakeNewRule(ID int64, Count uint64, LocationLabels []string) *placement.TiF
return &ruleNew
}

type mockTiFlashTableInfo struct {
Regions []int
type MockTiFlashTableInfo struct {
Regions []uint64
Accel bool
}

func (m *mockTiFlashTableInfo) String() string {
func (m *MockTiFlashTableInfo) String() string {
regionStr := ""
for _, s := range m.Regions {
regionStr = regionStr + strconv.Itoa(s) + "\n"
regionStr = regionStr + strconv.FormatUint(s, 10) + "\n"
}
if regionStr == "" {
regionStr = "\n"
@@ -406,7 +406,7 @@ type MockTiFlash struct {
groupIndex int
StatusAddr string
StatusServer *httptest.Server
SyncStatus map[int]mockTiFlashTableInfo
SyncStatus map[int64]MockTiFlashTableInfo
StoreInfo map[uint64]helper.StoreBaseStat
GlobalTiFlashPlacementRules map[string]placement.TiFlashRule
PdEnabled bool
@@ -429,15 +429,15 @@ func (tiflash *MockTiFlash) setUpMockTiFlashHTTPServer() {
tiflash.Lock()
defer tiflash.Unlock()
params := mux.Vars(req)
tableID, err := strconv.Atoi(params["tableid"])
tableID, err := strconv.ParseInt(params["tableid"], 10, 64)
if err != nil {
w.WriteHeader(http.StatusNotFound)
return
}
table, ok := tiflash.SyncStatus[tableID]
if tiflash.NotAvailable {
// No region is available, so the table is not available.
table.Regions = []int{}
table.Regions = []uint64{}
}
if !ok {
w.WriteHeader(http.StatusOK)
@@ -463,7 +463,7 @@ func NewMockTiFlash() *MockTiFlash {
tiflash := &MockTiFlash{
StatusAddr: "",
StatusServer: nil,
SyncStatus: make(map[int]mockTiFlashTableInfo),
SyncStatus: make(map[int64]MockTiFlashTableInfo),
StoreInfo: make(map[uint64]helper.StoreBaseStat),
GlobalTiFlashPlacementRules: make(map[string]placement.TiFlashRule),
PdEnabled: true,
@@ -491,19 +491,19 @@ func (tiflash *MockTiFlash) HandleSetPlacementRule(rule placement.TiFlashRule) e
tiflash.GlobalTiFlashPlacementRules[rule.ID] = rule
}
// Pd shall schedule TiFlash, we can mock here
tid := 0
tid := int64(0)
_, err := fmt.Sscanf(rule.ID, "table-%d-r", &tid)
if err != nil {
return errors.New("Can't parse rule")
}
// Set up mock TiFlash replica
f := func() {
if z, ok := tiflash.SyncStatus[tid]; ok {
z.Regions = []int{1}
z.Regions = []uint64{1}
tiflash.SyncStatus[tid] = z
} else {
tiflash.SyncStatus[tid] = mockTiFlashTableInfo{
Regions: []int{1},
tiflash.SyncStatus[tid] = MockTiFlashTableInfo{
Regions: []uint64{1},
Accel: false,
}
}
@@ -521,16 +521,16 @@
}

// ResetSyncStatus is mock function for reset sync status.
func (tiflash *MockTiFlash) ResetSyncStatus(tableID int, canAvailable bool) {
func (tiflash *MockTiFlash) ResetSyncStatus(tableID int64, canAvailable bool) {
tiflash.Lock()
defer tiflash.Unlock()
if canAvailable {
if z, ok := tiflash.SyncStatus[tableID]; ok {
z.Regions = []int{1}
z.Regions = []uint64{1}
tiflash.SyncStatus[tableID] = z
} else {
tiflash.SyncStatus[tableID] = mockTiFlashTableInfo{
Regions: []int{1},
tiflash.SyncStatus[tableID] = MockTiFlashTableInfo{
Regions: []uint64{1},
Accel: false,
}
}
@@ -563,13 +563,13 @@ func (tiflash *MockTiFlash) HandlePostAccelerateSchedule(endKey string) error {
defer tiflash.Unlock()
tableID := helper.GetTiFlashTableIDFromEndKey(endKey)

table, ok := tiflash.SyncStatus[int(tableID)]
table, ok := tiflash.SyncStatus[tableID]
if ok {
table.Accel = true
tiflash.SyncStatus[int(tableID)] = table
tiflash.SyncStatus[tableID] = table
} else {
tiflash.SyncStatus[int(tableID)] = mockTiFlashTableInfo{
Regions: []int{},
tiflash.SyncStatus[tableID] = MockTiFlashTableInfo{
Regions: []uint64{},
Accel: true,
}
}
@@ -730,7 +730,7 @@ func (tiflash *MockTiFlash) PlacementRulesLen() int {
}

// GetTableSyncStatus returns table sync status by given tableID.
func (tiflash *MockTiFlash) GetTableSyncStatus(tableID int) (*mockTiFlashTableInfo, bool) {
func (tiflash *MockTiFlash) GetTableSyncStatus(tableID int64) (*MockTiFlashTableInfo, bool) {
tiflash.Lock()
defer tiflash.Unlock()
if r, ok := tiflash.SyncStatus[tableID]; ok {
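The mock's sync-status map is now keyed by int64 and the table-info struct is exported, so the tests above pass table IDs straight through without int() conversions. The two 64-bit parsing steps that the HTTP handler and the placement-rule parser use are shown in isolation below as a runnable sketch (the literal IDs are arbitrary):

package main

import (
	"fmt"
	"strconv"
)

func main() {
	// HTTP handler path: the table ID from the URL is parsed as int64 instead
	// of strconv.Atoi, matching the int64-keyed SyncStatus map.
	tableID, err := strconv.ParseInt("104", 10, 64)
	fmt.Println(tableID, err) // 104 <nil>

	// Placement-rule path: the rule ID "table-<id>-r" is scanned into an int64.
	var tid int64
	_, err = fmt.Sscanf("table-104-r", "table-%d-r", &tid)
	fmt.Println(tid, err) // 104 <nil>
}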
5 changes: 2 additions & 3 deletions table/tables/partition.go
@@ -1311,9 +1311,8 @@ func (t *partitionedTable) GetPartition(pid int64) table.PhysicalTable {
// Because A nil of type *partition is a kind of `table.PhysicalTable`
part, ok := t.partitions[pid]
if !ok {
// TODO: remove and just keep return nil
panic("MJONSS: How did we get here?")
//return nil
// Should never happen!
return nil
}
return part
}
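The debug panic in GetPartition is replaced with a nil return for an unknown partition ID, so callers are expected to check for nil. A minimal sketch of that lookup contract, using stand-in types rather than the real TiDB ones:

package main

import "fmt"

// physicalTable stands in for table.PhysicalTable; only the lookup contract matters here.
type physicalTable struct{ id int64 }

type partitionedTable struct {
	partitions map[int64]*physicalTable
}

// GetPartition mirrors the change above: an unknown partition ID yields nil
// instead of a panic, and callers check for nil.
func (t *partitionedTable) GetPartition(pid int64) *physicalTable {
	part, ok := t.partitions[pid]
	if !ok {
		// Should never happen, but fail soft rather than panicking.
		return nil
	}
	return part
}

func main() {
	t := &partitionedTable{partitions: map[int64]*physicalTable{1: {id: 1}}}
	if p := t.GetPartition(2); p == nil {
		fmt.Println("partition 2 not found")
	}
}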
