planner, executor: support batchget for range and list partition table (
zhuo-zhi authored May 28, 2021
1 parent 7a15d64 commit ac04473
Showing 5 changed files with 235 additions and 18 deletions.
59 changes: 52 additions & 7 deletions executor/batch_point_get.go
@@ -19,16 +19,19 @@ import (
"sort"
"sync/atomic"

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/parser/model"
"github.com/pingcap/parser/mysql"
"github.com/pingcap/tidb/ddl/placement"
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/variable"
driver "github.com/pingcap/tidb/store/driver/txn"
"github.com/pingcap/tidb/store/tikv"
"github.com/pingcap/tidb/table/tables"
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/chunk"
@@ -46,6 +49,7 @@ type BatchPointGetExec struct {
idxInfo *model.IndexInfo
handles []kv.Handle
physIDs []int64
partExpr *tables.PartitionExpr
partPos int
singlePart bool
partTblID int64
@@ -227,7 +231,11 @@ func (e *BatchPointGetExec) initialize(ctx context.Context) error {
continue
}

physID := getPhysID(e.tblInfo, idxVals[e.partPos].GetInt64())
physID, err := getPhysID(e.tblInfo, e.partExpr, idxVals[e.partPos].GetInt64())
if err != nil {
continue
}

// If this BatchPointGetExec is built only for the specific table partition, skip those filters not matching this partition.
if e.singlePart && e.partTblID != physID {
continue
@@ -355,13 +363,19 @@ func (e *BatchPointGetExec) initialize(ctx context.Context) error {
tID = e.physIDs[i]
} else {
if handle.IsInt() {
tID = getPhysID(e.tblInfo, handle.IntValue())
tID, err = getPhysID(e.tblInfo, e.partExpr, handle.IntValue())
if err != nil {
continue
}
} else {
_, d, err1 := codec.DecodeOne(handle.EncodedCol(e.partPos))
if err1 != nil {
return err1
}
tID = getPhysID(e.tblInfo, d.GetInt64())
tID, err = getPhysID(e.tblInfo, e.partExpr, d.GetInt64())
if err != nil {
continue
}
}
}
// If this BatchPointGetExec is built only for the specific table partition, skip those handles not matching this partition.
@@ -466,13 +480,44 @@ func (getter *PessimisticLockCacheGetter) Get(_ context.Context, key kv.Key) ([]
return nil, kv.ErrNotExist
}

func getPhysID(tblInfo *model.TableInfo, intVal int64) int64 {
func getPhysID(tblInfo *model.TableInfo, partitionExpr *tables.PartitionExpr, intVal int64) (int64, error) {
pi := tblInfo.GetPartitionInfo()
if pi == nil {
return tblInfo.ID
return tblInfo.ID, nil
}

if partitionExpr == nil {
return tblInfo.ID, nil
}
partIdx := math.Abs(intVal % int64(pi.Num))
return pi.Definitions[partIdx].ID

switch pi.Type {
case model.PartitionTypeHash:
partIdx := math.Abs(intVal % int64(pi.Num))
return pi.Definitions[partIdx].ID, nil
case model.PartitionTypeRange:
		// we've checked the type assertions in func TryFastPlan
col, ok := partitionExpr.Expr.(*expression.Column)
if !ok {
return 0, errors.Errorf("unsupported partition type in BatchGet")
}
unsigned := mysql.HasUnsignedFlag(col.GetType().Flag)
ranges := partitionExpr.ForRangePruning
length := len(ranges.LessThan)
partIdx := sort.Search(length, func(i int) bool {
return ranges.Compare(i, intVal, unsigned) > 0
})
if partIdx >= 0 && partIdx < length {
return pi.Definitions[partIdx].ID, nil
}
case model.PartitionTypeList:
isNull := false // we've guaranteed this in the build process of either TryFastPlan or buildBatchPointGet
partIdx := partitionExpr.ForListPruning.LocatePartition(intVal, isNull)
if partIdx >= 0 {
return pi.Definitions[partIdx].ID, nil
}
}

return 0, errors.Errorf("dual partition")
}

type cacheBatchGetter struct {
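The range branch added to getPhysID above locates a partition by binary-searching the sorted upper bounds kept in ForRangePruning. Below is a minimal, self-contained sketch of that lookup; locateRangePartition and the plain []int64 bounds are illustrative assumptions, not the repository's API, and maxvalue partitions and unsigned handling are omitted.

package main

import (
	"fmt"
	"sort"
)

// locateRangePartition returns the index of the partition that should hold val,
// or -1 when val exceeds every upper bound (the "dual partition" error case in the patch).
func locateRangePartition(lessThan []int64, val int64) int {
	idx := sort.Search(len(lessThan), func(i int) bool {
		return lessThan[i] > val // first partition whose upper bound is greater than val
	})
	if idx < len(lessThan) {
		return idx
	}
	return -1
}

func main() {
	// Mirrors the test table trange: p0 < 30, p1 < 60, p2 < 90, p3 < 120.
	bounds := []int64{30, 60, 90, 120}
	fmt.Println(locateRangePartition(bounds, 1))   // 0 -> p0
	fmt.Println(locateRangePartition(bounds, 59))  // 1 -> p1
	fmt.Println(locateRangePartition(bounds, 150)) // -1 -> no matching partition
}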
1 change: 1 addition & 0 deletions executor/builder.go
@@ -3939,6 +3939,7 @@ func (b *executorBuilder) buildBatchPointGet(plan *plannercore.BatchPointGetPlan
desc: plan.Desc,
lock: plan.Lock,
waitTime: plan.LockWaitTime,
partExpr: plan.PartitionExpr,
partPos: plan.PartitionColPos,
singlePart: plan.SinglePart,
partTblID: plan.PartTblID,
102 changes: 102 additions & 0 deletions executor/partition_table_test.go
@@ -683,6 +683,108 @@ func (s *partitionTableSuite) TestDynamicPruningUnderIndexJoin(c *C) {
tk.MustQuery(`select /*+ INL_JOIN(touter, tnormal) */ tnormal.* from touter join tnormal use index(idx_b) on touter.b = tnormal.b`).Sort().Rows())
}

func (s *partitionTableSuite) TestBatchGetforRangeandListPartitionTable(c *C) {
if israce.RaceEnabled {
c.Skip("exhaustive types test, skip race test")
}
tk := testkit.NewTestKitWithInit(c, s.store)
tk.MustExec("create database test_pointget")
tk.MustExec("use test_pointget")
tk.MustExec("set @@tidb_partition_prune_mode = 'dynamic'")
tk.MustExec("set @@session.tidb_enable_list_partition = ON")

// list partition table
tk.MustExec(`create table tlist(a int, b int, unique index idx_a(a), index idx_b(b)) partition by list(a)(
partition p0 values in (1, 2, 3, 4),
partition p1 values in (5, 6, 7, 8),
partition p2 values in (9, 10, 11, 12));`)

// range partition table
tk.MustExec(`create table trange(a int, unique key(a)) partition by range(a) (
partition p0 values less than (30),
partition p1 values less than (60),
partition p2 values less than (90),
partition p3 values less than (120));`)

// hash partition table
tk.MustExec("create table thash(a int unsigned, unique key(a)) partition by hash(a) partitions 4;")

// insert data into list partition table
tk.MustExec("insert into tlist values(1,1), (2,2), (3, 3), (4, 4), (5,5), (6, 6), (7,7), (8, 8), (9, 9), (10, 10), (11, 11), (12, 12);")
	// regular (non-partitioned) tables
tk.MustExec("create table tregular1(a int, unique key(a));")
tk.MustExec("create table tregular2(a int, unique key(a));")

vals := make([]string, 0, 100)
	// insert data into the range partition, hash partition, and regular tables
for i := 0; i < 100; i++ {
vals = append(vals, fmt.Sprintf("(%v)", i+1))
}
tk.MustExec("insert into trange values " + strings.Join(vals, ","))
tk.MustExec("insert into thash values " + strings.Join(vals, ","))
tk.MustExec("insert into tregular1 values " + strings.Join(vals, ","))
tk.MustExec("insert into tregular2 values (1), (2), (3), (4), (5), (6), (7), (8), (9), (10), (11), (12)")

// test BatchGet
for i := 0; i < 100; i++ {
// explain select a from t where a in ({x1}, {x2}, ... {x10}); // BatchGet is used
		// select a from t where a in ({x1}, {x2}, ... {x10});
points := make([]string, 0, 10)
for i := 0; i < 10; i++ {
x := rand.Intn(100) + 1
points = append(points, fmt.Sprintf("%v", x))
}
queryRegular1 := fmt.Sprintf("select a from tregular1 where a in (%v)", strings.Join(points, ","))

queryHash := fmt.Sprintf("select a from thash where a in (%v)", strings.Join(points, ","))
c.Assert(tk.HasPlan(queryHash, "Batch_Point_Get"), IsTrue) // check if BatchGet is used
tk.MustQuery(queryHash).Sort().Check(tk.MustQuery(queryRegular1).Sort().Rows())

queryRange := fmt.Sprintf("select a from trange where a in (%v)", strings.Join(points, ","))
c.Assert(tk.HasPlan(queryRange, "Batch_Point_Get"), IsTrue) // check if BatchGet is used
tk.MustQuery(queryRange).Sort().Check(tk.MustQuery(queryRegular1).Sort().Rows())

points = make([]string, 0, 10)
for i := 0; i < 10; i++ {
x := rand.Intn(12) + 1
points = append(points, fmt.Sprintf("%v", x))
}
queryRegular2 := fmt.Sprintf("select a from tregular2 where a in (%v)", strings.Join(points, ","))
queryList := fmt.Sprintf("select a from tlist where a in (%v)", strings.Join(points, ","))
c.Assert(tk.HasPlan(queryList, "Batch_Point_Get"), IsTrue) // check if BatchGet is used
tk.MustQuery(queryList).Sort().Check(tk.MustQuery(queryRegular2).Sort().Rows())
}

	// test different data types
	// unsigned flag
	// partition table and regular table pair
tk.MustExec(`create table trange3(a int unsigned, unique key(a)) partition by range(a) (
partition p0 values less than (30),
partition p1 values less than (60),
partition p2 values less than (90),
partition p3 values less than (120));`)
tk.MustExec("create table tregular3(a int unsigned, unique key(a));")
vals = make([]string, 0, 100)
	// insert data into the range partition table and the regular table
for i := 0; i < 100; i++ {
vals = append(vals, fmt.Sprintf("(%v)", i+1))
}
tk.MustExec("insert into trange3 values " + strings.Join(vals, ","))
tk.MustExec("insert into tregular3 values " + strings.Join(vals, ","))
// test BatchGet
// explain select a from t where a in ({x1}, {x2}, ... {x10}); // BatchGet is used
	// select a from t where a in ({x1}, {x2}, ... {x10});
points := make([]string, 0, 10)
for i := 0; i < 10; i++ {
x := rand.Intn(100) + 1
points = append(points, fmt.Sprintf("%v", x))
}
queryRegular := fmt.Sprintf("select a from tregular3 where a in (%v)", strings.Join(points, ","))
queryRange := fmt.Sprintf("select a from trange3 where a in (%v)", strings.Join(points, ","))
c.Assert(tk.HasPlan(queryRange, "Batch_Point_Get"), IsTrue) // check if BatchGet is used
tk.MustQuery(queryRange).Sort().Check(tk.MustQuery(queryRegular).Sort().Rows())
}

func (s *partitionTableSuite) TestGlobalStatsAndSQLBinding(c *C) {
if israce.RaceEnabled {
c.Skip("exhaustive types test, skip race test")
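The tlist table in the test above is located per-value through ForListPruning.LocatePartition inside getPhysID. The sketch below shows the same value-to-partition idea under the simplifying assumption of a prebuilt lookup map; locateListPartition and its map are hypothetical names, not TiDB's implementation.

package main

import "fmt"

// locateListPartition maps a value to its partition index via a map built from
// the "values in (...)" definitions, returning -1 when no partition lists the value.
func locateListPartition(valueToPart map[int64]int, val int64) int {
	if idx, ok := valueToPart[val]; ok {
		return idx
	}
	return -1
}

func main() {
	// Mirrors tlist: p0 in (1..4), p1 in (5..8), p2 in (9..12).
	m := map[int64]int{}
	for v := int64(1); v <= 12; v++ {
		m[v] = int((v - 1) / 4)
	}
	fmt.Println(locateListPartition(m, 3))  // 0 -> p0
	fmt.Println(locateListPartition(m, 7))  // 1 -> p1
	fmt.Println(locateListPartition(m, 13)) // -1 -> no matching partition
}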
3 changes: 2 additions & 1 deletion planner/core/find_best_task.go
@@ -1723,6 +1723,7 @@ func (ds *DataSource) convertToBatchPointGet(prop *property.PhysicalProperty, ca
Columns: ds.Columns,
SinglePart: ds.isPartition,
PartTblID: ds.physicalTableID,
PartitionExpr: getPartitionExpr(ds.ctx, ds.TableInfo()),
}.Init(ds.ctx, ds.tableStats.ScaleByExpectCnt(accessCnt), ds.schema.Clone(), ds.names, ds.blockOffset)
if batchPointGetPlan.KeepOrder {
batchPointGetPlan.Desc = prop.SortItems[0].Desc
@@ -1748,7 +1749,7 @@ func (ds *DataSource) convertToBatchPointGet(prop *property.PhysicalProperty, ca
batchPointGetPlan.IndexInfo = candidate.path.Index
batchPointGetPlan.IdxCols = candidate.path.IdxCols
batchPointGetPlan.IdxColLens = candidate.path.IdxColLens
batchPointGetPlan.PartitionColPos = getPartitionColumnPos(candidate.path.Index, hashPartColName)
batchPointGetPlan.PartitionColPos = getHashPartitionColumnPos(candidate.path.Index, hashPartColName)
for _, ran := range candidate.path.Ranges {
batchPointGetPlan.IndexValues = append(batchPointGetPlan.IndexValues, ran.LowVal)
}
88 changes: 78 additions & 10 deletions planner/core/point_get_plan.go
@@ -270,6 +270,7 @@ type BatchPointGetPlan struct {
IdxCols []*expression.Column
IdxColLens []int
PartitionColPos int
PartitionExpr *tables.PartitionExpr
KeepOrder bool
Desc bool
Lock bool
@@ -532,10 +533,10 @@ func newBatchPointGetPlan(
names []*types.FieldName, whereColNames []string, indexHints []*ast.IndexHint,
) *BatchPointGetPlan {
statsInfo := &property.StatsInfo{RowCount: float64(len(patternInExpr.List))}
var partitionColName *ast.ColumnName
var partitionExpr *tables.PartitionExpr
if tbl.GetPartitionInfo() != nil {
partitionColName = getHashPartitionColumnName(ctx, tbl)
if partitionColName == nil {
partitionExpr = getPartitionExpr(ctx, tbl)
if partitionExpr == nil {
return nil
}
}
@@ -577,9 +578,10 @@ func newBatchPointGetPlan(
handleParams[i] = param
}
return BatchPointGetPlan{
TblInfo: tbl,
Handles: handles,
HandleParams: handleParams,
TblInfo: tbl,
Handles: handles,
HandleParams: handleParams,
PartitionExpr: partitionExpr,
}.Init(ctx, statsInfo, schema, names, 0)
}

@@ -626,6 +628,12 @@ func newBatchPointGetPlan(
if matchIdxInfo == nil {
return nil
}

pos, err := getPartitionColumnPos(matchIdxInfo, partitionExpr, tbl)
if err != nil {
return nil
}

indexValues := make([][]types.Datum, len(patternInExpr.List))
indexValueParams := make([][]*driver.ParamMarkerExpr, len(patternInExpr.List))
for i, item := range patternInExpr.List {
@@ -691,7 +699,8 @@ func newBatchPointGetPlan(
IndexInfo: matchIdxInfo,
IndexValues: indexValues,
IndexValueParams: indexValueParams,
PartitionColPos: getPartitionColumnPos(matchIdxInfo, partitionColName),
PartitionColPos: pos,
PartitionExpr: partitionExpr,
}.Init(ctx, statsInfo, schema, names, 0)
}

@@ -1469,6 +1478,9 @@ func buildHandleCols(ctx sessionctx.Context, tbl *model.TableInfo, schema *expre

func getPartitionInfo(ctx sessionctx.Context, tbl *model.TableInfo, pairs []nameValuePair) (*model.PartitionDefinition, int, bool) {
partitionExpr := getPartitionExpr(ctx, tbl)
if partitionExpr == nil {
return nil, 0, false
}

pi := tbl.GetPartitionInfo()
if pi == nil {
@@ -1549,8 +1561,58 @@ func findPartitionIdx(idxInfo *model.IndexInfo, pos int, pairs []nameValuePair)
return 0
}

// getPartitionColumnPos gets the partition column's position in the index.
func getPartitionColumnPos(idx *model.IndexInfo, partitionColName *ast.ColumnName) int {
// getPartitionColumnPos gets the partition column's position in the unique index.
func getPartitionColumnPos(idx *model.IndexInfo, partitionExpr *tables.PartitionExpr, tbl *model.TableInfo) (int, error) {
// regular table
if partitionExpr == nil {
return 0, nil
}
pi := tbl.GetPartitionInfo()
if pi == nil {
return 0, nil
}

var partitionName model.CIStr
switch pi.Type {
case model.PartitionTypeHash:
if col, ok := partitionExpr.OrigExpr.(*ast.ColumnNameExpr); ok {
partitionName = col.Name.Name
} else {
return 0, errors.Errorf("unsupported partition type in BatchGet")
}
case model.PartitionTypeRange:
		// range columns partitioning is left for future development
if len(pi.Columns) == 0 {
if col, ok := partitionExpr.Expr.(*expression.Column); ok {
colInfo := findColNameByColID(tbl.Columns, col)
partitionName = colInfo.Name
}
} else {
return 0, errors.Errorf("unsupported partition type in BatchGet")
}
case model.PartitionTypeList:
		// list columns partitioning is left for future development
if partitionExpr.ForListPruning.ColPrunes == nil {
locateExpr := partitionExpr.ForListPruning.LocateExpr
if locateExpr, ok := locateExpr.(*expression.Column); ok {
colInfo := findColNameByColID(tbl.Columns, locateExpr)
partitionName = colInfo.Name
}
} else {
return 0, errors.Errorf("unsupported partition type in BatchGet")
}
}

for i, idxCol := range idx.Columns {
if partitionName.L == idxCol.Name.L {
return i, nil
}
}
panic("unique index must include all partition columns")
}

// getHashPartitionColumnPos gets the hash partition column's position in the unique index.
func getHashPartitionColumnPos(idx *model.IndexInfo, partitionColName *ast.ColumnName) int {
if partitionColName == nil {
return 0
}
@@ -1568,8 +1630,14 @@ func getPartitionExpr(ctx sessionctx.Context, tbl *model.TableInfo) *tables.Part
if !ok {
return nil
}

partTable, ok := table.(partitionTable)
if !ok {
return nil
}

	// PartitionExpr doesn't need columns and names for hash partition.
partitionExpr, err := table.(partitionTable).PartitionExpr()
partitionExpr, err := partTable.PartitionExpr()
if err != nil {
return nil
}
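getPartitionColumnPos above first recovers the partitioning column's name from the partition expression and then finds its offset inside the unique index, which tells the executor which index value to feed into partition location. Below is a simplified sketch of that final lookup, with hypothetical names and plain strings standing in for the planner's types.

package main

import (
	"fmt"
	"strings"
)

// partitionColumnPos returns the position of partCol within the unique index's
// columns, or -1 if the index does not cover the partitioning column.
// The patch panics in that case, since a unique key on a partitioned table
// must include every partitioning column.
func partitionColumnPos(idxCols []string, partCol string) int {
	for i, c := range idxCols {
		if strings.EqualFold(c, partCol) { // case-insensitive, like comparing lowercased CIStr names
			return i
		}
	}
	return -1
}

func main() {
	// Mirrors trange(a int, unique key(a)) partition by range(a).
	fmt.Println(partitionColumnPos([]string{"a"}, "a")) // 0
	fmt.Println(partitionColumnPos([]string{"b"}, "a")) // -1
}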
