Skip to content

Commit

Permalink
cherry pick pingcap#22152 to release-5.0-rc
Browse files Browse the repository at this point in the history
Signed-off-by: ti-srebot <[email protected]>
  • Loading branch information
you06 authored and ti-srebot committed Jan 12, 2021
1 parent fd4437d commit aedecb5
Show file tree
Hide file tree
Showing 6 changed files with 296 additions and 9 deletions.
15 changes: 14 additions & 1 deletion planner/core/logical_plan_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -2211,6 +2211,10 @@ func (r *correlatedAggregateResolver) resolveSelect(sel *ast.SelectStmt) (err er
if err != nil {
return err
}
// do not use cache when for update read
if isForUpdateReadSelectLock(sel.LockInfo) {
useCache = false
}
// we cannot use cache if there are correlated aggregates inside FROM clause,
// since the plan we are building now is not correct and need to be rebuild later.
p, err := r.b.buildTableRefs(r.ctx, sel.From, useCache)
Expand Down Expand Up @@ -3248,6 +3252,11 @@ func (b *PlanBuilder) buildSelect(ctx context.Context, sel *ast.SelectStmt) (p L
projExprs []expression.Expression
)

// set for update read to true before building result set node
if isForUpdateReadSelectLock(sel.LockInfo) {
b.isForUpdateRead = true
}

// For sub-queries, the FROM clause may have already been built in outer query when resolving correlated aggregates.
// If the ResultSetNode inside FROM clause has nothing to do with correlated aggregates, we can simply get the
// existing ResultSetNode from the cache.
Expand Down Expand Up @@ -3547,7 +3556,7 @@ func (b *PlanBuilder) buildDataSource(ctx context.Context, tn *ast.TableName, as
if tblName.L == "" {
tblName = tn.Name
}
possiblePaths, err := getPossibleAccessPaths(b.ctx, b.TableHints(), tn.IndexHints, tbl, dbName, tblName)
possiblePaths, err := getPossibleAccessPaths(b.ctx, b.TableHints(), tn.IndexHints, tbl, dbName, tblName, b.isForUpdateRead, b.is.SchemaMetaVersion())
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -3644,6 +3653,8 @@ func (b *PlanBuilder) buildDataSource(ctx context.Context, tn *ast.TableName, as
partitionNames: tn.PartitionNames,
TblCols: make([]*expression.Column, 0, len(columns)),
preferPartitions: make(map[int][]model.CIStr),
is: b.is,
isForUpdateRead: b.isForUpdateRead,
}.Init(b.ctx, b.getSelectOffset())
var handleCols HandleCols
schema := expression.NewSchema(make([]*expression.Column, 0, len(columns))...)
Expand Down Expand Up @@ -4169,6 +4180,7 @@ func (b *PlanBuilder) buildUpdate(ctx context.Context, update *ast.UpdateStmt) (
}

b.inUpdateStmt = true
b.isForUpdateRead = true

p, err := b.buildResultSetNode(ctx, update.TableRefs.TableRefs)
if err != nil {
Expand Down Expand Up @@ -4509,6 +4521,7 @@ func (b *PlanBuilder) buildDelete(ctx context.Context, delete *ast.DeleteStmt) (
}()

b.inDeleteStmt = true
b.isForUpdateRead = true

p, err := b.buildResultSetNode(ctx, delete.TableRefs.TableRefs)
if err != nil {
Expand Down
6 changes: 6 additions & 0 deletions planner/core/logical_plans.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/pingcap/parser/mysql"
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/expression/aggregation"
"github.com/pingcap/tidb/infoschema"
"github.com/pingcap/tidb/planner/property"
"github.com/pingcap/tidb/planner/util"
"github.com/pingcap/tidb/statistics"
Expand Down Expand Up @@ -516,6 +517,11 @@ type DataSource struct {
// preferPartitions store the map, the key represents store type, the value represents the partition name list.
preferPartitions map[int][]model.CIStr
SampleInfo *TableSampleInfo
is infoschema.InfoSchema
// isForUpdateRead should be true in either of the following situations
// 1. use `inside insert`, `update`, `delete` or `select for update` statement
// 2. isolation level is RC
isForUpdateRead bool
}

// ExtractCorrelatedCols implements LogicalPlan interface.
Expand Down
88 changes: 85 additions & 3 deletions planner/core/planbuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/pingcap/parser/opcode"
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/ddl"
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/infoschema"
"github.com/pingcap/tidb/kv"
Expand Down Expand Up @@ -465,6 +466,10 @@ type PlanBuilder struct {
// cache ResultSetNodes and HandleHelperMap to avoid rebuilding.
cachedResultSetNodes map[*ast.Join]LogicalPlan
cachedHandleHelperMap map[*ast.Join]map[int64][]HandleCols
// isForUpdateRead should be true in either of the following situations
// 1. use `inside insert`, `update`, `delete` or `select for update` statement
// 2. isolation level is RC
isForUpdateRead bool
}

type handleColHelper struct {
Expand Down Expand Up @@ -578,6 +583,7 @@ func NewPlanBuilder(sctx sessionctx.Context, is infoschema.InfoSchema, processor
correlatedAggMapper: make(map[*ast.AggregateFuncExpr]*expression.CorrelatedColumn),
cachedResultSetNodes: make(map[*ast.Join]LogicalPlan),
cachedHandleHelperMap: make(map[*ast.Join]map[int64][]HandleCols),
isForUpdateRead: sctx.GetSessionVars().IsPessimisticReadConsistency(),
}, savedBlockNames
}

Expand Down Expand Up @@ -856,7 +862,39 @@ func fillContentForTablePath(tablePath *util.AccessPath, tblInfo *model.TableInf
}
}

func getPossibleAccessPaths(ctx sessionctx.Context, tableHints *tableHintInfo, indexHints []*ast.IndexHint, tbl table.Table, dbName, tblName model.CIStr) ([]*util.AccessPath, error) {
// isForUpdateReadSelectLock checks if the lock type need to use forUpdateRead
func isForUpdateReadSelectLock(lock *ast.SelectLockInfo) bool {
if lock == nil {
return false
}
return lock.LockType == ast.SelectLockForUpdate ||
lock.LockType == ast.SelectLockForUpdateNoWait ||
lock.LockType == ast.SelectLockForUpdateWaitN
}

// getLatestIndexInfo gets the index info of latest schema version from given table id,
// it returns nil if the schema version is not changed
func getLatestIndexInfo(ctx sessionctx.Context, id int64, startVer int64) (map[int64]*model.IndexInfo, bool, error) {
dom := domain.GetDomain(ctx)
if dom == nil {
return nil, false, errors.New("domain not found for ctx")
}
is := dom.InfoSchema()
if is.SchemaMetaVersion() == startVer {
return nil, false, nil
}
latestIndexes := make(map[int64]*model.IndexInfo)
latestTbl, exist := is.TableByID(id)
if exist {
latestTblInfo := latestTbl.Meta()
for _, index := range latestTblInfo.Indices {
latestIndexes[index.ID] = index
}
}
return latestIndexes, true, nil
}

func getPossibleAccessPaths(ctx sessionctx.Context, tableHints *tableHintInfo, indexHints []*ast.IndexHint, tbl table.Table, dbName, tblName model.CIStr, check bool, startVer int64) ([]*util.AccessPath, error) {
tblInfo := tbl.Meta()
publicPaths := make([]*util.AccessPath, 0, len(tblInfo.Indices)+2)
tp := kv.TiKV
Expand All @@ -871,6 +909,11 @@ func getPossibleAccessPaths(ctx sessionctx.Context, tableHints *tableHintInfo, i
publicPaths = append(publicPaths, genTiFlashPath(tblInfo, true))
}
optimizerUseInvisibleIndexes := ctx.GetSessionVars().OptimizerUseInvisibleIndexes

check = check && ctx.GetSessionVars().ConnectionID > 0
var latestIndexes map[int64]*model.IndexInfo
var err error

for _, index := range tblInfo.Indices {
if index.State == model.StatePublic {
// Filter out invisible index, because they are not visible for optimizer
Expand All @@ -880,6 +923,17 @@ func getPossibleAccessPaths(ctx sessionctx.Context, tableHints *tableHintInfo, i
if tblInfo.IsCommonHandle && index.Primary {
continue
}
if check && latestIndexes == nil {
latestIndexes, check, err = getLatestIndexInfo(ctx, tblInfo.ID, 0)
if err != nil {
return nil, err
}
}
if check {
if latestIndex, ok := latestIndexes[index.ID]; !ok || latestIndex.State != model.StatePublic {
continue
}
}
publicPaths = append(publicPaths, &util.AccessPath{Index: index})
}
}
Expand Down Expand Up @@ -1350,13 +1404,40 @@ func (b *PlanBuilder) buildPhysicalIndexLookUpReaders(ctx context.Context, dbNam
// get index information
indexInfos := make([]*model.IndexInfo, 0, len(tblInfo.Indices))
indexLookUpReaders := make([]Plan, 0, len(tblInfo.Indices))

check := b.isForUpdateRead && b.ctx.GetSessionVars().ConnectionID > 0
var latestIndexes map[int64]*model.IndexInfo
var err error

for _, idx := range indices {
idxInfo := idx.Meta()
if idxInfo.State != model.StatePublic {
logutil.Logger(context.Background()).Info("build physical index lookup reader, the index isn't public",
zap.String("index", idxInfo.Name.O), zap.Stringer("state", idxInfo.State), zap.String("table", tblInfo.Name.O))
logutil.Logger(ctx).Info("build physical index lookup reader, the index isn't public",
zap.String("index", idxInfo.Name.O),
zap.Stringer("state", idxInfo.State),
zap.String("table", tblInfo.Name.O))
continue
}
if check && latestIndexes == nil {
latestIndexes, check, err = getLatestIndexInfo(b.ctx, tblInfo.ID, b.is.SchemaMetaVersion())
if err != nil {
return nil, nil, err
}
}
if check {
if latestIndex, ok := latestIndexes[idxInfo.ID]; !ok || latestIndex.State != model.StatePublic {
forUpdateState := model.StateNone
if ok {
forUpdateState = latestIndex.State
}
logutil.Logger(ctx).Info("build physical index lookup reader, the index isn't public in forUpdateRead",
zap.String("index", idxInfo.Name.O),
zap.Stringer("state", idxInfo.State),
zap.Stringer("forUpdateRead state", forUpdateState),
zap.String("table", tblInfo.Name.O))
continue
}
}
indexInfos = append(indexInfos, idxInfo)
// For partition tables.
if pi := tbl.Meta().GetPartitionInfo(); pi != nil {
Expand Down Expand Up @@ -2636,6 +2717,7 @@ func (b *PlanBuilder) buildValuesListOfInsert(ctx context.Context, insert *ast.I
}

func (b *PlanBuilder) buildSelectPlanOfInsert(ctx context.Context, insert *ast.InsertStmt, insertPlan *Insert) error {
b.isForUpdateRead = true
affectedValuesCols, err := b.getAffectCols(insert, insertPlan)
if err != nil {
return err
Expand Down
42 changes: 38 additions & 4 deletions planner/core/point_get_plan.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,16 @@ import (
"github.com/pingcap/tidb/table/tables"
"github.com/pingcap/tidb/types"
driver "github.com/pingcap/tidb/types/parser_driver"
<<<<<<< HEAD
=======
tidbutil "github.com/pingcap/tidb/util"
"github.com/pingcap/tidb/util/logutil"
>>>>>>> 4088c2011... planner: check index valid while forUpdateRead (#22152)
"github.com/pingcap/tidb/util/math"
"github.com/pingcap/tidb/util/plancodec"
"github.com/pingcap/tidb/util/stringutil"
"github.com/pingcap/tipb/go-tipb"
"go.uber.org/zap"
)

// PointGetPlan is a fast plan for simple point get.
Expand Down Expand Up @@ -428,7 +434,7 @@ func TryFastPlan(ctx sessionctx.Context, node ast.Node) (p Plan) {
p = fp
return
}
if fp := tryPointGetPlan(ctx, x); fp != nil {
if fp := tryPointGetPlan(ctx, x, isForUpdateReadSelectLock(x.LockInfo)); fp != nil {
if checkFastPlanPrivilege(ctx, fp.dbName, fp.TblInfo.Name.L, mysql.SelectPriv) != nil {
return nil
}
Expand Down Expand Up @@ -742,7 +748,7 @@ func tryWhereIn2BatchPointGet(ctx sessionctx.Context, selStmt *ast.SelectStmt) *
// 2. It must be a single table select.
// 3. All the columns must be public and generated.
// 4. The condition is an access path that the range is a unique key.
func tryPointGetPlan(ctx sessionctx.Context, selStmt *ast.SelectStmt) *PointGetPlan {
func tryPointGetPlan(ctx sessionctx.Context, selStmt *ast.SelectStmt, check bool) *PointGetPlan {
if selStmt.Having != nil {
return nil
} else if selStmt.Limit != nil {
Expand Down Expand Up @@ -823,11 +829,27 @@ func tryPointGetPlan(ctx sessionctx.Context, selStmt *ast.SelectStmt) *PointGetP
return nil
}

check = check && ctx.GetSessionVars().ConnectionID > 0
var latestIndexes map[int64]*model.IndexInfo
var err error

for _, idxInfo := range tbl.Indices {
if !idxInfo.Unique || idxInfo.State != model.StatePublic || idxInfo.Invisible {
continue
}
if isTableDual {
if check && latestIndexes == nil {
latestIndexes, check, err = getLatestIndexInfo(ctx, tbl.ID, 0)
if err != nil {
logutil.BgLogger().Warn("get information schema failed", zap.Error(err))
return nil
}
}
if check {
if latestIndex, ok := latestIndexes[idxInfo.ID]; !ok || latestIndex.State != model.StatePublic {
continue
}
}
p := newPointGetPlan(ctx, tblName.Schema.O, schema, tbl, names)
p.IsTableDual = true
return p
Expand All @@ -837,6 +859,18 @@ func tryPointGetPlan(ctx sessionctx.Context, selStmt *ast.SelectStmt) *PointGetP
if idxValues == nil {
continue
}
if check && latestIndexes == nil {
latestIndexes, check, err = getLatestIndexInfo(ctx, tbl.ID, 0)
if err != nil {
logutil.BgLogger().Warn("get information schema failed", zap.Error(err))
return nil
}
}
if check {
if latestIndex, ok := latestIndexes[idxInfo.ID]; !ok || latestIndex.State != model.StatePublic {
continue
}
}
p := newPointGetPlan(ctx, dbName, schema, tbl, names)
p.IndexInfo = idxInfo
p.IndexValues = idxValues
Expand Down Expand Up @@ -1149,7 +1183,7 @@ func tryUpdatePointPlan(ctx sessionctx.Context, updateStmt *ast.UpdateStmt) Plan
OrderBy: updateStmt.Order,
Limit: updateStmt.Limit,
}
pointGet := tryPointGetPlan(ctx, selStmt)
pointGet := tryPointGetPlan(ctx, selStmt, true)
if pointGet != nil {
if pointGet.IsTableDual {
return PhysicalTableDual{
Expand Down Expand Up @@ -1243,7 +1277,7 @@ func tryDeletePointPlan(ctx sessionctx.Context, delStmt *ast.DeleteStmt) Plan {
OrderBy: delStmt.Order,
Limit: delStmt.Limit,
}
if pointGet := tryPointGetPlan(ctx, selStmt); pointGet != nil {
if pointGet := tryPointGetPlan(ctx, selStmt, true); pointGet != nil {
if pointGet.IsTableDual {
return PhysicalTableDual{
names: pointGet.outputNames,
Expand Down
2 changes: 1 addition & 1 deletion planner/core/rule_partition_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -1092,7 +1092,7 @@ func pruneUseBinarySearch(lessThan lessThanDataInt, data dataForPrune, unsigned
func (s *partitionProcessor) resolveAccessPaths(ds *DataSource) error {
possiblePaths, err := getPossibleAccessPaths(
ds.ctx, &tableHintInfo{indexMergeHintList: ds.indexMergeHints, indexHintList: ds.IndexHints},
ds.astIndexHints, ds.table, ds.DBName, ds.tableInfo.Name)
ds.astIndexHints, ds.table, ds.DBName, ds.tableInfo.Name, ds.isForUpdateRead, ds.is.SchemaMetaVersion())
if err != nil {
return err
}
Expand Down
Loading

0 comments on commit aedecb5

Please sign in to comment.