diff --git a/ddl/db_change_test.go b/ddl/db_change_test.go
index f7284968484e2..0b7af168c8ae2 100644
--- a/ddl/db_change_test.go
+++ b/ddl/db_change_test.go
@@ -830,12 +830,15 @@ func (s *testStateChangeSuite) TestParallelAddPrimaryKey(c *C) {
 func (s *testStateChangeSuite) TestParallelAlterAddPartition(c *C) {
 	sql1 := `alter table t_part add partition (
     partition p2 values less than (30)
+	);`
+	sql2 := `alter table t_part add partition (
+    partition p3 values less than (30)
 	);`
 	f := func(c *C, err1, err2 error) {
 		c.Assert(err1, IsNil)
 		c.Assert(err2.Error(), Equals, "[ddl:1493]VALUES LESS THAN value must be strictly increasing for each partition")
 	}
-	s.testControlParallelExecSQL(c, sql1, sql1, f)
+	s.testControlParallelExecSQL(c, sql1, sql2, f)
 }
 
 func (s *testStateChangeSuite) TestParallelDropColumn(c *C) {
diff --git a/ddl/db_test.go b/ddl/db_test.go
index d0544a9139725..fe45236ec5a16 100644
--- a/ddl/db_test.go
+++ b/ddl/db_test.go
@@ -1239,6 +1239,8 @@ func (s *testDBSuite1) TestCancelAddTableAndDropTablePartition(c *C) {
 	}{
 		{model.ActionAddTablePartition, model.JobStateNone, model.StateNone, true},
 		{model.ActionDropTablePartition, model.JobStateNone, model.StateNone, true},
+		// Adding a table partition can now be cancelled in the ReplicaOnly state.
+		{model.ActionAddTablePartition, model.JobStateRunning, model.StateReplicaOnly, true},
 		{model.ActionAddTablePartition, model.JobStateRunning, model.StatePublic, false},
 		{model.ActionDropTablePartition, model.JobStateRunning, model.StatePublic, false},
 	}
diff --git a/ddl/ddl_worker_test.go b/ddl/ddl_worker_test.go
index 3ff5c01d25624..8c1c5f591c518 100644
--- a/ddl/ddl_worker_test.go
+++ b/ddl/ddl_worker_test.go
@@ -460,6 +460,10 @@ func buildCancelJobTests(firstID int64) []testCancelJob {
 		{act: model.ActionAddPrimaryKey, jobIDs: []int64{firstID + 35}, cancelRetErrs: []error{admin.ErrCancelFinishedDDLJob.GenWithStackByArgs(firstID + 35)}, cancelState: model.StatePublic},
 		{act: model.ActionDropPrimaryKey, jobIDs: []int64{firstID + 36}, cancelRetErrs: noErrs, cancelState: model.StateWriteOnly},
 		{act: model.ActionDropPrimaryKey, jobIDs: []int64{firstID + 37}, cancelRetErrs: []error{admin.ErrCannotCancelDDLJob.GenWithStackByArgs(firstID + 37)}, cancelState: model.StateDeleteOnly},
+
+		{act: model.ActionAddTablePartition, jobIDs: []int64{firstID + 42}, cancelRetErrs: noErrs, cancelState: model.StateNone},
+		{act: model.ActionAddTablePartition, jobIDs: []int64{firstID + 43}, cancelRetErrs: noErrs, cancelState: model.StateReplicaOnly},
+		{act: model.ActionAddTablePartition, jobIDs: []int64{firstID + 44}, cancelRetErrs: []error{admin.ErrCancelFinishedDDLJob}, cancelState: model.StatePublic},
 	}
 
 	return tests
@@ -862,6 +866,33 @@ func (s *testDDLSuite) TestCancelJob(c *C) {
 	testDropIndex(c, ctx, d, dbInfo, tblInfo, idxOrigName)
 	c.Check(errors.ErrorStack(checkErr), Equals, "")
 	s.checkDropIdx(c, d, dbInfo.ID, tblInfo.ID, idxOrigName, true)
+
+	// Cancel add table partition.
+	baseTableInfo := testTableInfoWithPartitionLessThan(c, d, "empty_table", 5, "1000")
+	testCreateTable(c, ctx, d, dbInfo, baseTableInfo)
+
+	cancelState = model.StateNone
+	updateTest(&tests[34])
+	addedPartInfo := testAddedNewTablePartitionInfo(c, d, baseTableInfo, "p1", "maxvalue")
+	addPartitionArgs := []interface{}{addedPartInfo}
+	doDDLJobErrWithSchemaState(ctx, d, c, dbInfo.ID, baseTableInfo.ID, test.act, addPartitionArgs, &cancelState)
+	c.Check(checkErr, IsNil)
+	baseTable := testGetTable(c, d, dbInfo.ID, baseTableInfo.ID)
+	c.Assert(len(baseTable.Meta().Partition.Definitions), Equals, 1)
+
+	updateTest(&tests[35])
+	doDDLJobErrWithSchemaState(ctx, d, c, dbInfo.ID, baseTableInfo.ID, test.act, addPartitionArgs, &cancelState)
+	c.Check(checkErr, IsNil)
+	baseTable = testGetTable(c, d, dbInfo.ID, baseTableInfo.ID)
+	c.Assert(len(baseTable.Meta().Partition.Definitions), Equals, 1)
+
+	updateTest(&tests[36])
+	doDDLJobSuccess(ctx, d, c, dbInfo.ID, baseTableInfo.ID, test.act, addPartitionArgs)
+	c.Check(checkErr, IsNil)
+	baseTable = testGetTable(c, d, dbInfo.ID, baseTableInfo.ID)
+	c.Assert(len(baseTable.Meta().Partition.Definitions), Equals, 2)
+	c.Assert(baseTable.Meta().Partition.Definitions[1].ID, Equals, addedPartInfo.Definitions[0].ID)
+	c.Assert(baseTable.Meta().Partition.Definitions[1].LessThan[0], Equals, addedPartInfo.Definitions[0].LessThan[0])
 }
 
 func (s *testDDLSuite) TestIgnorableSpec(c *C) {
diff --git a/ddl/index.go b/ddl/index.go
old mode 100755
new mode 100644
diff --git a/ddl/partition.go b/ddl/partition.go
index 0dd2f97c2fb02..f86d483f24a12 100644
--- a/ddl/partition.go
+++ b/ddl/partition.go
@@ -15,11 +15,14 @@ package ddl
 
 import (
 	"bytes"
+	"context"
 	"fmt"
 	"strconv"
 	"strings"
+	"time"
 
 	"github.com/pingcap/errors"
+	"github.com/pingcap/kvproto/pkg/metapb"
 	"github.com/pingcap/parser"
 	"github.com/pingcap/parser/ast"
 	"github.com/pingcap/parser/format"
@@ -31,15 +34,235 @@ import (
 	"github.com/pingcap/tidb/infoschema"
 	"github.com/pingcap/tidb/meta"
 	"github.com/pingcap/tidb/sessionctx"
+	"github.com/pingcap/tidb/store/tikv"
 	"github.com/pingcap/tidb/table"
+	"github.com/pingcap/tidb/tablecodec"
 	"github.com/pingcap/tidb/types"
 	"github.com/pingcap/tidb/util/chunk"
+	"github.com/pingcap/tidb/util/logutil"
+	"go.uber.org/zap"
 )
 
 const (
 	partitionMaxValue = "MAXVALUE"
 )
 
+func checkAddPartition(t *meta.Meta, job *model.Job) (*model.TableInfo, *model.PartitionInfo, []model.PartitionDefinition, error) {
+	schemaID := job.SchemaID
+	tblInfo, err := getTableInfoAndCancelFaultJob(t, job, schemaID)
+	if err != nil {
+		return nil, nil, nil, errors.Trace(err)
+	}
+	partInfo := &model.PartitionInfo{}
+	err = job.DecodeArgs(&partInfo)
+	if err != nil {
+		job.State = model.JobStateCancelled
+		return nil, nil, nil, errors.Trace(err)
+	}
+	if len(tblInfo.Partition.AddingDefinitions) > 0 {
+		return tblInfo, partInfo, tblInfo.Partition.AddingDefinitions, nil
+	}
+	return tblInfo, partInfo, []model.PartitionDefinition{}, nil
+}
+
+func onAddTablePartition(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, _ error) {
+	// Handle the rolling back job.
+	if job.IsRollingback() {
+		ver, err := onDropTablePartition(t, job)
+		if err != nil {
+			return ver, errors.Trace(err)
+		}
+		return ver, nil
+	}
+
+	tblInfo, partInfo, addingDefinitions, err := checkAddPartition(t, job)
+	if err != nil {
+		return ver, err
+	}
+
+	// To avoid maintaining a state field in every partition definition, TiDB tracks the new partitions in `addingDefinitions`.
+	// `job.SchemaState` is therefore used to judge which stage this job is in.
+	switch job.SchemaState {
+	case model.StateNone:
+		// job.SchemaState == model.StateNone means the job is in the initial state of adding a partition.
+		// Use the partInfo decoded from the job directly and run the sanity checks.
+		err = checkAddPartitionTooManyPartitions(uint64(len(tblInfo.Partition.Definitions) + len(partInfo.Definitions)))
+		if err != nil {
+			job.State = model.JobStateCancelled
+			return ver, errors.Trace(err)
+		}
+
+		err = checkAddPartitionValue(tblInfo, partInfo)
+		if err != nil {
+			job.State = model.JobStateCancelled
+			return ver, errors.Trace(err)
+		}
+
+		err = checkAddPartitionNameUnique(tblInfo, partInfo)
+		if err != nil {
+			job.State = model.JobStateCancelled
+			return ver, errors.Trace(err)
+		}
+		// none -> replica only
+		job.SchemaState = model.StateReplicaOnly
+		// Move the adding definitions into the tableInfo.
+		updateAddingPartitionInfo(partInfo, tblInfo)
+		ver, err = updateVersionAndTableInfoWithCheck(t, job, tblInfo, true)
+	case model.StateReplicaOnly:
+		// replica only -> public
+		// Check whether the TiFlash replicas of the new partitions are complete.
+		// TODO: If the table has no TiFlash replica, or the replica is not available, the replica-only state can be eliminated.
+		if tblInfo.TiFlashReplica != nil && tblInfo.TiFlashReplica.Available {
+			// When the replica is available, the newly added partitions should wait for their replicas to
+			// be finished. Otherwise queries on these partitions would be blocked.
+			needWait, err := checkPartitionReplica(addingDefinitions, d)
+			if err != nil {
+				ver, err = convertAddTablePartitionJob2RollbackJob(t, job, err, tblInfo)
+				return ver, err
+			}
+			if needWait {
+				// The newly added partitions haven't been replicated yet.
+				// Leave the job untouched this time and retry in the next worker round.
+				time.Sleep(tiflashCheckTiDBHTTPAPIHalfInterval)
+				return ver, nil
+			}
+		}
+
+		// For a normal table, or once the replica is finished, move the `addingDefinitions` into `Definitions`.
+		updatePartitionInfo(tblInfo)
+
+		ver, err = updateVersionAndTableInfo(t, job, tblInfo, true)
+		if err != nil {
+			return ver, errors.Trace(err)
+		}
+		// Finish this job.
+		job.FinishTableJob(model.JobStateDone, model.StatePublic, ver, tblInfo)
+		asyncNotifyEvent(d, &util.Event{Tp: model.ActionAddTablePartition, TableInfo: tblInfo, PartInfo: partInfo})
+	default:
+		err = ErrInvalidDDLState.GenWithStackByArgs("partition", job.SchemaState)
+	}
+
+	return ver, errors.Trace(err)
+}
+
+// updatePartitionInfo merges `addingDefinitions` into `Definitions` in the tableInfo.
+func updatePartitionInfo(tblInfo *model.TableInfo) {
+	parInfo := &model.PartitionInfo{}
+	oldDefs, newDefs := tblInfo.Partition.Definitions, tblInfo.Partition.AddingDefinitions
+	parInfo.Definitions = make([]model.PartitionDefinition, 0, len(newDefs)+len(oldDefs))
+	parInfo.Definitions = append(parInfo.Definitions, oldDefs...)
+	parInfo.Definitions = append(parInfo.Definitions, newDefs...)
+	tblInfo.Partition.Definitions = parInfo.Definitions
+	tblInfo.Partition.AddingDefinitions = nil
+}
+
+// updateAddingPartitionInfo writes the adding partitions into the `addingDefinitions` field of the tableInfo.
+func updateAddingPartitionInfo(partitionInfo *model.PartitionInfo, tblInfo *model.TableInfo) {
+	newDefs := partitionInfo.Definitions
+	tblInfo.Partition.AddingDefinitions = make([]model.PartitionDefinition, 0, len(newDefs))
+	tblInfo.Partition.AddingDefinitions = append(tblInfo.Partition.AddingDefinitions, newDefs...)
+}
+
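Reviewer note: the intended bookkeeping is a two-step flow. The sketch below is not part of the patch; it assumes it compiles inside package ddl, and the name demoAddingFlow is made up purely for illustration of how the two helpers above compose.

// demoAddingFlow mirrors the none -> replica only -> public transitions handled in
// onAddTablePartition: the new definitions are first parked in AddingDefinitions,
// then merged into Definitions once the replicas are ready.
func demoAddingFlow(tblInfo *model.TableInfo, partInfo *model.PartitionInfo) {
	// none -> replica only: park the pending partitions.
	updateAddingPartitionInfo(partInfo, tblInfo)
	// tblInfo.Partition.AddingDefinitions now holds the pending partitions.

	// replica only -> public: merge them into the visible definitions.
	updatePartitionInfo(tblInfo)
	// tblInfo.Partition.Definitions now contains old + new, and AddingDefinitions is nil.
}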
+// rollbackAddingPartitionInfo removes the `addingDefinitions` from the tableInfo and returns their physical table IDs.
+func rollbackAddingPartitionInfo(tblInfo *model.TableInfo) []int64 {
+	physicalTableIDs := make([]int64, 0, len(tblInfo.Partition.AddingDefinitions))
+	for _, one := range tblInfo.Partition.AddingDefinitions {
+		physicalTableIDs = append(physicalTableIDs, one.ID)
+	}
+	tblInfo.Partition.AddingDefinitions = nil
+	return physicalTableIDs
+}
+
+// checkAddPartitionValue checks that the `VALUES LESS THAN` values of the new partitions are strictly increasing,
+// and strictly greater than that of the last existing partition.
+func checkAddPartitionValue(meta *model.TableInfo, part *model.PartitionInfo) error {
+	if meta.Partition.Type == model.PartitionTypeRange && len(meta.Partition.Columns) == 0 {
+		newDefs, oldDefs := part.Definitions, meta.Partition.Definitions
+		rangeValue := oldDefs[len(oldDefs)-1].LessThan[0]
+		if strings.EqualFold(rangeValue, "MAXVALUE") {
+			return errors.Trace(ErrPartitionMaxvalue)
+		}
+
+		currentRangeValue, err := strconv.Atoi(rangeValue)
+		if err != nil {
+			return errors.Trace(err)
+		}
+
+		for i := 0; i < len(newDefs); i++ {
+			ifMaxvalue := strings.EqualFold(newDefs[i].LessThan[0], "MAXVALUE")
+			if ifMaxvalue && i == len(newDefs)-1 {
+				return nil
+			} else if ifMaxvalue && i != len(newDefs)-1 {
+				return errors.Trace(ErrPartitionMaxvalue)
+			}
+
+			nextRangeValue, err := strconv.Atoi(newDefs[i].LessThan[0])
+			if err != nil {
+				return errors.Trace(err)
+			}
+			if nextRangeValue <= currentRangeValue {
+				return errors.Trace(ErrRangeNotIncreasing)
+			}
+			currentRangeValue = nextRangeValue
+		}
+	}
+	return nil
+}
+
+func checkPartitionReplica(addingDefinitions []model.PartitionDefinition, d *ddlCtx) (needWait bool, err error) {
+	ctx := context.Background()
+	pdCli := d.store.(tikv.Storage).GetRegionCache().PDClient()
+	stores, err := pdCli.GetAllStores(ctx)
+	if err != nil {
+		return needWait, errors.Trace(err)
+	}
+	for _, pd := range addingDefinitions {
+		startKey, endKey := tablecodec.GetTableHandleKeyRange(pd.ID)
+		regions, _, err := pdCli.ScanRegions(ctx, startKey, endKey, -1)
+		if err != nil {
+			return needWait, errors.Trace(err)
+		}
+		// For every region in the partition, check whether it already has a TiFlash peer;
+		// if so, the replication of that region is considered complete.
+		for _, region := range regions {
+			regionState, err := pdCli.GetRegionByID(ctx, region.Id)
+			if err != nil {
+				return needWait, errors.Trace(err)
+			}
+			tiflashPeerAtLeastOne := checkTiFlashPeerStoreAtLeastOne(stores, regionState.Meta.Peers)
+			// It is unnecessary to wait for all TiFlash peers to be replicated.
+			// Only make sure that the TiFlash peer count is > 0 (at least one).
+			if tiflashPeerAtLeastOne {
+				continue
+			}
+			needWait = true
+			logutil.BgLogger().Info("[ddl] partition replicas check failed in replica-only DDL state", zap.Int64("pID", pd.ID), zap.Uint64("wait region ID", region.Id), zap.Bool("tiflash peer at least one", tiflashPeerAtLeastOne), zap.Time("check time", time.Now()))
+			return needWait, nil
+		}
+	}
+	logutil.BgLogger().Info("[ddl] partition replicas check ok in replica-only DDL state")
+	return needWait, nil
+}
+
+func checkTiFlashPeerStoreAtLeastOne(stores []*metapb.Store, peers []*metapb.Peer) bool {
+	for _, peer := range peers {
+		for _, store := range stores {
+			if peer.StoreId == store.Id && storeHasEngineTiFlashLabel(store) {
+				return true
+			}
+		}
+	}
+	return false
+}
+
+func storeHasEngineTiFlashLabel(store *metapb.Store) bool {
+	for _, label := range store.Labels {
+		if label.Key == "engine" && label.Value == "tiflash" {
+			return true
+		}
+	}
+	return false
+}
+
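To make the peer rule concrete, here is a small sketch (not part of the patch; demoTiFlashPeerCheck is a made-up name and the snippet assumes it sits in package ddl) that drives checkTiFlashPeerStoreAtLeastOne with hand-built metapb values:

// Only store 2 carries the engine=tiflash label, so a region counts as replicated
// as soon as one of its peers lands on that store.
func demoTiFlashPeerCheck() {
	stores := []*metapb.Store{
		{Id: 1}, // TiKV store without an engine label
		{Id: 2, Labels: []*metapb.StoreLabel{{Key: "engine", Value: "tiflash"}}},
	}
	// A region whose only peer is on store 1 keeps the job waiting in the replica-only state.
	fmt.Println(checkTiFlashPeerStoreAtLeastOne(stores, []*metapb.Peer{{StoreId: 1}})) // false
	// One peer on the TiFlash store is enough; there is no need to wait for more replicas.
	fmt.Println(checkTiFlashPeerStoreAtLeastOne(stores, []*metapb.Peer{{StoreId: 1}, {StoreId: 2}})) // true
}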
 // buildTablePartitionInfo builds partition info and checks for some errors.
 func buildTablePartitionInfo(ctx sessionctx.Context, s *ast.CreateTableStmt) (*model.PartitionInfo, error) {
 	if s.Partition == nil {
@@ -617,20 +840,31 @@ func onDropTablePartition(t *meta.Meta, job *model.Job) (ver int64, _ error) {
 	if err != nil {
 		return ver, errors.Trace(err)
 	}
-	// If an error occurs, it returns that it cannot delete all partitions or that the partition doesn't exist.
-	err = checkDropTablePartition(tblInfo, partNames)
-	if err != nil {
-		job.State = model.JobStateCancelled
-		return ver, errors.Trace(err)
-	}
-	physicalTableIDs := removePartitionInfo(tblInfo, partNames)
-	ver, err = updateVersionAndTableInfo(t, job, tblInfo, true)
-	if err != nil {
-		return ver, errors.Trace(err)
+	var physicalTableIDs []int64
+	if job.Type == model.ActionAddTablePartition {
+		// The job is rolled back from adding a table partition; just remove the addingDefinitions from the tableInfo.
+		physicalTableIDs = rollbackAddingPartitionInfo(tblInfo)
+	} else {
+		// If an error occurs, it returns that it cannot delete all partitions or that the partition doesn't exist.
+		err = checkDropTablePartition(tblInfo, partNames)
+		if err != nil {
+			job.State = model.JobStateCancelled
+			return ver, errors.Trace(err)
+		}
+		physicalTableIDs = removePartitionInfo(tblInfo, partNames)
+		ver, err = updateVersionAndTableInfo(t, job, tblInfo, true)
+		if err != nil {
+			return ver, errors.Trace(err)
+		}
 	}
 
 	// Finish this job.
-	job.FinishTableJob(model.JobStateDone, model.StateNone, ver, tblInfo)
+	if job.IsRollingback() {
+		job.FinishTableJob(model.JobStateRollbackDone, model.StateNone, ver, tblInfo)
+	} else {
+		job.FinishTableJob(model.JobStateDone, model.StateNone, ver, tblInfo)
+	}
+
 	// A background job will be created to delete old partition data.
 	job.Args = []interface{}{physicalTableIDs}
 	return ver, nil
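The value check that gates the none state can be illustrated with a short sketch (not part of the patch; buildRangeTblInfo and demoCheckAddPartitionValue are made-up names, and the snippet assumes it compiles inside package ddl):

// buildRangeTblInfo builds a range-partitioned TableInfo whose existing
// partitions end at the given LESS THAN values.
func buildRangeTblInfo(lessThan ...string) *model.TableInfo {
	defs := make([]model.PartitionDefinition, 0, len(lessThan))
	for _, v := range lessThan {
		defs = append(defs, model.PartitionDefinition{LessThan: []string{v}})
	}
	return &model.TableInfo{Partition: &model.PartitionInfo{
		Type:        model.PartitionTypeRange,
		Definitions: defs,
	}}
}

// A new partition with LESS THAN (20) after an existing LESS THAN (30) is rejected
// (ErrRangeNotIncreasing), while LESS THAN (40) passes the check.
func demoCheckAddPartitionValue() {
	tbl := buildRangeTblInfo("10", "30")
	bad := &model.PartitionInfo{Definitions: []model.PartitionDefinition{{LessThan: []string{"20"}}}}
	good := &model.PartitionInfo{Definitions: []model.PartitionDefinition{{LessThan: []string{"40"}}}}
	fmt.Println(checkAddPartitionValue(tbl, bad) != nil)  // true
	fmt.Println(checkAddPartitionValue(tbl, good) == nil) // true
}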
diff --git a/ddl/reorg.go b/ddl/reorg.go
old mode 100755
new mode 100644
diff --git a/ddl/rollingback.go b/ddl/rollingback.go
index ba45101dc9314..8c9de46e1abac 100644
--- a/ddl/rollingback.go
+++ b/ddl/rollingback.go
@@ -188,12 +188,33 @@ func rollingbackAddIndex(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job, isP
 	return
 }
 
+func convertAddTablePartitionJob2RollbackJob(t *meta.Meta, job *model.Job, otherwiseErr error, tblInfo *model.TableInfo) (ver int64, err error) {
+	job.State = model.JobStateRollingback
+	addingDefinitions := tblInfo.Partition.AddingDefinitions
+	partNames := make([]string, 0, len(addingDefinitions))
+	for _, pd := range addingDefinitions {
+		partNames = append(partNames, pd.Name.L)
+	}
+	job.Args = []interface{}{partNames}
+	ver, err = updateVersionAndTableInfo(t, job, tblInfo, true)
+	if err != nil {
+		return ver, errors.Trace(err)
+	}
+	return ver, errors.Trace(otherwiseErr)
+}
+
 func rollingbackAddTablePartition(t *meta.Meta, job *model.Job) (ver int64, err error) {
-	_, err = getTableInfoAndCancelFaultJob(t, job, job.SchemaID)
+	tblInfo, _, addingDefinitions, err := checkAddPartition(t, job)
 	if err != nil {
 		return ver, errors.Trace(err)
 	}
-	return cancelOnlyNotHandledJob(job)
+	// An empty addingDefinitions means the job hasn't reached the replica-only state, so it can simply be cancelled.
+	if len(addingDefinitions) == 0 {
+		job.State = model.JobStateCancelled
+		return ver, errors.Trace(errCancelledDDLJob)
+	}
+	// addingDefinitions is already stored in tblInfo, so pass tblInfo directly.
+	return convertAddTablePartitionJob2RollbackJob(t, job, errCancelledDDLJob, tblInfo)
 }
 
 func rollingbackDropTableOrView(t *meta.Meta, job *model.Job) error {
diff --git a/ddl/table.go b/ddl/table.go
index a7fa0e8df494e..1b79d5d51739f 100644
--- a/ddl/table.go
+++ b/ddl/table.go
@@ -15,9 +15,8 @@ package ddl
 
 import (
 	"fmt"
-	"strconv"
-	"strings"
 	"sync/atomic"
+	"time"
 
 	"github.com/pingcap/errors"
 	"github.com/pingcap/failpoint"
@@ -36,6 +35,8 @@ import (
 	"github.com/pingcap/tidb/util/gcutil"
 )
 
+const tiflashCheckTiDBHTTPAPIHalfInterval = 2500 * time.Millisecond
+
 func onCreateTable(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, _ error) {
 	failpoint.Inject("mockExceedErrorLimit", func(val failpoint.Value) {
 		if val.(bool) {
@@ -949,93 +950,6 @@ func updateVersionAndTableInfo(t *meta.Meta, job *model.Job, tblInfo *model.Tabl
 	return ver, t.UpdateTable(job.SchemaID, tblInfo)
 }
 
-// TODO: It may have the issue when two clients concurrently add partitions to a table.
-func onAddTablePartition(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, _ error) {
-	partInfo := &model.PartitionInfo{}
-	err := job.DecodeArgs(&partInfo)
-	if err != nil {
-		job.State = model.JobStateCancelled
-		return ver, errors.Trace(err)
-	}
-
-	tblInfo, err := getTableInfoAndCancelFaultJob(t, job, job.SchemaID)
-	if err != nil {
-		job.State = model.JobStateCancelled
-		return ver, errors.Trace(err)
-	}
-	err = checkAddPartitionTooManyPartitions(uint64(len(tblInfo.Partition.Definitions) + len(partInfo.Definitions)))
-	if err != nil {
-		job.State = model.JobStateCancelled
-		return ver, errors.Trace(err)
-	}
-
-	err = checkAddPartitionValue(tblInfo, partInfo)
-	if err != nil {
-		job.State = model.JobStateCancelled
-		return ver, errors.Trace(err)
-	}
-
-	err = checkAddPartitionNameUnique(tblInfo, partInfo)
-	if err != nil {
-		job.State = model.JobStateCancelled
-		return ver, errors.Trace(err)
-	}
-
-	updatePartitionInfo(partInfo, tblInfo)
-	ver, err = updateVersionAndTableInfo(t, job, tblInfo, true)
-	if err != nil {
-		return ver, errors.Trace(err)
-	}
-	// Finish this job.
-	job.FinishTableJob(model.JobStateDone, model.StatePublic, ver, tblInfo)
-	asyncNotifyEvent(d, &util.Event{Tp: model.ActionAddTablePartition, TableInfo: tblInfo, PartInfo: partInfo})
-	return ver, errors.Trace(err)
-}
-
-func updatePartitionInfo(partitionInfo *model.PartitionInfo, tblInfo *model.TableInfo) {
-	parInfo := &model.PartitionInfo{}
-	oldDefs, newDefs := tblInfo.Partition.Definitions, partitionInfo.Definitions
-	parInfo.Definitions = make([]model.PartitionDefinition, 0, len(newDefs)+len(oldDefs))
-	parInfo.Definitions = append(parInfo.Definitions, oldDefs...)
-	parInfo.Definitions = append(parInfo.Definitions, newDefs...)
-	tblInfo.Partition.Definitions = parInfo.Definitions
-}
-
-// checkAddPartitionValue values less than value must be strictly increasing for each partition.
-func checkAddPartitionValue(meta *model.TableInfo, part *model.PartitionInfo) error {
-	if meta.Partition.Type == model.PartitionTypeRange && len(meta.Partition.Columns) == 0 {
-		newDefs, oldDefs := part.Definitions, meta.Partition.Definitions
-		rangeValue := oldDefs[len(oldDefs)-1].LessThan[0]
-		if strings.EqualFold(rangeValue, "MAXVALUE") {
-			return errors.Trace(ErrPartitionMaxvalue)
-		}
-
-		currentRangeValue, err := strconv.Atoi(rangeValue)
-		if err != nil {
-			return errors.Trace(err)
-		}
-
-		for i := 0; i < len(newDefs); i++ {
-			ifMaxvalue := strings.EqualFold(newDefs[i].LessThan[0], "MAXVALUE")
-			if ifMaxvalue && i == len(newDefs)-1 {
-				return nil
-			} else if ifMaxvalue && i != len(newDefs)-1 {
-				return errors.Trace(ErrPartitionMaxvalue)
-			}
-
-			nextRangeValue, err := strconv.Atoi(newDefs[i].LessThan[0])
-			if err != nil {
-				return errors.Trace(err)
-			}
-			if nextRangeValue <= currentRangeValue {
-				return errors.Trace(ErrRangeNotIncreasing)
-			}
-			currentRangeValue = nextRangeValue
-		}
-	}
-	return nil
-}
-
 func onRepairTable(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, _ error) {
 	schemaID := job.SchemaID
 	tblInfo := &model.TableInfo{}
diff --git a/ddl/table_test.go b/ddl/table_test.go
index ae55af6a13476..6e39d30140f09 100644
--- a/ddl/table_test.go
+++ b/ddl/table_test.go
@@ -68,7 +68,7 @@ func testTableInfo(c *C, d *ddl, name string, num int) *model.TableInfo {
 	return tblInfo
 }
 
-// testTableInfo creates a test table with num int columns and with no index.
+// testTableInfoWithPartition creates a test table with num int columns and with no index.
 func testTableInfoWithPartition(c *C, d *ddl, name string, num int) *model.TableInfo {
 	tblInfo := testTableInfo(c, d, name, num)
 	genIDs, err := d.genGlobalIDs(1)
@@ -88,6 +88,30 @@ func testTableInfoWithPartition(c *C, d *ddl, name string, num int) *model.Table
 	return tblInfo
 }
 
+// testTableInfoWithPartitionLessThan creates a test table with num int columns and one partition whose LESS THAN value is `lessthan`.
+func testTableInfoWithPartitionLessThan(c *C, d *ddl, name string, num int, lessthan string) *model.TableInfo {
+	tblInfo := testTableInfoWithPartition(c, d, name, num)
+	tblInfo.Partition.Definitions[0].LessThan = []string{lessthan}
+	return tblInfo
+}
+
+func testAddedNewTablePartitionInfo(c *C, d *ddl, tblInfo *model.TableInfo, partName, lessthan string) *model.PartitionInfo {
+	genIDs, err := d.genGlobalIDs(1)
+	c.Assert(err, IsNil)
+	pid := genIDs[0]
+	// The newly added partition starts from the none state.
+	return &model.PartitionInfo{
+		Type:   model.PartitionTypeRange,
+		Expr:   tblInfo.Columns[0].Name.L,
+		Enable: true,
+		Definitions: []model.PartitionDefinition{{
+			ID:       pid,
+			Name:     model.NewCIStr(partName),
+			LessThan: []string{lessthan},
+		}},
+	}
+}
+
 // testViewInfo creates a test view with num int columns.
 func testViewInfo(c *C, d *ddl, name string, num int) *model.TableInfo {
 	tblInfo := &model.TableInfo{
diff --git a/go.mod b/go.mod
index ea63ae3d92152..b9c5294c94d87 100644
--- a/go.mod
+++ b/go.mod
@@ -32,7 +32,7 @@ require (
 	github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989
 	github.com/pingcap/kvproto v0.0.0-20200706115936-1e0910aabe6c
 	github.com/pingcap/log v0.0.0-20200511115504-543df19646ad
-	github.com/pingcap/parser v0.0.0-20200731074050-60bb73054ef8
+	github.com/pingcap/parser v0.0.0-20200803072748-fdf66528323d
 	github.com/pingcap/pd/v4 v4.0.0-rc.2.0.20200520083007-2c251bd8f181
 	github.com/pingcap/sysutil v0.0.0-20200408114249-ed3bd6f7fdb1
 	github.com/pingcap/tidb-tools v4.0.1-0.20200530144555-cdec43635625+incompatible
diff --git a/go.sum b/go.sum
index 9abfab06b7a5b..bfb20a01062f6 100644
--- a/go.sum
+++ b/go.sum
@@ -422,8 +422,8 @@ github.com/pingcap/parser v0.0.0-20200424075042-8222d8b724a4/go.mod h1:9v0Edh8Ib
 github.com/pingcap/parser v0.0.0-20200507022230-f3bf29096657/go.mod h1:9v0Edh8IbgjGYW2ArJr19E+bvL8zKahsFp+ixWeId+4=
 github.com/pingcap/parser v0.0.0-20200603032439-c4ecb4508d2f/go.mod h1:9v0Edh8IbgjGYW2ArJr19E+bvL8zKahsFp+ixWeId+4=
 github.com/pingcap/parser v0.0.0-20200623164729-3a18f1e5dceb/go.mod h1:vQdbJqobJAgFyiRNNtXahpMoGWwPEuWciVEK5A20NS0=
-github.com/pingcap/parser v0.0.0-20200731074050-60bb73054ef8 h1:zQ9uQQNedlGBuCllPcOsCvkPPrtZXG26ZIWlRo+iM08=
-github.com/pingcap/parser v0.0.0-20200731074050-60bb73054ef8/go.mod h1:vQdbJqobJAgFyiRNNtXahpMoGWwPEuWciVEK5A20NS0=
+github.com/pingcap/parser v0.0.0-20200803072748-fdf66528323d h1:QQMAWm/b/8EyCrqqcjdO4DcACS06tx8IhKGWC4PTqiQ=
+github.com/pingcap/parser v0.0.0-20200803072748-fdf66528323d/go.mod h1:vQdbJqobJAgFyiRNNtXahpMoGWwPEuWciVEK5A20NS0=
 github.com/pingcap/pd/v4 v4.0.0-rc.1.0.20200422143320-428acd53eba2/go.mod h1:s+utZtXDznOiL24VK0qGmtoHjjXNsscJx3m1n8cC56s=
 github.com/pingcap/pd/v4 v4.0.0-rc.2.0.20200520083007-2c251bd8f181 h1:FM+PzdoR3fmWAJx3ug+p5aOgs5aZYwFkoDL7Potdsz0=
 github.com/pingcap/pd/v4 v4.0.0-rc.2.0.20200520083007-2c251bd8f181/go.mod h1:q4HTx/bA8aKBa4S7L+SQKHvjRPXCRV0tA0yRw0qkZSA=
diff --git a/server/http_handler.go b/server/http_handler.go
index 66793a6d04005..cfeb02a48f586 100644
--- a/server/http_handler.go
+++ b/server/http_handler.go
@@ -717,6 +717,7 @@ type tableFlashReplicaInfo struct {
 	ReplicaCount   uint64   `json:"replica_count"`
 	LocationLabels []string `json:"location_labels"`
 	Available      bool     `json:"available"`
+	HighPriority   bool     `json:"high_priority"`
 }
 
 func (h flashReplicaHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
@@ -759,6 +760,15 @@ func (h flashReplicaHandler) getTiFlashReplicaInfo(tblInfo *model.TableInfo, rep
 				Available:      tblInfo.TiFlashReplica.IsPartitionAvailable(p.ID),
 			})
 		}
+		for _, p := range pi.AddingDefinitions {
+			replicaInfos = append(replicaInfos, &tableFlashReplicaInfo{
+				ID:             p.ID,
+				ReplicaCount:   tblInfo.TiFlashReplica.Count,
+				LocationLabels: tblInfo.TiFlashReplica.LocationLabels,
+				Available:      tblInfo.TiFlashReplica.IsPartitionAvailable(p.ID),
+				HighPriority:   true,
+			})
+		}
 		return replicaInfos
 	}
 	replicaInfos = append(replicaInfos, &tableFlashReplicaInfo{
diff --git a/util/admin/admin.go b/util/admin/admin.go
index 80af163ad9de6..84e48f2f19394 100644
--- a/util/admin/admin.go
+++ b/util/admin/admin.go
@@ -105,8 +105,10 @@ func IsJobRollbackable(job *model.Job) bool {
 			job.SchemaState == model.StateDeleteOnly {
 			return false
 		}
+	case model.ActionAddTablePartition:
+		return job.SchemaState == model.StateNone || job.SchemaState == model.StateReplicaOnly
 	case model.ActionDropColumn, model.ActionModifyColumn,
-		model.ActionDropTablePartition, model.ActionAddTablePartition,
+		model.ActionDropTablePartition,
 		model.ActionRebaseAutoID, model.ActionShardRowID,
 		model.ActionTruncateTable, model.ActionAddForeignKey,
 		model.ActionDropForeignKey, model.ActionRenameTable,
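For context on the admin.go change above, a sketch (not part of the patch; demoRollbackable is a made-up name, the job values are fabricated, and the imports of the model and admin packages are assumed) of the resulting cancellation behaviour:

// With the new case, an ADD PARTITION job stays rollbackable while it is still
// waiting for TiFlash replicas, so `ADMIN CANCEL DDL JOBS <job_id>` can abort it;
// once the partition has gone public the job can no longer be rolled back.
func demoRollbackable() {
	job := &model.Job{Type: model.ActionAddTablePartition, SchemaState: model.StateReplicaOnly}
	fmt.Println(admin.IsJobRollbackable(job)) // true
	job.SchemaState = model.StatePublic
	fmt.Println(admin.IsJobRollbackable(job)) // false
}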