Skip to content

Commit

Permalink
This is an automated cherry-pick of pingcap#46126
Browse files Browse the repository at this point in the history
Signed-off-by: ti-chi-bot <[email protected]>
  • Loading branch information
mjonss authored and ti-chi-bot committed Aug 17, 2023
1 parent 89f8269 commit f574f79
Show file tree
Hide file tree
Showing 3 changed files with 166 additions and 0 deletions.
37 changes: 37 additions & 0 deletions ddl/ddl_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -1556,6 +1556,7 @@ func updateSchemaVersion(d *ddlCtx, t *meta.Meta, job *model.Job, multiInfos ...
diff.OldSchemaID = oldSchemaIDs[0]
diff.AffectedOpts = affects
case model.ActionExchangeTablePartition:
<<<<<<< HEAD
var (
ptSchemaID int64
ptTableID int64
Expand All @@ -1565,6 +1566,42 @@ func updateSchemaVersion(d *ddlCtx, t *meta.Meta, job *model.Job, multiInfos ...
err = job.DecodeArgs(&diff.TableID, &ptSchemaID, &ptTableID, &partName, &withValidation)
if err != nil {
return 0, errors.Trace(err)
=======
// From start of function: diff.SchemaID = job.SchemaID
// Old is original non partitioned table
diff.OldTableID = job.TableID
diff.OldSchemaID = job.SchemaID
// Update the partitioned table (it is only done in the last state)
var (
ptSchemaID int64
ptTableID int64
ptDefID int64
partName string // Not used
withValidation bool // Not used
)
// See ddl.ExchangeTablePartition
err = job.DecodeArgs(&ptDefID, &ptSchemaID, &ptTableID, &partName, &withValidation)
if err != nil {
return 0, errors.Trace(err)
}
// This is needed for not crashing TiFlash!
// TODO: Update TiFlash, to handle StateWriteOnly
diff.AffectedOpts = []*model.AffectedOption{{
TableID: ptTableID,
}}
if job.SchemaState != model.StatePublic {
// No change, just to refresh the non-partitioned table
// with its new ExchangePartitionInfo.
diff.TableID = job.TableID
// Keep this as Schema ID of non-partitioned table
// to avoid trigger early rename in TiFlash
diff.AffectedOpts[0].SchemaID = job.SchemaID
} else {
// Swap
diff.TableID = ptDefID
// Also add correct SchemaID in case different schemas
diff.AffectedOpts[0].SchemaID = ptSchemaID
>>>>>>> 48e22971729 (ddl: Exchange part schema load fix (#46126))
}
diff.OldTableID = job.TableID
affects := make([]*model.AffectedOption, 1)
Expand Down
49 changes: 49 additions & 0 deletions ddl/partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -2064,7 +2064,43 @@ func (w *worker) onExchangeTablePartition(d *ddlCtx, t *meta.Meta, job *model.Jo
if err != nil {
return ver, errors.Trace(err)
}
<<<<<<< HEAD
if nt.ExchangePartitionInfo == nil || !nt.ExchangePartitionInfo.ExchangePartitionFlag {
=======
if job.SchemaState == model.StateNone {
if pt.State != model.StatePublic {
job.State = model.JobStateCancelled
return ver, dbterror.ErrInvalidDDLState.GenWithStack("table %s is not in public, but %s", pt.Name, pt.State)
}
err = checkExchangePartition(pt, nt)
if err != nil {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}

err = checkTableDefCompatible(pt, nt)
if err != nil {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}

err = checkExchangePartitionPlacementPolicy(t, nt.PlacementPolicyRef, pt.PlacementPolicyRef, partDef.PlacementPolicyRef)
if err != nil {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}

if defID != partDef.ID {
logutil.BgLogger().Info("Exchange partition id changed, updating to actual id", zap.String("category", "ddl"),
zap.String("job", job.String()), zap.Int64("defID", defID), zap.Int64("partDef.ID", partDef.ID))
job.Args[0] = partDef.ID
defID = partDef.ID
err = updateDDLJob2Table(w.sess, job, true)
if err != nil {
return ver, errors.Trace(err)
}
}
>>>>>>> 48e22971729 (ddl: Exchange part schema load fix (#46126))
nt.ExchangePartitionInfo = &model.ExchangePartitionInfo{
ExchangePartitionFlag: true,
ExchangePartitionID: ptID,
Expand All @@ -2077,6 +2113,18 @@ func (w *worker) onExchangeTablePartition(d *ddlCtx, t *meta.Meta, job *model.Jo
delayForAsyncCommit()
}

if defID != partDef.ID {
// Should never happen, should have been updated above, in previous state!
logutil.BgLogger().Error("Exchange partition id changed, updating to actual id", zap.String("category", "ddl"),
zap.String("job", job.String()), zap.Int64("defID", defID), zap.Int64("partDef.ID", partDef.ID))
job.Args[0] = partDef.ID
defID = partDef.ID
err = updateDDLJob2Table(w.sess, job, true)
if err != nil {
return ver, errors.Trace(err)
}
}

if withValidation {
err = checkExchangePartitionRecordValidation(w, pt, index, ntDbInfo.Name, nt.Name)
if err != nil {
Expand Down Expand Up @@ -2206,6 +2254,7 @@ func (w *worker) onExchangeTablePartition(d *ddlCtx, t *meta.Meta, job *model.Jo
ntr := rules[ntrID]
ptr := rules[ptrID]

// This must be a bug, nt cannot be partitioned!
partIDs := getPartitionIDs(nt)

var setRules []*label.Rule
Expand Down
80 changes: 80 additions & 0 deletions infoschema/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,81 @@ func (b *Builder) applyDropTableOrParition(m *meta.Meta, diff *model.SchemaDiff)
return tblIDs, nil
}

<<<<<<< HEAD
=======
func (b *Builder) applyReorganizePartition(m *meta.Meta, diff *model.SchemaDiff) ([]int64, error) {
tblIDs, err := b.applyTableUpdate(m, diff)
if err != nil {
return nil, errors.Trace(err)
}
for _, opt := range diff.AffectedOpts {
if opt.OldTableID != 0 {
b.deleteBundle(b.is, opt.OldTableID)
}
if opt.TableID != 0 {
b.markTableBundleShouldUpdate(opt.TableID)
}
}
return tblIDs, nil
}

func (b *Builder) applyExchangeTablePartition(m *meta.Meta, diff *model.SchemaDiff) ([]int64, error) {
// The partitioned table is not affected until the last stage
if diff.OldTableID == diff.TableID && diff.OldSchemaID == diff.SchemaID {
return b.applyTableUpdate(m, diff)
}
ntSchemaID := diff.OldSchemaID
ntID := diff.OldTableID
ptSchemaID := diff.SchemaID
ptID := diff.TableID
partID := diff.TableID
if len(diff.AffectedOpts) > 0 {
ptID = diff.AffectedOpts[0].TableID
if diff.AffectedOpts[0].SchemaID != 0 {
ptSchemaID = diff.AffectedOpts[0].SchemaID
}
}
// The normal table needs to be updated first:
// Just update the tables separately
currDiff := &model.SchemaDiff{
// This is only for the case since https://github.com/pingcap/tidb/pull/45877
// Fixed now, by adding back the AffectedOpts
// to carry the partitioned Table ID.
Type: diff.Type,
Version: diff.Version,
TableID: ntID,
SchemaID: ntSchemaID,
}
if ptID != partID {
currDiff.TableID = partID
currDiff.OldTableID = ntID
currDiff.OldSchemaID = ntSchemaID
}
ntIDs, err := b.applyTableUpdate(m, currDiff)
if err != nil {
return nil, errors.Trace(err)
}
// partID is the new id for the non-partitioned table!
b.markTableBundleShouldUpdate(partID)
// Then the partitioned table, will re-read the whole table, including all partitions!
currDiff.TableID = ptID
currDiff.SchemaID = ptSchemaID
currDiff.OldTableID = ptID
currDiff.OldSchemaID = ptSchemaID
ptIDs, err := b.applyTableUpdate(m, currDiff)
if err != nil {
return nil, errors.Trace(err)
}
// ntID is the new id for the partition!
b.markPartitionBundleShouldUpdate(ntID)
err = updateAutoIDForExchangePartition(b.store, ptSchemaID, ptID, ntSchemaID, ntID)
if err != nil {
return nil, errors.Trace(err)
}
return append(ptIDs, ntIDs...), nil
}

>>>>>>> 48e22971729 (ddl: Exchange part schema load fix (#46126))
func (b *Builder) applyRecoverTable(m *meta.Meta, diff *model.SchemaDiff) ([]int64, error) {
tblIDs, err := b.applyTableUpdate(m, diff)
if err != nil {
Expand Down Expand Up @@ -391,7 +466,12 @@ func (b *Builder) applyTableUpdate(m *meta.Meta, diff *model.SchemaDiff) ([]int6
newTableID = diff.TableID
case model.ActionDropTable, model.ActionDropView, model.ActionDropSequence:
oldTableID = diff.TableID
<<<<<<< HEAD
case model.ActionTruncateTable, model.ActionCreateView, model.ActionExchangeTablePartition:
=======
case model.ActionTruncateTable, model.ActionCreateView,
model.ActionExchangeTablePartition:
>>>>>>> 48e22971729 (ddl: Exchange part schema load fix (#46126))
oldTableID = diff.OldTableID
newTableID = diff.TableID
default:
Expand Down

0 comments on commit f574f79

Please sign in to comment.