ddl: add addingDefinition field for adding partition replica check (#18495) #18865

Merged · 7 commits · Aug 3, 2020
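For orientation before the diff: the PR title refers to the `AddingDefinitions` slice on `model.PartitionInfo`. Below is a minimal sketch of the two fields this diff's state machine relies on — the field names are taken from the diff, everything else is elided, and the real struct lives in pingcap/parser's model package with more fields, so treat this as a summary rather than the actual definition.

type PartitionInfo struct {
	// Definitions holds the partitions that are public (visible to queries).
	Definitions []PartitionDefinition
	// AddingDefinitions stages partitions being added by an in-flight DDL job;
	// they are merged into Definitions when the job reaches StatePublic, and
	// discarded if the job is rolled back.
	AddingDefinitions []PartitionDefinition
}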
5 changes: 4 additions & 1 deletion ddl/db_change_test.go
@@ -830,12 +830,15 @@ func (s *testStateChangeSuite) TestParallelAddPrimaryKey(c *C) {
func (s *testStateChangeSuite) TestParallelAlterAddPartition(c *C) {
sql1 := `alter table t_part add partition (
partition p2 values less than (30)
);`
sql2 := `alter table t_part add partition (
partition p3 values less than (30)
);`
f := func(c *C, err1, err2 error) {
c.Assert(err1, IsNil)
c.Assert(err2.Error(), Equals, "[ddl:1493]VALUES LESS THAN value must be strictly increasing for each partition")
}
s.testControlParallelExecSQL(c, sql1, sql1, f)
s.testControlParallelExecSQL(c, sql1, sql2, f)
}

func (s *testStateChangeSuite) TestParallelDropColumn(c *C) {
2 changes: 2 additions & 0 deletions ddl/db_test.go
@@ -1239,6 +1239,8 @@ func (s *testDBSuite1) TestCancelAddTableAndDropTablePartition(c *C) {
}{
{model.ActionAddTablePartition, model.JobStateNone, model.StateNone, true},
{model.ActionDropTablePartition, model.JobStateNone, model.StateNone, true},
// Adding a table partition can now be cancelled in the ReplicaOnly state.
{model.ActionAddTablePartition, model.JobStateRunning, model.StateReplicaOnly, true},
{model.ActionAddTablePartition, model.JobStateRunning, model.StatePublic, false},
{model.ActionDropTablePartition, model.JobStateRunning, model.StatePublic, false},
}
31 changes: 31 additions & 0 deletions ddl/ddl_worker_test.go
@@ -460,6 +460,10 @@ func buildCancelJobTests(firstID int64) []testCancelJob {
{act: model.ActionAddPrimaryKey, jobIDs: []int64{firstID + 35}, cancelRetErrs: []error{admin.ErrCancelFinishedDDLJob.GenWithStackByArgs(firstID + 35)}, cancelState: model.StatePublic},
{act: model.ActionDropPrimaryKey, jobIDs: []int64{firstID + 36}, cancelRetErrs: noErrs, cancelState: model.StateWriteOnly},
{act: model.ActionDropPrimaryKey, jobIDs: []int64{firstID + 37}, cancelRetErrs: []error{admin.ErrCannotCancelDDLJob.GenWithStackByArgs(firstID + 37)}, cancelState: model.StateDeleteOnly},

{act: model.ActionAddTablePartition, jobIDs: []int64{firstID + 42}, cancelRetErrs: noErrs, cancelState: model.StateNone},
{act: model.ActionAddTablePartition, jobIDs: []int64{firstID + 43}, cancelRetErrs: noErrs, cancelState: model.StateReplicaOnly},
{act: model.ActionAddTablePartition, jobIDs: []int64{firstID + 44}, cancelRetErrs: []error{admin.ErrCancelFinishedDDLJob}, cancelState: model.StatePublic},
}

return tests
@@ -862,6 +866,33 @@ func (s *testDDLSuite) TestCancelJob(c *C) {
testDropIndex(c, ctx, d, dbInfo, tblInfo, idxOrigName)
c.Check(errors.ErrorStack(checkErr), Equals, "")
s.checkDropIdx(c, d, dbInfo.ID, tblInfo.ID, idxOrigName, true)

// Cancel add table partition.
baseTableInfo := testTableInfoWithPartitionLessThan(c, d, "empty_table", 5, "1000")
testCreateTable(c, ctx, d, dbInfo, baseTableInfo)

cancelState = model.StateNone
updateTest(&tests[34])
addedPartInfo := testAddedNewTablePartitionInfo(c, d, baseTableInfo, "p1", "maxvalue")
addPartitionArgs := []interface{}{addedPartInfo}
doDDLJobErrWithSchemaState(ctx, d, c, dbInfo.ID, baseTableInfo.ID, test.act, addPartitionArgs, &cancelState)
c.Check(checkErr, IsNil)
baseTable := testGetTable(c, d, dbInfo.ID, baseTableInfo.ID)
c.Assert(len(baseTable.Meta().Partition.Definitions), Equals, 1)

updateTest(&tests[35])
doDDLJobErrWithSchemaState(ctx, d, c, dbInfo.ID, baseTableInfo.ID, test.act, addPartitionArgs, &cancelState)
c.Check(checkErr, IsNil)
baseTable = testGetTable(c, d, dbInfo.ID, baseTableInfo.ID)
c.Assert(len(baseTable.Meta().Partition.Definitions), Equals, 1)

updateTest(&tests[36])
doDDLJobSuccess(ctx, d, c, dbInfo.ID, baseTableInfo.ID, test.act, addPartitionArgs)
c.Check(checkErr, IsNil)
baseTable = testGetTable(c, d, dbInfo.ID, baseTableInfo.ID)
c.Assert(len(baseTable.Meta().Partition.Definitions), Equals, 2)
c.Assert(baseTable.Meta().Partition.Definitions[1].ID, Equals, addedPartInfo.Definitions[0].ID)
c.Assert(baseTable.Meta().Partition.Definitions[1].LessThan[0], Equals, addedPartInfo.Definitions[0].LessThan[0])
}

func (s *testDDLSuite) TestIgnorableSpec(c *C) {
ddl/index.go: file mode changed 100755 → 100644 (no content changes)
256 changes: 245 additions & 11 deletions ddl/partition.go
@@ -15,11 +15,14 @@ package ddl

import (
"bytes"
"context"
"fmt"
"strconv"
"strings"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/parser"
"github.com/pingcap/parser/ast"
"github.com/pingcap/parser/format"
@@ -31,15 +34,235 @@ import (
"github.com/pingcap/tidb/infoschema"
"github.com/pingcap/tidb/meta"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/store/tikv"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/logutil"
"go.uber.org/zap"
)

const (
partitionMaxValue = "MAXVALUE"
)

func checkAddPartition(t *meta.Meta, job *model.Job) (*model.TableInfo, *model.PartitionInfo, []model.PartitionDefinition, error) {
schemaID := job.SchemaID
tblInfo, err := getTableInfoAndCancelFaultJob(t, job, schemaID)
if err != nil {
return nil, nil, nil, errors.Trace(err)
}
partInfo := &model.PartitionInfo{}
err = job.DecodeArgs(&partInfo)
if err != nil {
job.State = model.JobStateCancelled
return nil, nil, nil, errors.Trace(err)
}
if len(tblInfo.Partition.AddingDefinitions) > 0 {
return tblInfo, partInfo, tblInfo.Partition.AddingDefinitions, nil
}
return tblInfo, partInfo, []model.PartitionDefinition{}, nil
}

func onAddTablePartition(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, _ error) {
// Handle the job that is being rolled back.
if job.IsRollingback() {
ver, err := onDropTablePartition(t, job)
if err != nil {
return ver, errors.Trace(err)
}
return ver, nil
}

tblInfo, partInfo, addingDefinitions, err := checkAddPartition(t, job)
if err != nil {
return ver, err
}

// In order to avoid maintaining a state field in each partitionDefinition, TiDB uses the addingDefinitions field instead.
// So `job.SchemaState` is used here to judge which stage this job is in.
switch job.SchemaState {
case model.StateNone:
// job.SchemaState == model.StateNone means the job is in the initial state of add partition.
// Here the partInfo from the job should be used directly, and the checks below performed on it.
err = checkAddPartitionTooManyPartitions(uint64(len(tblInfo.Partition.Definitions) + len(partInfo.Definitions)))
if err != nil {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}

err = checkAddPartitionValue(tblInfo, partInfo)
if err != nil {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}

err = checkAddPartitionNameUnique(tblInfo, partInfo)
if err != nil {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}
// none -> replica only
job.SchemaState = model.StateReplicaOnly
// move the adding definition into tableInfo.
updateAddingPartitionInfo(partInfo, tblInfo)
ver, err = updateVersionAndTableInfoWithCheck(t, job, tblInfo, true)
case model.StateReplicaOnly:
// replica only -> public
// A TiFlash replica completion check needs to be done here.
// TODO: If the table has no TiFlashReplica, or the replica is not available, the replica-only state can be eliminated.
if tblInfo.TiFlashReplica != nil && tblInfo.TiFlashReplica.Available {
// When the replica is available, the newly added partition should wait for its replica to
// finish replication. Otherwise queries to this partition will be blocked.
needWait, err := checkPartitionReplica(addingDefinitions, d)
if err != nil {
ver, err = convertAddTablePartitionJob2RollbackJob(t, job, err, tblInfo)
return ver, err
}
if needWait {
// The newly added partition hasn't been replicated yet.
// Do nothing to the job this time; wait for the next worker round.
time.Sleep(tiflashCheckTiDBHTTPAPIHalfInterval)
return ver, nil
}
}

// For a normal table, or one whose replica has finished, move the `addingDefinitions` into `Definitions`.
updatePartitionInfo(tblInfo)

ver, err = updateVersionAndTableInfo(t, job, tblInfo, true)
if err != nil {
return ver, errors.Trace(err)
}
// Finish this job.
job.FinishTableJob(model.JobStateDone, model.StatePublic, ver, tblInfo)
asyncNotifyEvent(d, &util.Event{Tp: model.ActionAddTablePartition, TableInfo: tblInfo, PartInfo: partInfo})
default:
err = ErrInvalidDDLState.GenWithStackByArgs("partition", job.SchemaState)
}

return ver, errors.Trace(err)
}
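To make the switch above easier to follow, here is a minimal, self-contained Go sketch — not TiDB code, all names invented — of the two-phase transition that onAddTablePartition drives through job.SchemaState:

type schemaState int

const (
	stateNone        schemaState = iota // initial: new definitions validated, then staged
	stateReplicaOnly                    // staged in AddingDefinitions, replicas catching up
	statePublic                         // merged into Definitions, job finished
)

// step models one worker round, assuming replicaReady reports whether every
// new partition has at least one TiFlash peer (or no replica is required).
func step(cur schemaState, replicaReady bool) schemaState {
	switch cur {
	case stateNone:
		return stateReplicaOnly
	case stateReplicaOnly:
		if !replicaReady {
			return stateReplicaOnly // wait for the next worker round
		}
		return statePublic
	}
	return cur
}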

// updatePartitionInfo merges `addingDefinitions` into `Definitions` in the tableInfo.
func updatePartitionInfo(tblInfo *model.TableInfo) {
parInfo := &model.PartitionInfo{}
oldDefs, newDefs := tblInfo.Partition.Definitions, tblInfo.Partition.AddingDefinitions
parInfo.Definitions = make([]model.PartitionDefinition, 0, len(newDefs)+len(oldDefs))
parInfo.Definitions = append(parInfo.Definitions, oldDefs...)
parInfo.Definitions = append(parInfo.Definitions, newDefs...)
tblInfo.Partition.Definitions = parInfo.Definitions
tblInfo.Partition.AddingDefinitions = nil
}
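A hypothetical usage example of the merge (assuming model.NewCIStr from pingcap/parser and the field layout shown in this diff):

tbl := &model.TableInfo{Partition: &model.PartitionInfo{
	Definitions:       []model.PartitionDefinition{{Name: model.NewCIStr("p0")}},
	AddingDefinitions: []model.PartitionDefinition{{Name: model.NewCIStr("p1")}},
}}
updatePartitionInfo(tbl)
// Now len(tbl.Partition.Definitions) == 2, ordered p0 then p1, and
// tbl.Partition.AddingDefinitions == nil.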

// updateAddingPartitionInfo writes the new partitions into the `addingDefinitions` field of the tableInfo.
func updateAddingPartitionInfo(partitionInfo *model.PartitionInfo, tblInfo *model.TableInfo) {
newDefs := partitionInfo.Definitions
tblInfo.Partition.AddingDefinitions = make([]model.PartitionDefinition, 0, len(newDefs))
tblInfo.Partition.AddingDefinitions = append(tblInfo.Partition.AddingDefinitions, newDefs...)
}

// rollbackAddingPartitionInfo removes the `addingDefinitions` from the tableInfo.
func rollbackAddingPartitionInfo(tblInfo *model.TableInfo) []int64 {
physicalTableIDs := make([]int64, 0, len(tblInfo.Partition.AddingDefinitions))
for _, one := range tblInfo.Partition.AddingDefinitions {
physicalTableIDs = append(physicalTableIDs, one.ID)
}
tblInfo.Partition.AddingDefinitions = nil
return physicalTableIDs
}
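And the inverse path, again illustrative only: rolling back discards the staged definitions and returns their physical IDs so the background job can clean up any data already written:

tbl := &model.TableInfo{Partition: &model.PartitionInfo{
	AddingDefinitions: []model.PartitionDefinition{{ID: 101}, {ID: 102}},
}}
ids := rollbackAddingPartitionInfo(tbl)
// ids == []int64{101, 102}; tbl.Partition.AddingDefinitions == nil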

// checkAddPartitionValue checks that the VALUES LESS THAN values of the new partitions are strictly increasing for each partition.
func checkAddPartitionValue(meta *model.TableInfo, part *model.PartitionInfo) error {
if meta.Partition.Type == model.PartitionTypeRange && len(meta.Partition.Columns) == 0 {
newDefs, oldDefs := part.Definitions, meta.Partition.Definitions
rangeValue := oldDefs[len(oldDefs)-1].LessThan[0]
if strings.EqualFold(rangeValue, "MAXVALUE") {
return errors.Trace(ErrPartitionMaxvalue)
}

currentRangeValue, err := strconv.Atoi(rangeValue)
if err != nil {
return errors.Trace(err)
}

for i := 0; i < len(newDefs); i++ {
ifMaxvalue := strings.EqualFold(newDefs[i].LessThan[0], "MAXVALUE")
if ifMaxvalue && i == len(newDefs)-1 {
return nil
} else if ifMaxvalue && i != len(newDefs)-1 {
return errors.Trace(ErrPartitionMaxvalue)
}

nextRangeValue, err := strconv.Atoi(newDefs[i].LessThan[0])
if err != nil {
return errors.Trace(err)
}
if nextRangeValue <= currentRangeValue {
return errors.Trace(ErrRangeNotIncreasing)
}
currentRangeValue = nextRangeValue
}
}
return nil
}
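The rule this function enforces, restated as a standalone sketch (hypothetical helper, stdlib only; same semantics: every new bound must exceed the current maximum, and MAXVALUE may appear only as the final bound):

func rangesStrictlyIncreasing(existingMax string, newBounds []string) error {
	if strings.EqualFold(existingMax, "MAXVALUE") {
		return fmt.Errorf("the last existing partition is MAXVALUE; nothing can follow it")
	}
	cur, err := strconv.Atoi(existingMax)
	if err != nil {
		return err
	}
	for i, b := range newBounds {
		if strings.EqualFold(b, "MAXVALUE") {
			if i == len(newBounds)-1 {
				return nil // MAXVALUE is allowed only in the last position
			}
			return fmt.Errorf("MAXVALUE must be the last partition bound")
		}
		v, err := strconv.Atoi(b)
		if err != nil {
			return err
		}
		if v <= cur {
			return fmt.Errorf("VALUES LESS THAN value must be strictly increasing")
		}
		cur = v
	}
	return nil
}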

func checkPartitionReplica(addingDefinitions []model.PartitionDefinition, d *ddlCtx) (needWait bool, err error) {
ctx := context.Background()
pdCli := d.store.(tikv.Storage).GetRegionCache().PDClient()
stores, err := pdCli.GetAllStores(ctx)
if err != nil {
return needWait, errors.Trace(err)
}
for _, pd := range addingDefinitions {
startKey, endKey := tablecodec.GetTableHandleKeyRange(pd.ID)
regions, _, err := pdCli.ScanRegions(ctx, startKey, endKey, -1)
if err != nil {
return needWait, errors.Trace(err)
}
// For every region in the partition, if it has some corresponding peers and
// no pending peers, that means the replication has completed.
for _, region := range regions {
regionState, err := pdCli.GetRegionByID(ctx, region.Id)
if err != nil {
return needWait, errors.Trace(err)
}
tiflashPeerAtLeastOne := checkTiFlashPeerStoreAtLeastOne(stores, regionState.Meta.Peers)
// It's unnecessary to wait for all TiFlash peers to be replicated.
// Here we only make sure that the TiFlash peer count > 0 (at least one).
if tiflashPeerAtLeastOne {
continue
}
needWait = true
logutil.BgLogger().Info("[ddl] partition replicas check failed in replica-only DDL state", zap.Int64("pID", pd.ID), zap.Uint64("wait region ID", region.Id), zap.Bool("tiflash peer at least one", tiflashPeerAtLeastOne), zap.Time("check time", time.Now()))
return needWait, nil
}
}
logutil.BgLogger().Info("[ddl] partition replicas check ok in replica-only DDL state")
return needWait, nil
}

func checkTiFlashPeerStoreAtLeastOne(stores []*metapb.Store, peers []*metapb.Peer) bool {
for _, peer := range peers {
for _, store := range stores {
if peer.StoreId == store.Id && storeHasEngineTiFlashLabel(store) {
return true
}
}
}
return false
}

func storeHasEngineTiFlashLabel(store *metapb.Store) bool {
for _, label := range store.Labels {
if label.Key == "engine" && label.Value == "tiflash" {
return true
}
}
return false
}
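A hypothetical usage of the two helpers above, with kvproto's metapb types constructed inline (label key/value exactly as the code checks them):

stores := []*metapb.Store{
	{Id: 1, Labels: []*metapb.StoreLabel{{Key: "engine", Value: "tiflash"}}}, // TiFlash store
	{Id: 2}, // TiKV store: no engine label
}
peers := []*metapb.Peer{{StoreId: 2}, {StoreId: 1}}
ok := checkTiFlashPeerStoreAtLeastOne(stores, peers)
// ok == true: the peer on store 1 lives on a store labeled engine=tiflash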

// buildTablePartitionInfo builds partition info and checks for some errors.
func buildTablePartitionInfo(ctx sessionctx.Context, s *ast.CreateTableStmt) (*model.PartitionInfo, error) {
if s.Partition == nil {
@@ -617,20 +840,31 @@ func onDropTablePartition(t *meta.Meta, job *model.Job) (ver int64, _ error) {
if err != nil {
return ver, errors.Trace(err)
}
var physicalTableIDs []int64
if job.Type == model.ActionAddTablePartition {
// The job is rolled back from adding a table partition, so just remove the addingDefinitions from the tableInfo.
physicalTableIDs = rollbackAddingPartitionInfo(tblInfo)
} else {
// If an error occurs, it means it cannot delete all partitions or that the partition doesn't exist.
err = checkDropTablePartition(tblInfo, partNames)
if err != nil {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}
physicalTableIDs = removePartitionInfo(tblInfo, partNames)
ver, err = updateVersionAndTableInfo(t, job, tblInfo, true)
if err != nil {
return ver, errors.Trace(err)
}
}

// Finish this job.
if job.IsRollingback() {
job.FinishTableJob(model.JobStateRollbackDone, model.StateNone, ver, tblInfo)
} else {
job.FinishTableJob(model.JobStateDone, model.StateNone, ver, tblInfo)
}

// A background job will be created to delete old partition data.
job.Args = []interface{}{physicalTableIDs}
return ver, nil
ddl/reorg.go: file mode changed 100755 → 100644 (no content changes)