From b25911282275265e04e6b3c4393671c366de343b Mon Sep 17 00:00:00 2001 From: Mattias Jonsson Date: Fri, 30 Dec 2022 11:16:18 +0000 Subject: [PATCH] *: Table partition double write during Reorganize partition (part 2) | tidb-test=pr/2044 (#38508) ref pingcap/tidb#15000, ref pingcap/tidb#38535 --- ddl/db_partition_test.go | 56 ++++ executor/builder.go | 12 +- planner/core/point_get_plan.go | 12 +- planner/core/rule_partition_processor.go | 13 +- session/bootstrap.go | 9 +- table/tables/partition.go | 369 ++++++++++++++++------- table/tables/partition_test.go | 5 +- 7 files changed, 334 insertions(+), 142 deletions(-) diff --git a/ddl/db_partition_test.go b/ddl/db_partition_test.go index ace01bab252f3..8b2ea57a4ccdb 100644 --- a/ddl/db_partition_test.go +++ b/ddl/db_partition_test.go @@ -4554,3 +4554,59 @@ func TestAlterModifyColumnOnPartitionedTableRename(t *testing.T) { tk.MustExec(`create table t (a int, b char) partition by hash (a) partitions 3`) tk.MustContainErrMsg(`alter table t change a c int`, "[planner:1054]Unknown column 'a' in 'expression'") } + +func TestReorgPartitionConcurrent(t *testing.T) { + t.Skip("Needs PR 38460 as well") + store := testkit.CreateMockStore(t) + tk := testkit.NewTestKit(t, store) + schemaName := "ReorgPartConcurrent" + tk.MustExec("create database " + schemaName) + tk.MustExec("use " + schemaName) + tk.MustExec(`create table t (a int unsigned PRIMARY KEY, b varchar(255), c int, key (b), key (c,b))` + + ` partition by range (a) ` + + `(partition p0 values less than (10),` + + ` partition p1 values less than (20),` + + ` partition pMax values less than (MAXVALUE))`) + tk.MustExec(`insert into t values (1,"1",1), (12,"12",21),(23,"23",32),(34,"34",43),(45,"45",54),(56,"56",65)`) + dom := domain.GetDomain(tk.Session()) + originHook := dom.DDL().GetHook() + defer dom.DDL().SetHook(originHook) + hook := &ddl.TestDDLCallback{Do: dom} + dom.DDL().SetHook(hook) + + wait := make(chan bool) + defer close(wait) + + injected := false + hook.OnJobRunBeforeExported = func(job *model.Job) { + if job.Type == model.ActionReorganizePartition && job.SchemaState == model.StateWriteReorganization && !injected { + injected = true + <-wait + <-wait + } + } + alterErr := make(chan error, 1) + go backgroundExec(store, schemaName, "alter table t reorganize partition p1 into (partition p1a values less than (15), partition p1b values less than (20))", alterErr) + wait <- true + tk.MustExec(`insert into t values (14, "14", 14),(15, "15",15)`) + wait <- true + require.NoError(t, <-alterErr) + tk.MustQuery(`select * from t where c between 10 and 22`).Sort().Check(testkit.Rows(""+ + "12 12 21", + "14 14 14", + "15 15 15")) + tk.MustQuery(`show create table t`).Check(testkit.Rows("" + + "t CREATE TABLE `t` (\n" + + " `a` int(10) unsigned NOT NULL,\n" + + " `b` varchar(255) DEFAULT NULL,\n" + + " `c` int(11) DEFAULT NULL,\n" + + " PRIMARY KEY (`a`) /*T![clustered_index] CLUSTERED */,\n" + + " KEY `b` (`b`),\n" + + " KEY `c` (`c`,`b`)\n" + + ") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin\n" + + "PARTITION BY RANGE (`a`)\n" + + "(PARTITION `p0` VALUES LESS THAN (10),\n" + + " PARTITION `p1a` VALUES LESS THAN (15),\n" + + " PARTITION `p1b` VALUES LESS THAN (20),\n" + + " PARTITION `pMax` VALUES LESS THAN (MAXVALUE))")) +} diff --git a/executor/builder.go b/executor/builder.go index d4270397eecd0..f60a7a78f5a52 100644 --- a/executor/builder.go +++ b/executor/builder.go @@ -3527,10 +3527,14 @@ func getPartitionKeyColOffsets(keyColIDs []int64, pt table.PartitionedTable) []i 
keyColOffsets[i] = offset } - pe, err := pt.(interface { - PartitionExpr() (*tables.PartitionExpr, error) - }).PartitionExpr() - if err != nil { + t, ok := pt.(interface { + PartitionExpr() *tables.PartitionExpr + }) + if !ok { + return nil + } + pe := t.PartitionExpr() + if pe == nil { return nil } diff --git a/planner/core/point_get_plan.go b/planner/core/point_get_plan.go index f0dff0a18a9f8..cd9315bdaee77 100644 --- a/planner/core/point_get_plan.go +++ b/planner/core/point_get_plan.go @@ -1898,12 +1898,7 @@ func getPartitionExpr(ctx sessionctx.Context, tbl *model.TableInfo) *tables.Part } // PartitionExpr don't need columns and names for hash partition. - partitionExpr, err := partTable.PartitionExpr() - if err != nil { - return nil - } - - return partitionExpr + return partTable.PartitionExpr() } func getHashPartitionColumnName(ctx sessionctx.Context, tbl *model.TableInfo) *ast.ColumnName { @@ -1920,10 +1915,7 @@ func getHashPartitionColumnName(ctx sessionctx.Context, tbl *model.TableInfo) *a return nil } // PartitionExpr don't need columns and names for hash partition. - partitionExpr, err := table.(partitionTable).PartitionExpr() - if err != nil { - return nil - } + partitionExpr := table.(partitionTable).PartitionExpr() expr := partitionExpr.OrigExpr col, ok := expr.(*ast.ColumnNameExpr) if !ok { diff --git a/planner/core/rule_partition_processor.go b/planner/core/rule_partition_processor.go index 81a43935b056f..5982bcb32d6ba 100644 --- a/planner/core/rule_partition_processor.go +++ b/planner/core/rule_partition_processor.go @@ -110,7 +110,7 @@ func (s *partitionProcessor) rewriteDataSource(lp LogicalPlan, opt *logicalOptim // partitionTable is for those tables which implement partition. type partitionTable interface { - PartitionExpr() (*tables.PartitionExpr, error) + PartitionExpr() *tables.PartitionExpr } func generateHashPartitionExpr(ctx sessionctx.Context, pi *model.PartitionInfo, columns []*expression.Column, names types.NameSlice) (expression.Expression, error) { @@ -595,13 +595,11 @@ func (l *listPartitionPruner) findUsedListPartitions(conds []expression.Expressi func (s *partitionProcessor) findUsedListPartitions(ctx sessionctx.Context, tbl table.Table, partitionNames []model.CIStr, conds []expression.Expression) ([]int, error) { pi := tbl.Meta().Partition - partExpr, err := tbl.(partitionTable).PartitionExpr() - if err != nil { - return nil, err - } + partExpr := tbl.(partitionTable).PartitionExpr() listPruner := newListPartitionPruner(ctx, tbl, partitionNames, s, conds, partExpr.ForListPruning) var used map[int]struct{} + var err error if partExpr.ForListPruning.ColPrunes == nil { used, err = listPruner.findUsedListPartitions(conds) } else { @@ -826,10 +824,7 @@ func intersectionRange(start, end, newStart, newEnd int) (int, int) { func (s *partitionProcessor) pruneRangePartition(ctx sessionctx.Context, pi *model.PartitionInfo, tbl table.PartitionedTable, conds []expression.Expression, columns []*expression.Column, names types.NameSlice) (partitionRangeOR, error) { - partExpr, err := tbl.(partitionTable).PartitionExpr() - if err != nil { - return nil, err - } + partExpr := tbl.(partitionTable).PartitionExpr() // Partition by range columns. 
if len(pi.Columns) > 0 { diff --git a/session/bootstrap.go b/session/bootstrap.go index 6644da210f4c7..759855cc5c880 100644 --- a/session/bootstrap.go +++ b/session/bootstrap.go @@ -2443,7 +2443,7 @@ func oldPasswordUpgrade(pass string) (string, error) { // rebuildAllPartitionValueMapAndSorted rebuilds all value map and sorted info for list column partitions with InfoSchema. func rebuildAllPartitionValueMapAndSorted(s *session) { type partitionExpr interface { - PartitionExpr() (*tables.PartitionExpr, error) + PartitionExpr() *tables.PartitionExpr } p := parser.New() @@ -2455,12 +2455,9 @@ func rebuildAllPartitionValueMapAndSorted(s *session) { continue } - pe, err := t.(partitionExpr).PartitionExpr() - if err != nil { - panic("partition table gets partition expression failed") - } + pe := t.(partitionExpr).PartitionExpr() for _, cp := range pe.ColPrunes { - if err = cp.RebuildPartitionValueMapAndSorted(p); err != nil { + if err := cp.RebuildPartitionValueMapAndSorted(p, pi.Definitions); err != nil { logutil.BgLogger().Warn("build list column partition value map and sorted failed") break } diff --git a/table/tables/partition.go b/table/tables/partition.go index 34dd2ca20af11..2c8112053cf9e 100644 --- a/table/tables/partition.go +++ b/table/tables/partition.go @@ -89,11 +89,18 @@ type partitionedTable struct { partitions map[int64]*partition evalBufferTypes []*types.FieldType evalBufferPool sync.Pool + // Only used during Reorganize partition + reorgPartitions map[int64]interface{} + reorgPartitionExpr *PartitionExpr } func newPartitionedTable(tbl *TableCommon, tblInfo *model.TableInfo) (table.PartitionedTable, error) { + pi := tblInfo.GetPartitionInfo() + if pi == nil || len(pi.Definitions) == 0 { + return nil, table.ErrUnknownPartition + } ret := &partitionedTable{TableCommon: *tbl} - partitionExpr, err := newPartitionExpr(tblInfo) + partitionExpr, err := newPartitionExpr(tblInfo, pi.Definitions) if err != nil { return nil, errors.Trace(err) } @@ -107,10 +114,6 @@ func newPartitionedTable(tbl *TableCommon, tblInfo *model.TableInfo) (table.Part if err := initTableIndices(&ret.TableCommon); err != nil { return nil, errors.Trace(err) } - pi := tblInfo.GetPartitionInfo() - if len(pi.Definitions) == 0 { - return nil, table.ErrUnknownPartition - } partitions := make(map[int64]*partition, len(pi.Definitions)) for _, p := range pi.Definitions { var t partition @@ -122,10 +125,20 @@ func newPartitionedTable(tbl *TableCommon, tblInfo *model.TableInfo) (table.Part partitions[p.ID] = &t } ret.partitions = partitions + if len(pi.DroppingDefinitions) > 0 && len(pi.AddingDefinitions) > 0 { + ret.reorgPartitionExpr, err = newPartitionExpr(tblInfo, pi.AddingDefinitions) + if err != nil { + return nil, errors.Trace(err) + } + ret.reorgPartitions = make(map[int64]interface{}, len(pi.DroppingDefinitions)) + for _, def := range pi.DroppingDefinitions { + ret.reorgPartitions[def.ID] = nil + } + } return ret, nil } -func newPartitionExpr(tblInfo *model.TableInfo) (*PartitionExpr, error) { +func newPartitionExpr(tblInfo *model.TableInfo, defs []model.PartitionDefinition) (*PartitionExpr, error) { // a partitioned table cannot rely on session context/sql modes, so use a default one! 
ctx := mock.NewContext() dbName := model.NewCIStr(ctx.GetSessionVars().CurrentDB) @@ -136,11 +149,11 @@ func newPartitionExpr(tblInfo *model.TableInfo) (*PartitionExpr, error) { pi := tblInfo.GetPartitionInfo() switch pi.Type { case model.PartitionTypeRange: - return generateRangePartitionExpr(ctx, pi, columns, names) + return generateRangePartitionExpr(ctx, pi, defs, columns, names) case model.PartitionTypeHash: return generateHashPartitionExpr(ctx, pi, columns, names) case model.PartitionTypeList: - return generateListPartitionExpr(ctx, tblInfo, columns, names) + return generateListPartitionExpr(ctx, tblInfo, defs, columns, names) } panic("cannot reach here") } @@ -159,8 +172,6 @@ type PartitionExpr struct { *ForRangeColumnsPruning // ColOffset is the offsets of partition columns. ColumnOffset []int - // InValues: x in (1,2); x in (3,4); x in (5,6), used for list partition. - InValues []expression.Expression *ForListPruning } @@ -193,19 +204,19 @@ type ForRangeColumnsPruning struct { LessThan [][]*expression.Expression } -func dataForRangeColumnsPruning(ctx sessionctx.Context, pi *model.PartitionInfo, schema *expression.Schema, names []*types.FieldName, p *parser.Parser) (*ForRangeColumnsPruning, error) { +func dataForRangeColumnsPruning(ctx sessionctx.Context, defs []model.PartitionDefinition, schema *expression.Schema, names []*types.FieldName, p *parser.Parser) (*ForRangeColumnsPruning, error) { var res ForRangeColumnsPruning - res.LessThan = make([][]*expression.Expression, 0, len(pi.Definitions)) - for i := 0; i < len(pi.Definitions); i++ { - lessThanCols := make([]*expression.Expression, 0, len(pi.Columns)) - for j := range pi.Definitions[i].LessThan { - if strings.EqualFold(pi.Definitions[i].LessThan[j], "MAXVALUE") { + res.LessThan = make([][]*expression.Expression, 0, len(defs)) + for i := 0; i < len(defs); i++ { + lessThanCols := make([]*expression.Expression, 0, len(defs[i].LessThan)) + for j := range defs[i].LessThan { + if strings.EqualFold(defs[i].LessThan[j], "MAXVALUE") { // Use a nil pointer instead of math.MaxInt64 to avoid the corner cases. lessThanCols = append(lessThanCols, nil) // No column after MAXVALUE matters break } - tmp, err := parseSimpleExprWithNames(p, ctx, pi.Definitions[i].LessThan[j], schema, names) + tmp, err := parseSimpleExprWithNames(p, ctx, defs[i].LessThan[j], schema, names) if err != nil { return nil, err } @@ -437,29 +448,29 @@ type ForRangePruning struct { Unsigned bool } -// dataForRangePruning extracts the less than parts from 'partition p0 less than xx ... partitoin p1 less than ...' -func dataForRangePruning(sctx sessionctx.Context, pi *model.PartitionInfo) (*ForRangePruning, error) { +// dataForRangePruning extracts the less than parts from 'partition p0 less than xx ... partition p1 less than ...' +func dataForRangePruning(sctx sessionctx.Context, defs []model.PartitionDefinition) (*ForRangePruning, error) { var maxValue bool var unsigned bool - lessThan := make([]int64, len(pi.Definitions)) - for i := 0; i < len(pi.Definitions); i++ { - if strings.EqualFold(pi.Definitions[i].LessThan[0], "MAXVALUE") { + lessThan := make([]int64, len(defs)) + for i := 0; i < len(defs); i++ { + if strings.EqualFold(defs[i].LessThan[0], "MAXVALUE") { // Use a bool flag instead of math.MaxInt64 to avoid the corner cases. 
maxValue = true } else { var err error - lessThan[i], err = strconv.ParseInt(pi.Definitions[i].LessThan[0], 10, 64) + lessThan[i], err = strconv.ParseInt(defs[i].LessThan[0], 10, 64) var numErr *strconv.NumError if stderr.As(err, &numErr) && numErr.Err == strconv.ErrRange { var tmp uint64 - tmp, err = strconv.ParseUint(pi.Definitions[i].LessThan[0], 10, 64) + tmp, err = strconv.ParseUint(defs[i].LessThan[0], 10, 64) lessThan[i] = int64(tmp) unsigned = true } if err != nil { - val, ok := fixOldVersionPartitionInfo(sctx, pi.Definitions[i].LessThan[0]) + val, ok := fixOldVersionPartitionInfo(sctx, defs[i].LessThan[0]) if !ok { - logutil.BgLogger().Error("wrong partition definition", zap.String("less than", pi.Definitions[i].LessThan[0])) + logutil.BgLogger().Error("wrong partition definition", zap.String("less than", defs[i].LessThan[0])) return nil, errors.WithStack(err) } lessThan[i] = val @@ -501,40 +512,14 @@ func rangePartitionExprStrings(pi *model.PartitionInfo) []string { } func generateRangePartitionExpr(ctx sessionctx.Context, pi *model.PartitionInfo, - columns []*expression.Column, names types.NameSlice) (*PartitionExpr, error) { + defs []model.PartitionDefinition, columns []*expression.Column, names types.NameSlice) (*PartitionExpr, error) { // The caller should assure partition info is not nil. - locateExprs := make([]expression.Expression, 0, len(pi.Definitions)) - var buf bytes.Buffer p := parser.New() schema := expression.NewSchema(columns...) partStrs := rangePartitionExprStrings(pi) - for i := 0; i < len(pi.Definitions); i++ { - if strings.EqualFold(pi.Definitions[i].LessThan[0], "MAXVALUE") { - // Expr less than maxvalue is always true. - fmt.Fprintf(&buf, "true") - } else { - maxValueFound := false - for j := range partStrs[1:] { - if strings.EqualFold(pi.Definitions[i].LessThan[j+1], "MAXVALUE") { - // if any column will be less than MAXVALUE, so change < to <= of the previous prefix of columns - fmt.Fprintf(&buf, "((%s) <= (%s))", strings.Join(partStrs[:j+1], ","), strings.Join(pi.Definitions[i].LessThan[:j+1], ",")) - maxValueFound = true - break - } - } - if !maxValueFound { - fmt.Fprintf(&buf, "((%s) < (%s))", strings.Join(partStrs, ","), strings.Join(pi.Definitions[i].LessThan, ",")) - } - } - - expr, err := parseSimpleExprWithNames(p, ctx, buf.String(), schema, names) - if err != nil { - // If it got an error here, ddl may hang forever, so this error log is important. 
- logutil.BgLogger().Error("wrong table partition expression", zap.String("expression", buf.String()), zap.Error(err)) - return nil, errors.Trace(err) - } - locateExprs = append(locateExprs, expr) - buf.Reset() + locateExprs, err := getRangeLocateExprs(ctx, p, defs, partStrs, schema, names) + if err != nil { + return nil, errors.Trace(err) } ret := &PartitionExpr{ UpperBounds: locateExprs, @@ -547,14 +532,14 @@ func generateRangePartitionExpr(ctx sessionctx.Context, pi *model.PartitionInfo, ret.ColumnOffset = offset if len(pi.Columns) < 1 { - tmp, err := dataForRangePruning(ctx, pi) + tmp, err := dataForRangePruning(ctx, defs) if err != nil { return nil, errors.Trace(err) } ret.Expr = partExpr ret.ForRangePruning = tmp } else { - tmp, err := dataForRangeColumnsPruning(ctx, pi, schema, names, p) + tmp, err := dataForRangeColumnsPruning(ctx, defs, schema, names, p) if err != nil { return nil, errors.Trace(err) } @@ -563,6 +548,40 @@ func generateRangePartitionExpr(ctx sessionctx.Context, pi *model.PartitionInfo, return ret, nil } +func getRangeLocateExprs(ctx sessionctx.Context, p *parser.Parser, defs []model.PartitionDefinition, partStrs []string, schema *expression.Schema, names types.NameSlice) ([]expression.Expression, error) { + var buf bytes.Buffer + locateExprs := make([]expression.Expression, 0, len(defs)) + for i := 0; i < len(defs); i++ { + if strings.EqualFold(defs[i].LessThan[0], "MAXVALUE") { + // Expr less than maxvalue is always true. + fmt.Fprintf(&buf, "true") + } else { + maxValueFound := false + for j := range partStrs[1:] { + if strings.EqualFold(defs[i].LessThan[j+1], "MAXVALUE") { + // if any column will be less than MAXVALUE, so change < to <= of the previous prefix of columns + fmt.Fprintf(&buf, "((%s) <= (%s))", strings.Join(partStrs[:j+1], ","), strings.Join(defs[i].LessThan[:j+1], ",")) + maxValueFound = true + break + } + } + if !maxValueFound { + fmt.Fprintf(&buf, "((%s) < (%s))", strings.Join(partStrs, ","), strings.Join(defs[i].LessThan, ",")) + } + } + + expr, err := parseSimpleExprWithNames(p, ctx, buf.String(), schema, names) + if err != nil { + // If it got an error here, ddl may hang forever, so this error log is important. + logutil.BgLogger().Error("wrong table partition expression", zap.String("expression", buf.String()), zap.Error(err)) + return nil, errors.Trace(err) + } + locateExprs = append(locateExprs, expr) + buf.Reset() + } + return locateExprs, nil +} + func getColumnsOffset(cols, columns []*expression.Column) []int { colsOffset := make([]int, len(cols)) for i, col := range columns { @@ -614,7 +633,7 @@ func extractPartitionExprColumns(ctx sessionctx.Context, pi *model.PartitionInfo } func generateListPartitionExpr(ctx sessionctx.Context, tblInfo *model.TableInfo, - columns []*expression.Column, names types.NameSlice) (*PartitionExpr, error) { + defs []model.PartitionDefinition, columns []*expression.Column, names types.NameSlice) (*PartitionExpr, error) { // The caller should assure partition info is not nil. 
pi := tblInfo.GetPartitionInfo() partExpr, exprCols, offset, err := extractPartitionExprColumns(ctx, pi, columns, names) @@ -623,9 +642,9 @@ func generateListPartitionExpr(ctx sessionctx.Context, tblInfo *model.TableInfo, } listPrune := &ForListPruning{} if len(pi.Columns) == 0 { - err = listPrune.buildListPruner(ctx, tblInfo, exprCols, columns, names) + err = listPrune.buildListPruner(ctx, tblInfo, defs, exprCols, columns, names) } else { - err = listPrune.buildListColumnsPruner(ctx, tblInfo, columns, names) + err = listPrune.buildListColumnsPruner(ctx, tblInfo, defs, columns, names) } if err != nil { return nil, err @@ -638,7 +657,7 @@ func generateListPartitionExpr(ctx sessionctx.Context, tblInfo *model.TableInfo, return ret, nil } -func (lp *ForListPruning) buildListPruner(ctx sessionctx.Context, tblInfo *model.TableInfo, exprCols []*expression.Column, +func (lp *ForListPruning) buildListPruner(ctx sessionctx.Context, tblInfo *model.TableInfo, defs []model.PartitionDefinition, exprCols []*expression.Column, columns []*expression.Column, names types.NameSlice) error { pi := tblInfo.GetPartitionInfo() schema := expression.NewSchema(columns...) @@ -649,7 +668,7 @@ func (lp *ForListPruning) buildListPruner(ctx sessionctx.Context, tblInfo *model logutil.BgLogger().Error("wrong table partition expression", zap.String("expression", pi.Expr), zap.Error(err)) return errors.Trace(err) } - // Since need to change the column index of the expresion, clone the expression first. + // Since need to change the column index of the expression, clone the expression first. lp.LocateExpr = expr.Clone() lp.PruneExprCols = exprCols lp.PruneExpr = expr.Clone() @@ -661,14 +680,15 @@ func (lp *ForListPruning) buildListPruner(ctx sessionctx.Context, tblInfo *model } c.Index = idx } - err = lp.buildListPartitionValueMap(ctx, tblInfo, schema, names, p) + err = lp.buildListPartitionValueMap(ctx, defs, schema, names, p) if err != nil { return err } return nil } -func (lp *ForListPruning) buildListColumnsPruner(ctx sessionctx.Context, tblInfo *model.TableInfo, +func (lp *ForListPruning) buildListColumnsPruner(ctx sessionctx.Context, + tblInfo *model.TableInfo, defs []model.PartitionDefinition, columns []*expression.Column, names types.NameSlice) error { pi := tblInfo.GetPartitionInfo() schema := expression.NewSchema(columns...) @@ -694,7 +714,7 @@ func (lp *ForListPruning) buildListColumnsPruner(ctx sessionctx.Context, tblInfo valueMap: make(map[string]ListPartitionLocation), sorted: btree.NewG[*btreeListColumnItem](btreeDegree, lessBtreeListColumnItem), } - err := colPrune.buildPartitionValueMapAndSorted(p) + err := colPrune.buildPartitionValueMapAndSorted(p, defs) if err != nil { return err } @@ -707,12 +727,11 @@ func (lp *ForListPruning) buildListColumnsPruner(ctx sessionctx.Context, tblInfo // buildListPartitionValueMap builds list partition value map. // The map is column value -> partition index. // colIdx is the column index in the list columns. 
-func (lp *ForListPruning) buildListPartitionValueMap(ctx sessionctx.Context, tblInfo *model.TableInfo, +func (lp *ForListPruning) buildListPartitionValueMap(ctx sessionctx.Context, defs []model.PartitionDefinition, schema *expression.Schema, names types.NameSlice, p *parser.Parser) error { - pi := tblInfo.GetPartitionInfo() lp.valueMap = map[int64]int{} lp.nullPartitionIdx = -1 - for partitionIdx, def := range pi.Definitions { + for partitionIdx, def := range defs { for _, vs := range def.InValues { expr, err := parseSimpleExprWithNames(p, ctx, vs[0], schema, names) if err != nil { @@ -781,26 +800,27 @@ func (lp *ForListPruning) locateListColumnsPartitionByRow(ctx sessionctx.Context // buildPartitionValueMapAndSorted builds list columns partition value map for the specified column. // It also builds list columns partition value btree for the specified column. // colIdx is the specified column index in the list columns. -func (lp *ForListColumnPruning) buildPartitionValueMapAndSorted(p *parser.Parser) error { +func (lp *ForListColumnPruning) buildPartitionValueMapAndSorted(p *parser.Parser, + defs []model.PartitionDefinition) error { l := len(lp.valueMap) if l != 0 { return nil } - return lp.buildListPartitionValueMapAndSorted(p) + return lp.buildListPartitionValueMapAndSorted(p, defs) } // RebuildPartitionValueMapAndSorted rebuilds list columns partition value map for the specified column. -func (lp *ForListColumnPruning) RebuildPartitionValueMapAndSorted(p *parser.Parser) error { +func (lp *ForListColumnPruning) RebuildPartitionValueMapAndSorted(p *parser.Parser, + defs []model.PartitionDefinition) error { lp.valueMap = make(map[string]ListPartitionLocation, len(lp.valueMap)) lp.sorted.Clear(false) - return lp.buildListPartitionValueMapAndSorted(p) + return lp.buildListPartitionValueMapAndSorted(p, defs) } -func (lp *ForListColumnPruning) buildListPartitionValueMapAndSorted(p *parser.Parser) error { - pi := lp.tblInfo.GetPartitionInfo() +func (lp *ForListColumnPruning) buildListPartitionValueMapAndSorted(p *parser.Parser, defs []model.PartitionDefinition) error { sc := lp.ctx.GetSessionVars().StmtCtx - for partitionIdx, def := range pi.Definitions { + for partitionIdx, def := range defs { for groupIdx, vs := range def.InValues { keyBytes, err := lp.genConstExprKey(lp.ctx, sc, vs[lp.colIdx], lp.schema, lp.names, p) if err != nil { @@ -946,8 +966,8 @@ func generateHashPartitionExpr(ctx sessionctx.Context, pi *model.PartitionInfo, } // PartitionExpr returns the partition expression. -func (t *partitionedTable) PartitionExpr() (*PartitionExpr, error) { - return t.partitionExpr, nil +func (t *partitionedTable) PartitionExpr() *PartitionExpr { + return t.partitionExpr } func (t *partitionedTable) GetPartitionColumnIDs() []int64 { @@ -998,7 +1018,7 @@ func PartitionRecordKey(pid int64, handle int64) kv.Key { } func (t *partitionedTable) CheckForExchangePartition(ctx sessionctx.Context, pi *model.PartitionInfo, r []types.Datum, pid int64) error { - defID, err := t.locatePartition(ctx, pi, r) + defID, err := t.locatePartition(ctx, r) if err != nil { return err } @@ -1008,36 +1028,56 @@ func (t *partitionedTable) CheckForExchangePartition(ctx sessionctx.Context, pi return nil } -// locatePartition returns the partition ID of the input record. -func (t *partitionedTable) locatePartition(ctx sessionctx.Context, pi *model.PartitionInfo, r []types.Datum) (int64, error) { +// locatePartitionCommon returns the partition idx of the input record. 
+func (t *partitionedTable) locatePartitionCommon(ctx sessionctx.Context, pi *model.PartitionInfo, partitionExpr *PartitionExpr, r []types.Datum) (int, error) { var err error var idx int switch t.meta.Partition.Type { case model.PartitionTypeRange: if len(pi.Columns) == 0 { - idx, err = t.locateRangePartition(ctx, pi, r) + idx, err = t.locateRangePartition(ctx, partitionExpr, r) } else { - idx, err = t.locateRangeColumnPartition(ctx, pi, r) + idx, err = t.locateRangeColumnPartition(ctx, partitionExpr, r) } case model.PartitionTypeHash: + // Note that only LIST and RANGE supports REORGANIZE PARTITION + // TODO: Add support for ADD PARTITION and COALESCE PARTITION for HASH idx, err = t.locateHashPartition(ctx, pi, r) case model.PartitionTypeList: - idx, err = t.locateListPartition(ctx, pi, r) + idx, err = t.locateListPartition(ctx, partitionExpr, r) + } + if err != nil { + return 0, errors.Trace(err) } + return idx, nil +} + +func (t *partitionedTable) locatePartition(ctx sessionctx.Context, r []types.Datum) (int64, error) { + pi := t.Meta().GetPartitionInfo() + idx, err := t.locatePartitionCommon(ctx, pi, t.partitionExpr, r) if err != nil { return 0, errors.Trace(err) } return pi.Definitions[idx].ID, nil } -func (t *partitionedTable) locateRangeColumnPartition(ctx sessionctx.Context, pi *model.PartitionInfo, r []types.Datum) (int, error) { +func (t *partitionedTable) locateReorgPartition(ctx sessionctx.Context, r []types.Datum) (int64, error) { + pi := t.Meta().GetPartitionInfo() + idx, err := t.locatePartitionCommon(ctx, pi, t.reorgPartitionExpr, r) + if err != nil { + return 0, errors.Trace(err) + } + return pi.AddingDefinitions[idx].ID, nil +} + +func (t *partitionedTable) locateRangeColumnPartition(ctx sessionctx.Context, partitionExpr *PartitionExpr, r []types.Datum) (int, error) { + upperBounds := partitionExpr.UpperBounds var lastError error - partitionExprs := t.partitionExpr.UpperBounds evalBuffer := t.evalBufferPool.Get().(*chunk.MutRow) defer t.evalBufferPool.Put(evalBuffer) - idx := sort.Search(len(partitionExprs), func(i int) bool { + idx := sort.Search(len(upperBounds), func(i int) bool { evalBuffer.SetDatums(r...) - ret, isNull, err := partitionExprs[i].EvalInt(ctx, evalBuffer.ToRow()) + ret, isNull, err := upperBounds[i].EvalInt(ctx, evalBuffer.ToRow()) if err != nil { lastError = err return true // Does not matter, will propagate the last error anyway. @@ -1052,11 +1092,11 @@ func (t *partitionedTable) locateRangeColumnPartition(ctx sessionctx.Context, pi if lastError != nil { return 0, errors.Trace(lastError) } - if idx >= len(partitionExprs) { + if idx >= len(upperBounds) { // The data does not belong to any of the partition returns `table has no partition for value %s`. 
var valueMsg string - if pi.Expr != "" { - e, err := expression.ParseSimpleExprWithTableInfo(ctx, pi.Expr, t.meta) + if t.meta.Partition.Expr != "" { + e, err := expression.ParseSimpleExprWithTableInfo(ctx, t.meta.Partition.Expr, t.meta) if err == nil { val, _, err := e.EvalInt(ctx, chunk.MutRowFromDatums(r).ToRow()) if err == nil { @@ -1072,15 +1112,15 @@ func (t *partitionedTable) locateRangeColumnPartition(ctx sessionctx.Context, pi return idx, nil } -func (t *partitionedTable) locateListPartition(ctx sessionctx.Context, pi *model.PartitionInfo, r []types.Datum) (int, error) { - lp := t.partitionExpr.ForListPruning +func (t *partitionedTable) locateListPartition(ctx sessionctx.Context, partitionExpr *PartitionExpr, r []types.Datum) (int, error) { + lp := partitionExpr.ForListPruning if len(lp.ColPrunes) == 0 { return lp.locateListPartitionByRow(ctx, r) } return lp.locateListColumnsPartitionByRow(ctx, r) } -func (t *partitionedTable) locateRangePartition(ctx sessionctx.Context, pi *model.PartitionInfo, r []types.Datum) (int, error) { +func (t *partitionedTable) locateRangePartition(ctx sessionctx.Context, partitionExpr *PartitionExpr, r []types.Datum) (int, error) { var ( ret int64 val int64 @@ -1103,7 +1143,7 @@ func (t *partitionedTable) locateRangePartition(ctx sessionctx.Context, pi *mode ret = val } unsigned := mysql.HasUnsignedFlag(t.partitionExpr.Expr.GetType().GetFlag()) - ranges := t.partitionExpr.ForRangePruning + ranges := partitionExpr.ForRangePruning length := len(ranges.LessThan) pos := sort.Search(length, func(i int) bool { if isNull { @@ -1117,8 +1157,8 @@ func (t *partitionedTable) locateRangePartition(ctx sessionctx.Context, pi *mode if pos < 0 || pos >= length { // The data does not belong to any of the partition returns `table has no partition for value %s`. var valueMsg string - if pi.Expr != "" { - e, err := expression.ParseSimpleExprWithTableInfo(ctx, pi.Expr, t.meta) + if t.meta.Partition.Expr != "" { + e, err := expression.ParseSimpleExprWithTableInfo(ctx, t.meta.Partition.Expr, t.meta) if err == nil { val, _, err := e.EvalInt(ctx, chunk.MutRowFromDatums(r).ToRow()) if err == nil { @@ -1227,7 +1267,7 @@ func GetReorganizedPartitionedTable(t table.Table) (table.PartitionedTable, erro // GetPartitionByRow returns a Table, which is actually a Partition. func (t *partitionedTable) GetPartitionByRow(ctx sessionctx.Context, r []types.Datum) (table.PhysicalTable, error) { - pid, err := t.locatePartition(ctx, t.Meta().GetPartitionInfo(), r) + pid, err := t.locatePartition(ctx, r) if err != nil { return nil, errors.Trace(err) } @@ -1236,7 +1276,7 @@ func (t *partitionedTable) GetPartitionByRow(ctx sessionctx.Context, r []types.D // GetPartitionByRow returns a Table, which is actually a Partition. 
func (t *partitionTableWithGivenSets) GetPartitionByRow(ctx sessionctx.Context, r []types.Datum) (table.PhysicalTable, error) { - pid, err := t.locatePartition(ctx, t.Meta().GetPartitionInfo(), r) + pid, err := t.locatePartition(ctx, r) if err != nil { return nil, errors.Trace(err) } @@ -1252,8 +1292,7 @@ func (t *partitionedTable) AddRecord(ctx sessionctx.Context, r []types.Datum, op } func partitionedTableAddRecord(ctx sessionctx.Context, t *partitionedTable, r []types.Datum, partitionSelection map[int64]struct{}, opts []table.AddRecordOption) (recordID kv.Handle, err error) { - partitionInfo := t.meta.GetPartitionInfo() - pid, err := t.locatePartition(ctx, partitionInfo, r) + pid, err := t.locatePartition(ctx, r) if err != nil { return nil, errors.Trace(err) } @@ -1264,7 +1303,23 @@ func partitionedTableAddRecord(ctx sessionctx.Context, t *partitionedTable, r [] } } tbl := t.GetPartition(pid) - return tbl.AddRecord(ctx, r, opts...) + recordID, err = tbl.AddRecord(ctx, r, opts...) + if err != nil { + return + } + if _, ok := t.reorgPartitions[pid]; ok { + // Double write to the ongoing reorganized partition + pid, err = t.locateReorgPartition(ctx, r) + if err != nil { + return nil, errors.Trace(err) + } + tbl = t.GetPartition(pid) + recordID, err = tbl.AddRecord(ctx, r, opts...) + if err != nil { + return + } + } + return } // partitionTableWithGivenSets is used for this kind of grammar: partition (p0,p1) @@ -1301,14 +1356,29 @@ func (t *partitionTableWithGivenSets) GetAllPartitionIDs() []int64 { // RemoveRecord implements table.Table RemoveRecord interface. func (t *partitionedTable) RemoveRecord(ctx sessionctx.Context, h kv.Handle, r []types.Datum) error { - partitionInfo := t.meta.GetPartitionInfo() - pid, err := t.locatePartition(ctx, partitionInfo, r) + pid, err := t.locatePartition(ctx, r) if err != nil { return errors.Trace(err) } tbl := t.GetPartition(pid) - return tbl.RemoveRecord(ctx, h, r) + err = tbl.RemoveRecord(ctx, h, r) + if err != nil { + return errors.Trace(err) + } + + if _, ok := t.reorgPartitions[pid]; ok { + pid, err = t.locateReorgPartition(ctx, r) + if err != nil { + return errors.Trace(err) + } + tbl = t.GetPartition(pid) + err = tbl.RemoveRecord(ctx, h, r) + if err != nil { + return errors.Trace(err) + } + } + return nil } func (t *partitionedTable) GetAllPartitionIDs() []int64 { @@ -1331,12 +1401,11 @@ func (t *partitionTableWithGivenSets) UpdateRecord(ctx context.Context, sctx ses } func partitionedTableUpdateRecord(gctx context.Context, ctx sessionctx.Context, t *partitionedTable, h kv.Handle, currData, newData []types.Datum, touched []bool, partitionSelection map[int64]struct{}) error { - partitionInfo := t.meta.GetPartitionInfo() - from, err := t.locatePartition(ctx, partitionInfo, currData) + from, err := t.locatePartition(ctx, currData) if err != nil { return errors.Trace(err) } - to, err := t.locatePartition(ctx, partitionInfo, newData) + to, err := t.locatePartition(ctx, newData) if err != nil { return errors.Trace(err) } @@ -1363,16 +1432,96 @@ func partitionedTableUpdateRecord(gctx context.Context, ctx sessionctx.Context, // So this special order is chosen: add record first, errors such as // 'Key Already Exists' will generally happen during step1, errors are // unlikely to happen in step2. + // TODO: check what happens with foreign keys in step 2? err = t.GetPartition(from).RemoveRecord(ctx, h, currData) if err != nil { + // TODO, test this!! 
I assume that we need to clean this up, + // since there are non-atomic changes in the transaction buffer + // which if committed will cause inconsistencies? + // What to do if something during the cleanup fails? Can we block + // the transaction from ever being committed? logutil.BgLogger().Error("update partition record fails", zap.String("message", "new record inserted while old record is not removed"), zap.Error(err)) return errors.Trace(err) } + // TODO: Test if the update is in different partitions before reorg, + // but is now in the same during the reorg? And vice-versa... + // What if the change is in the same reorged partition?!? + newTo, newFrom := int64(-1), int64(-1) + if _, ok := t.reorgPartitions[to]; ok { + newTo, err = t.locateReorgPartition(ctx, newData) + // There might be valid cases when errors should be accepted? + if err != nil { + return errors.Trace(err) + } + } + if _, ok := t.reorgPartitions[from]; ok { + newFrom, err = t.locateReorgPartition(ctx, currData) + // There might be valid cases when errors should be accepted? + if err != nil { + return errors.Trace(err) + } + } + if newTo == newFrom && newTo != -1 { + tbl := t.GetPartition(newTo) + return tbl.UpdateRecord(gctx, ctx, h, currData, newData, touched) + } + if newTo != -1 { + tbl := t.GetPartition(newTo) + _, err = tbl.AddRecord(ctx, newData) + if err != nil { + return errors.Trace(err) + } + } + if newFrom != -1 { + tbl := t.GetPartition(newFrom) + err = tbl.RemoveRecord(ctx, h, currData) + // How to handle error, which can happen when the data is not yet backfilled + // TODO: Create a test for this!!! + if err != nil { + return errors.Trace(err) + } + } return nil } - tbl := t.GetPartition(to) - return tbl.UpdateRecord(gctx, ctx, h, currData, newData, touched) + err = tbl.UpdateRecord(gctx, ctx, h, currData, newData, touched) + if err != nil { + return errors.Trace(err) + } + if _, ok := t.reorgPartitions[to]; ok { + // Even if to == from, in the reorganized partitions they may differ + // like in case of a split + newTo, err := t.locateReorgPartition(ctx, newData) + if err != nil { + return errors.Trace(err) + } + newFrom, err := t.locateReorgPartition(ctx, currData) + if err != nil { + return errors.Trace(err) + } + if newTo == newFrom { + tbl = t.GetPartition(newTo) + err = tbl.UpdateRecord(gctx, ctx, h, currData, newData, touched) + if err != nil { + return errors.Trace(err) + } + return nil + } + tbl = t.GetPartition(newTo) + _, err = tbl.AddRecord(ctx, newData) + // TODO: Could there be a case where a duplicate unique key could happen here? + if err != nil { + return errors.Trace(err) + } + tbl = t.GetPartition(newFrom) + err = tbl.RemoveRecord(ctx, h, currData) + // How to handle error, which can happen when the data is not yet backfilled + // TODO: Create a test for this!!! + if err != nil { + return errors.Trace(err) + } + } + return nil } // FindPartitionByName finds partition in table meta by name. 
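A standalone sketch of the double-write idea in the partition.go changes above may help readers of this patch: while a partition is listed in DroppingDefinitions (i.e. it is being reorganized), every write routed to it is also applied to the matching partition from AddingDefinitions, so the new partitions stay consistent until the backfill and the DDL state changes finish. The toy program below is only an illustration under simplified assumptions and is not part of the patch or of TiDB; every type and helper in it (toyPartitionedTable, locateReorg, and so on) is hypothetical.

// Hedged, standalone illustration of the double-write pattern; not TiDB code.
package main

import "fmt"

// toyPartition stands in for a physical partition; it just stores rows.
type toyPartition struct {
	id   int64
	rows [][]int
}

func (p *toyPartition) addRecord(r []int) { p.rows = append(p.rows, r) }

// toyPartitionedTable mimics the shape of partitionedTable in this patch:
// the current partitions, the set of partition IDs being reorganized
// (DroppingDefinitions), and the new partitions (AddingDefinitions).
type toyPartitionedTable struct {
	partitions      []*toyPartition    // current (visible) partitions
	reorgPartitions map[int64]struct{} // IDs of partitions being reorganized
	newPartitions   []*toyPartition    // partitions being added by the reorg
	locate          func(r []int) int  // index among current partitions
	locateReorg     func(r []int) int  // index among new partitions
}

// addRecord writes to the located current partition and, when that partition
// is part of an ongoing reorganize, double-writes into the matching new one.
func (t *toyPartitionedTable) addRecord(r []int) {
	p := t.partitions[t.locate(r)]
	p.addRecord(r)
	if _, ok := t.reorgPartitions[p.id]; ok {
		t.newPartitions[t.locateReorg(r)].addRecord(r)
	}
}

func main() {
	// p1 (id 2) covers [10,20) and is being split into p1a [10,15) and p1b [15,20),
	// mirroring the REORGANIZE PARTITION statement used in the test above.
	tbl := &toyPartitionedTable{
		partitions:      []*toyPartition{{id: 1}, {id: 2}, {id: 3}},
		reorgPartitions: map[int64]struct{}{2: {}},
		newPartitions:   []*toyPartition{{id: 4}, {id: 5}},
		locate: func(r []int) int {
			switch {
			case r[0] < 10:
				return 0
			case r[0] < 20:
				return 1
			default:
				return 2
			}
		},
		locateReorg: func(r []int) int {
			if r[0] < 15 {
				return 0
			}
			return 1
		},
	}
	tbl.addRecord([]int{14}) // lands in p1, double-written to p1a
	tbl.addRecord([]int{5})  // lands in p0 only; no double write
	fmt.Println(len(tbl.newPartitions[0].rows)) // prints 1
}

The sketch mirrors the order chosen in partitionedTableAddRecord above: write to the currently visible partition first, then consult reorgPartitions and double-write, so duplicate-key errors surface on the primary write path before the extra write is attempted.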
diff --git a/table/tables/partition_test.go b/table/tables/partition_test.go index cc8dd90a44737..0bac493aa7f35 100644 --- a/table/tables/partition_test.go +++ b/table/tables/partition_test.go @@ -273,10 +273,9 @@ func TestGeneratePartitionExpr(t *testing.T) { tbl, err := dom.InfoSchema().TableByName(model.NewCIStr("test"), model.NewCIStr("t1")) require.NoError(t, err) type partitionExpr interface { - PartitionExpr() (*tables.PartitionExpr, error) + PartitionExpr() *tables.PartitionExpr } - pe, err := tbl.(partitionExpr).PartitionExpr() - require.NoError(t, err) + pe := tbl.(partitionExpr).PartitionExpr() upperBounds := []string{ "lt(t1.id, 4)",