Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: REORGANIZE PARTITION (#38535) #41096

Merged
merged 42 commits into from
Feb 11, 2023
Merged
Show file tree
Hide file tree
Changes from 28 commits
Commits
Show all changes
42 commits
Select commit Hold shift + click to select a range
0d316ef
Merge master into feature branch (#38463)
mjonss Oct 13, 2022
2b8c979
*: merge master to feature/reorganize-partition (#38613)
mjonss Nov 15, 2022
aec9e7c
*: Merge master to feature/reorganize-partition (#39164)
mjonss Nov 28, 2022
a6dbd11
*: merge master into feature/reorganize-partition (#39556)
mjonss Dec 5, 2022
c047d1e
*: merge master into feature branch reorganize-partition (#39906)
mjonss Dec 13, 2022
fa2a2ba
*: merge master into feature/reorganize-partition dec22 (#40117)
mjonss Dec 22, 2022
25df00e
*: support REORGANIZE PARTITION, part 1 - data reorg / data+index cop…
mjonss Dec 30, 2022
b259112
*: Table partition double write during Reorganize partition (part 2) …
mjonss Dec 30, 2022
114b690
*: merge master into feature/reorganize-partition (#40263)
mjonss Jan 3, 2023
5ad0c09
*: Merge master into feature/reorganize-partition branch (#40480)
mjonss Jan 12, 2023
2887591
*: Reorganize Partition DDL states logic (#40473)
mjonss Jan 18, 2023
8db6a00
telemetry: Adding telemetry for alter table reorganize partition (#40…
mjonss Jan 18, 2023
3eef6a0
*: Reorg part delete range (#40543)
mjonss Jan 19, 2023
07a7465
*: Reorg part update table (#40714)
mjonss Jan 20, 2023
9d54157
*: Merge master into feature/reorganize-partition (#40770)
mjonss Jan 31, 2023
a330ec2
Squashed and rebased all reorganize-partition PRs on master
mjonss Jan 31, 2023
7a0ca62
Merge remote-tracking branch 'pingcap/feature/reorganize-partition' i…
mjonss Jan 31, 2023
8b7fe88
Post merge fix (ddl->callback pgk)
mjonss Jan 31, 2023
ffd4174
Merge remote-tracking branch 'pingcap/master' into reorg-part-merge-j…
mjonss Feb 6, 2023
7496ed1
Post merge fixes.
mjonss Feb 6, 2023
6c2edfc
Fixed linting
mjonss Feb 6, 2023
c91ac46
Merge branch 'master' into feat-branch-reorg-part-master-merge
mjonss Feb 6, 2023
52fb67c
Merge branch 'master' into feat-branch-reorg-part-master-merge
mjonss Feb 6, 2023
2173f4b
Merge branch 'master' into feat-branch-reorg-part-master-merge
mjonss Feb 6, 2023
7934fdc
Updated TODO with errors, according to review comments.
mjonss Feb 7, 2023
cd60c2d
Linting + updated comment
mjonss Feb 7, 2023
6ce1d7d
Merge branch 'master' into feat-branch-reorg-part-master-merge
mjonss Feb 7, 2023
705cd3d
Updated comments
mjonss Feb 7, 2023
dd29dc7
Added test for removed partition data
mjonss Feb 7, 2023
6ceedc9
Moved reorg partition tests to separate file
mjonss Feb 7, 2023
2eb44ce
Test include fix
mjonss Feb 7, 2023
fce6f6a
Merge branch 'master' into feat-branch-reorg-part-master-merge
mjonss Feb 8, 2023
efa190b
Merge branch 'master' into feat-branch-reorg-part-master-merge
mjonss Feb 8, 2023
39e6125
Merge branch 'master' into feat-branch-reorg-part-master-merge
mjonss Feb 8, 2023
ae0e1a7
Merge remote-tracking branch 'pingcap/master' into feat-branch-reorg-…
mjonss Feb 8, 2023
f081d66
Merge branch 'master' into feat-branch-reorg-part-master-merge
hawkingrei Feb 9, 2023
f19ad52
Merge branch 'master' into feat-branch-reorg-part-master-merge
hawkingrei Feb 9, 2023
ce412f1
Merge branch 'master' into feat-branch-reorg-part-master-merge
ti-chi-bot Feb 9, 2023
e2d7bda
Merge branch 'master' into feat-branch-reorg-part-master-merge
ti-chi-bot Feb 9, 2023
618ca02
Merge remote-tracking branch 'pingcap/master' into feat-branch-reorg-…
mjonss Feb 10, 2023
a608929
Merge remote-tracking branch 'pingcap/master' into feat-branch-reorg-…
mjonss Feb 10, 2023
6148f3e
Merge remote-tracking branch 'pingcap/master' into feat-branch-reorg-…
mjonss Feb 10, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions ddl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,7 @@ go_test(
"//util/domainutil",
"//util/gcutil",
"//util/logutil",
"//util/mathutil",
"//util/mock",
"//util/sem",
"//util/sqlexec",
Expand Down
20 changes: 14 additions & 6 deletions ddl/backfilling.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ const (
typeUpdateColumnWorker backfillerType = 1
typeCleanUpIndexWorker backfillerType = 2
typeAddIndexMergeTmpWorker backfillerType = 3
typeReorgPartitionWorker backfillerType = 4

// InstanceLease is the instance lease.
InstanceLease = 1 * time.Minute
Expand All @@ -82,6 +83,8 @@ func (bT backfillerType) String() string {
return "clean up index"
case typeAddIndexMergeTmpWorker:
return "merge temporary index"
case typeReorgPartitionWorker:
return "reorganize partition"
default:
return "unknown"
}
Expand Down Expand Up @@ -143,6 +146,7 @@ func GetLeaseGoTime(currTime time.Time, lease time.Duration) types.Time {
// 1: add-index
// 2: modify-column-type
// 3: clean-up global index
// 4: reorganize partition
//
// They all have a write reorganization state to back fill data into the rows existed.
// Backfilling is time consuming, to accelerate this process, TiDB has built some sub
Expand Down Expand Up @@ -652,7 +656,6 @@ func (dc *ddlCtx) sendTasksAndWait(scheduler *backfillScheduler, totalAddedCount
return errors.Trace(err)
}

// nextHandle will be updated periodically in runReorgJob, so no need to update it here.
dc.getReorgCtx(reorgInfo.Job.ID).setNextKey(nextKey)
metrics.BatchAddIdxHistogram.WithLabelValues(metrics.LblOK).Observe(elapsedTime.Seconds())
logutil.BgLogger().Info("[ddl] backfill workers successfully processed batch",
Expand All @@ -666,7 +669,7 @@ func (dc *ddlCtx) sendTasksAndWait(scheduler *backfillScheduler, totalAddedCount
return nil
}

func getBatchTasks(t table.Table, reorgInfo *reorgInfo, kvRanges []kv.KeyRange, batch int) []*reorgBackfillTask {
func getBatchTasks(t table.PhysicalTable, reorgInfo *reorgInfo, kvRanges []kv.KeyRange, batch int) []*reorgBackfillTask {
batchTasks := make([]*reorgBackfillTask, 0, batch)
physicalTableID := reorgInfo.PhysicalTableID
var prefix kv.Key
Expand All @@ -677,8 +680,6 @@ func getBatchTasks(t table.Table, reorgInfo *reorgInfo, kvRanges []kv.KeyRange,
}
// Build reorg tasks.
job := reorgInfo.Job
//nolint:forcetypeassert
phyTbl := t.(table.PhysicalTable)
jobCtx := reorgInfo.d.jobContext(reorgInfo.Job.ID)
for i, keyRange := range kvRanges {
startKey := keyRange.StartKey
Expand All @@ -702,7 +703,7 @@ func getBatchTasks(t table.Table, reorgInfo *reorgInfo, kvRanges []kv.KeyRange,
id: i,
jobID: reorgInfo.Job.ID,
physicalTableID: physicalTableID,
physicalTable: phyTbl,
physicalTable: t,
priority: reorgInfo.Priority,
startKey: startKey,
endKey: endKey,
Expand All @@ -718,7 +719,7 @@ func getBatchTasks(t table.Table, reorgInfo *reorgInfo, kvRanges []kv.KeyRange,
}

// handleRangeTasks sends tasks to workers, and returns remaining kvRanges that is not handled.
func (dc *ddlCtx) handleRangeTasks(scheduler *backfillScheduler, t table.Table,
func (dc *ddlCtx) handleRangeTasks(scheduler *backfillScheduler, t table.PhysicalTable,
totalAddedCount *int64, kvRanges []kv.KeyRange) ([]kv.KeyRange, error) {
batchTasks := getBatchTasks(t, scheduler.reorgInfo, kvRanges, backfillTaskChanSize)
if len(batchTasks) == 0 {
Expand Down Expand Up @@ -925,6 +926,13 @@ func (b *backfillScheduler) adjustWorkerSize() error {
idxWorker := newCleanUpIndexWorker(sessCtx, i, b.tbl, b.decodeColMap, reorgInfo, jc)
runner = newBackfillWorker(jc.ddlJobCtx, idxWorker)
worker = idxWorker
case typeReorgPartitionWorker:
partWorker, err := newReorgPartitionWorker(sessCtx, i, b.tbl, b.decodeColMap, reorgInfo, jc)
if err != nil {
return err
}
runner = newBackfillWorker(jc.ddlJobCtx, partWorker)
worker = partWorker
default:
return errors.New("unknown backfill type")
}
Expand Down
43 changes: 38 additions & 5 deletions ddl/column.go
Original file line number Diff line number Diff line change
Expand Up @@ -1052,9 +1052,37 @@ func BuildElements(changingCol *model.ColumnInfo, changingIdxs []*model.IndexInf
return elements
}

func (w *worker) updatePhysicalTableRow(t table.PhysicalTable, reorgInfo *reorgInfo) error {
func (w *worker) updatePhysicalTableRow(t table.Table, reorgInfo *reorgInfo) error {
logutil.BgLogger().Info("[ddl] start to update table row", zap.String("job", reorgInfo.Job.String()), zap.String("reorgInfo", reorgInfo.String()))
return w.writePhysicalTableRecord(w.sessPool, t, typeUpdateColumnWorker, reorgInfo)
if tbl, ok := t.(table.PartitionedTable); ok {
done := false
for !done {
p := tbl.GetPartition(reorgInfo.PhysicalTableID)
if p == nil {
return dbterror.ErrCancelledDDLJob.GenWithStack("Can not find partition id %d for table %d", reorgInfo.PhysicalTableID, t.Meta().ID)
}
workType := typeReorgPartitionWorker
if reorgInfo.Job.Type != model.ActionReorganizePartition {
// workType = typeUpdateColumnWorker
// TODO: Support Modify Column on partitioned table
// https://github.com/pingcap/tidb/issues/38297
return dbterror.ErrCancelledDDLJob.GenWithStack("Modify Column on partitioned table / typeUpdateColumnWorker not yet supported.")
}
err := w.writePhysicalTableRecord(w.sessPool, p, workType, reorgInfo)
if err != nil {
return err
}
done, err = w.updateReorgInfo(tbl, reorgInfo)
if err != nil {
return errors.Trace(err)
}
}
return nil
}
if tbl, ok := t.(table.PhysicalTable); ok {
return w.writePhysicalTableRecord(w.sessPool, tbl, typeUpdateColumnWorker, reorgInfo)
}
return dbterror.ErrCancelledDDLJob.GenWithStack("internal error for phys tbl id: %d tbl id: %d", reorgInfo.PhysicalTableID, t.Meta().ID)
}

// TestReorgGoroutineRunning is only used in test to indicate the reorg goroutine has been started.
Expand Down Expand Up @@ -1085,6 +1113,11 @@ func (w *worker) updateCurrentElement(t table.Table, reorgInfo *reorgInfo) error
}
}

if _, ok := t.(table.PartitionedTable); ok {
// TODO: remove when modify column of partitioned table is supported
// https://github.com/pingcap/tidb/issues/38297
return dbterror.ErrCancelledDDLJob.GenWithStack("Modify Column on partitioned table / typeUpdateColumnWorker not yet supported.")
}
// Get the original start handle and end handle.
currentVer, err := getValidCurrentVersion(reorgInfo.d.store)
if err != nil {
Expand Down Expand Up @@ -1212,8 +1245,8 @@ type rowRecord struct {
warning *terror.Error // It's used to record the cast warning of a record.
}

// getNextKey gets next handle of entry that we are going to process.
func (*updateColumnWorker) getNextKey(taskRange reorgBackfillTask,
// getNextHandleKey gets next handle of entry that we are going to process.
func getNextHandleKey(taskRange reorgBackfillTask,
taskDone bool, lastAccessedHandle kv.Key) (nextHandle kv.Key) {
if !taskDone {
// The task is not done. So we need to pick the last processed entry's handle and add one.
Expand Down Expand Up @@ -1263,7 +1296,7 @@ func (w *updateColumnWorker) fetchRowColVals(txn kv.Transaction, taskRange reorg
}

logutil.BgLogger().Debug("[ddl] txn fetches handle info", zap.Uint64("txnStartTS", txn.StartTS()), zap.String("taskRange", taskRange.String()), zap.Duration("takeTime", time.Since(startTime)))
return w.rowRecords, w.getNextKey(taskRange, taskDone, lastAccessedHandle), taskDone, errors.Trace(err)
return w.rowRecords, getNextHandleKey(taskRange, taskDone, lastAccessedHandle), taskDone, errors.Trace(err)
}

func (w *updateColumnWorker) getRowRecord(handle kv.Handle, recordKey []byte, rawRow []byte) error {
Expand Down
4 changes: 2 additions & 2 deletions ddl/column_modify_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -688,15 +688,15 @@ func TestTransactionWithWriteOnlyColumn(t *testing.T) {
dom.DDL().SetHook(hook)
done := make(chan error, 1)
// test transaction on add column.
go backgroundExec(store, "alter table t1 add column c int not null", done)
go backgroundExec(store, "test", "alter table t1 add column c int not null", done)
err := <-done
require.NoError(t, err)
require.NoError(t, checkErr)
tk.MustQuery("select a from t1").Check(testkit.Rows("2"))
tk.MustExec("delete from t1")

// test transaction on drop column.
go backgroundExec(store, "alter table t1 drop column c", done)
go backgroundExec(store, "test", "alter table t1 drop column c", done)
err = <-done
require.NoError(t, err)
require.NoError(t, checkErr)
Expand Down
79 changes: 77 additions & 2 deletions ddl/db_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1215,14 +1215,14 @@ func TestBitDefaultValue(t *testing.T) {
);`)
}

func backgroundExec(s kv.Storage, sql string, done chan error) {
func backgroundExec(s kv.Storage, schema, sql string, done chan error) {
se, err := session.CreateSession4Test(s)
if err != nil {
done <- errors.Trace(err)
return
}
defer se.Close()
_, err = se.Execute(context.Background(), "use test")
_, err = se.Execute(context.Background(), "use "+schema)
if err != nil {
done <- errors.Trace(err)
return
Expand Down Expand Up @@ -4311,3 +4311,78 @@ func TestRegexpFunctionsGeneratedColumn(t *testing.T) {

tk.MustExec("drop table if exists reg_like")
}

func TestReorgPartitionRangeFailure(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec(`create schema reorgfail`)
tk.MustExec("use reorgfail")

tk.MustExec("CREATE TABLE t (id int, d varchar(255)) partition by range (id) (partition p0 values less than (1000000), partition p1 values less than (2000000), partition p2 values less than (3000000))")
tk.MustContainErrMsg(`ALTER TABLE t REORGANIZE PARTITION p0,p2 INTO (PARTITION p0 VALUES LESS THAN (1000000))`, "[ddl:8200]Unsupported REORGANIZE PARTITION of RANGE; not adjacent partitions")
tk.MustContainErrMsg(`ALTER TABLE t REORGANIZE PARTITION p0,p2 INTO (PARTITION p0 VALUES LESS THAN (4000000))`, "[ddl:8200]Unsupported REORGANIZE PARTITION of RANGE; not adjacent partitions")
}

func TestReorgPartitionDocs(t *testing.T) {
// To test what is added as partition management in the docs
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec(`create schema reorgdocs`)
tk.MustExec("use reorgdocs")
tk.MustExec(`CREATE TABLE members (
id int,
fname varchar(255),
lname varchar(255),
dob date,
data json
)
PARTITION BY RANGE (YEAR(dob)) (
PARTITION pBefore1950 VALUES LESS THAN (1950),
PARTITION p1950 VALUES LESS THAN (1960),
PARTITION p1960 VALUES LESS THAN (1970),
PARTITION p1970 VALUES LESS THAN (1980),
PARTITION p1980 VALUES LESS THAN (1990),
PARTITION p1990 VALUES LESS THAN (2000))`)
tk.MustExec(`CREATE TABLE member_level (
id int,
level int,
achievements json
)
PARTITION BY LIST (level) (
PARTITION l1 VALUES IN (1),
PARTITION l2 VALUES IN (2),
PARTITION l3 VALUES IN (3),
PARTITION l4 VALUES IN (4),
PARTITION l5 VALUES IN (5));`)
tk.MustExec(`ALTER TABLE members DROP PARTITION p1990`)
tk.MustExec(`ALTER TABLE member_level DROP PARTITION l5`)
tk.MustExec(`ALTER TABLE members TRUNCATE PARTITION p1980`)
tk.MustExec(`ALTER TABLE member_level TRUNCATE PARTITION l4`)
tk.MustExec("ALTER TABLE members ADD PARTITION (PARTITION `p1990to2010` VALUES LESS THAN (2010))")
tk.MustExec(`ALTER TABLE member_level ADD PARTITION (PARTITION l5_6 VALUES IN (5,6))`)
tk.MustContainErrMsg(`ALTER TABLE members ADD PARTITION (PARTITION p1990 VALUES LESS THAN (2000))`, "[ddl:1493]VALUES LESS THAN value must be strictly increasing for each partition")
tk.MustExec(`ALTER TABLE members REORGANIZE PARTITION p1990to2010 INTO
(PARTITION p1990 VALUES LESS THAN (2000),
PARTITION p2000 VALUES LESS THAN (2010),
PARTITION p2010 VALUES LESS THAN (2020),
PARTITION p2020 VALUES LESS THAN (2030),
PARTITION pMax VALUES LESS THAN (MAXVALUE))`)
tk.MustExec(`ALTER TABLE member_level REORGANIZE PARTITION l5_6 INTO
(PARTITION l5 VALUES IN (5),
PARTITION l6 VALUES IN (6))`)
tk.MustExec(`ALTER TABLE members REORGANIZE PARTITION pBefore1950,p1950 INTO (PARTITION pBefore1960 VALUES LESS THAN (1960))`)
tk.MustExec(`ALTER TABLE member_level REORGANIZE PARTITION l1,l2 INTO (PARTITION l1_2 VALUES IN (1,2))`)
tk.MustExec(`ALTER TABLE members REORGANIZE PARTITION pBefore1960,p1960,p1970,p1980,p1990,p2000,p2010,p2020,pMax INTO
(PARTITION p1800 VALUES LESS THAN (1900),
PARTITION p1900 VALUES LESS THAN (2000),
PARTITION p2000 VALUES LESS THAN (2100))`)
tk.MustExec(`ALTER TABLE member_level REORGANIZE PARTITION l1_2,l3,l4,l5,l6 INTO
(PARTITION lOdd VALUES IN (1,3,5),
PARTITION lEven VALUES IN (2,4,6))`)
tk.MustContainErrMsg(`ALTER TABLE members REORGANIZE PARTITION p1800,p2000 INTO (PARTITION p2000 VALUES LESS THAN (2100))`, "[ddl:8200]Unsupported REORGANIZE PARTITION of RANGE; not adjacent partitions")
tk.MustExec(`INSERT INTO members VALUES (313, "John", "Doe", "2022-11-22", NULL)`)
tk.MustExec(`ALTER TABLE members REORGANIZE PARTITION p2000 INTO (PARTITION p2000 VALUES LESS THAN (2050))`)
tk.MustContainErrMsg(`ALTER TABLE members REORGANIZE PARTITION p2000 INTO (PARTITION p2000 VALUES LESS THAN (2020))`, "[table:1526]Table has no partition for value 2022")
tk.MustExec(`INSERT INTO member_level (id, level) values (313, 6)`)
tk.MustContainErrMsg(`ALTER TABLE member_level REORGANIZE PARTITION lEven INTO (PARTITION lEven VALUES IN (2,4))`, "[table:1526]Table has no partition for value 6")
}
Loading