Skip to content

Commit

Permalink
Merge branch 'master' into merge_br_squash
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-chi-bot authored Aug 5, 2021
2 parents 7cc48d4 + cc1f990 commit 30ac421
Show file tree
Hide file tree
Showing 7 changed files with 64 additions and 19 deletions.
3 changes: 2 additions & 1 deletion executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -1950,7 +1950,8 @@ func (b *executorBuilder) buildUpdate(v *plannercore.Update) Executor {
if b.err != nil {
return nil
}
b.err = plannercore.CheckUpdateList(assignFlag, v)
// should use the new tblID2table, since the update's schema may have been changed in Execstmt.
b.err = plannercore.CheckUpdateList(assignFlag, v, tblID2table)
if b.err != nil {
return nil
}
Expand Down
4 changes: 3 additions & 1 deletion executor/compiler.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,9 @@ func (c *Compiler) Compile(ctx context.Context, stmtNode ast.StmtNode) (*ExecStm
}

ret := &plannercore.PreprocessorReturn{}
if err := plannercore.Preprocess(c.Ctx, stmtNode, plannercore.WithPreprocessorReturn(ret)); err != nil {
pe := &plannercore.PreprocessExecuteISUpdate{ExecuteInfoSchemaUpdate: planner.GetExecuteForUpdateReadIS, Node: stmtNode}
err := plannercore.Preprocess(c.Ctx, stmtNode, plannercore.WithPreprocessorReturn(ret), plannercore.WithExecuteInfoSchemaUpdate(pe))
if err != nil {
return nil, err
}
stmtNode = plannercore.TryAddExtraLimit(c.Ctx, stmtNode)
Expand Down
4 changes: 4 additions & 0 deletions planner/core/common_plans.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,10 @@ func (e *Execute) OptimizePreparedPlan(ctx context.Context, sctx sessionctx.Cont
preparedObj.Executor = nil
// If the schema version has changed we need to preprocess it again,
// if this time it failed, the real reason for the error is schema changed.
// Example:
// When running update in prepared statement's schema version distinguished from the one of execute statement
// We should reset the tableRefs in the prepared update statements, otherwise, the ast nodes still hold the old
// tableRefs columnInfo which will cause chaos in logic of trying point get plan. (should ban non-public column)
ret := &PreprocessorReturn{InfoSchema: is}
err := Preprocess(sctx, prepared.Stmt, InPrepare, WithPreprocessorReturn(ret))
if err != nil {
Expand Down
12 changes: 11 additions & 1 deletion planner/core/find_best_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -649,6 +649,16 @@ func (ds *DataSource) skylinePruning(prop *property.PhysicalProperty) []*candida
return candidates
}

func (ds *DataSource) isPointGetConvertableSchema() bool {
for _, col := range ds.Columns {
// Only handle tables that all columns are public.
if col.State != model.StatePublic {
return false
}
}
return true
}

// findBestTask implements the PhysicalPlan interface.
// It will enumerate all the available indices and choose a plan with least cost.
func (ds *DataSource) findBestTask(prop *property.PhysicalProperty, planCounter *PlanCounterTp) (t task, cntPlan int64, err error) {
Expand Down Expand Up @@ -745,7 +755,7 @@ func (ds *DataSource) findBestTask(prop *property.PhysicalProperty, planCounter
p: dual,
}, cntPlan, nil
}
canConvertPointGet := len(path.Ranges) > 0 && path.StoreType == kv.TiKV
canConvertPointGet := len(path.Ranges) > 0 && path.StoreType == kv.TiKV && ds.isPointGetConvertableSchema()
if canConvertPointGet && !path.IsIntHandlePath {
// We simply do not build [batch] point get for prefix indexes. This can be optimized.
canConvertPointGet = path.Index.Unique && !path.Index.HasPrefixIndex()
Expand Down
4 changes: 2 additions & 2 deletions planner/core/logical_plan_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -4655,10 +4655,10 @@ type tblUpdateInfo struct {
}

// CheckUpdateList checks all related columns in updatable state.
func CheckUpdateList(assignFlags []int, updt *Update) error {
func CheckUpdateList(assignFlags []int, updt *Update, newTblID2Table map[int64]table.Table) error {
updateFromOtherAlias := make(map[int64]tblUpdateInfo)
for _, content := range updt.TblColPosInfos {
tbl := updt.tblID2Table[content.TblID]
tbl := newTblID2Table[content.TblID]
flags := assignFlags[content.Start:content.End]
var update, updatePK bool
for i, col := range tbl.WritableCols() {
Expand Down
26 changes: 24 additions & 2 deletions planner/core/preprocess.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,13 @@ func WithPreprocessorReturn(ret *PreprocessorReturn) PreprocessOpt {
}
}

// WithExecuteInfoSchemaUpdate return a PreprocessOpt to update the `Execute` infoSchema under some conditions.
func WithExecuteInfoSchemaUpdate(pe *PreprocessExecuteISUpdate) PreprocessOpt {
return func(p *preprocessor) {
p.PreprocessExecuteISUpdate = pe
}
}

// TryAddExtraLimit trys to add an extra limit for SELECT or UNION statement when sql_select_limit is set.
func TryAddExtraLimit(ctx sessionctx.Context, node ast.StmtNode) ast.StmtNode {
if ctx.GetSessionVars().SelectLimit == math.MaxUint64 || ctx.GetSessionVars().InRestrictedSQL {
Expand Down Expand Up @@ -143,6 +150,12 @@ type PreprocessorReturn struct {
TxnScope string
}

// PreprocessExecuteISUpdate is used to update information schema for special Execute statement in the preprocessor.
type PreprocessExecuteISUpdate struct {
ExecuteInfoSchemaUpdate func(node ast.Node, sctx sessionctx.Context) infoschema.InfoSchema
Node ast.Node
}

// preprocessor is an ast.Visitor that preprocess
// ast Nodes parsed from parser.
type preprocessor struct {
Expand All @@ -157,6 +170,7 @@ type preprocessor struct {

// values that may be returned
*PreprocessorReturn
*PreprocessExecuteISUpdate
err error
}

Expand Down Expand Up @@ -1596,9 +1610,17 @@ func (p *preprocessor) handleAsOfAndReadTS(node *ast.AsOfClause) {
// - session variable
// - transaction context
func (p *preprocessor) ensureInfoSchema() infoschema.InfoSchema {
if p.InfoSchema == nil {
p.InfoSchema = p.ctx.GetInfoSchema().(infoschema.InfoSchema)
if p.InfoSchema != nil {
return p.InfoSchema
}
// `Execute` under some conditions need to see the latest information schema.
if p.PreprocessExecuteISUpdate != nil {
if newInfoSchema := p.ExecuteInfoSchemaUpdate(p.Node, p.ctx); newInfoSchema != nil {
p.InfoSchema = newInfoSchema
return p.InfoSchema
}
}
p.InfoSchema = p.ctx.GetInfoSchema().(infoschema.InfoSchema)
return p.InfoSchema
}

Expand Down
30 changes: 18 additions & 12 deletions planner/optimize.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,24 @@ func IsReadOnly(node ast.Node, vars *variable.SessionVars) bool {
return ast.IsReadOnly(node)
}

// GetExecuteForUpdateReadIS is used to check whether the statement is `execute` and target statement has a forUpdateRead flag.
// If so, we will return the latest information schema.
func GetExecuteForUpdateReadIS(node ast.Node, sctx sessionctx.Context) infoschema.InfoSchema {
if execStmt, isExecStmt := node.(*ast.ExecuteStmt); isExecStmt {
vars := sctx.GetSessionVars()
execID := execStmt.ExecID
if execStmt.Name != "" {
execID = vars.PreparedStmtNameToID[execStmt.Name]
}
if preparedPointer, ok := vars.PreparedStmts[execID]; ok {
if preparedObj, ok := preparedPointer.(*core.CachedPrepareStmt); ok && preparedObj.ForUpdateRead {
return domain.GetDomain(sctx).InfoSchema()
}
}
}
return nil
}

// Optimize does optimization and creates a Plan.
// The node must be prepared first.
func Optimize(ctx context.Context, sctx sessionctx.Context, node ast.Node, is infoschema.InfoSchema) (plannercore.Plan, types.NameSlice, error) {
Expand Down Expand Up @@ -318,18 +336,6 @@ func optimize(ctx context.Context, sctx sessionctx.Context, node ast.Node, is in
}
sctx.GetSessionVars().RewritePhaseInfo.DurationRewrite = time.Since(beginRewrite)

if execPlan, ok := p.(*plannercore.Execute); ok {
execID := execPlan.ExecID
if execPlan.Name != "" {
execID = sctx.GetSessionVars().PreparedStmtNameToID[execPlan.Name]
}
if preparedPointer, ok := sctx.GetSessionVars().PreparedStmts[execID]; ok {
if preparedObj, ok := preparedPointer.(*core.CachedPrepareStmt); ok && preparedObj.ForUpdateRead {
is = domain.GetDomain(sctx).InfoSchema()
}
}
}

sctx.GetSessionVars().StmtCtx.Tables = builder.GetDBTableInfo()
activeRoles := sctx.GetSessionVars().ActiveRoles
// Check privilege. Maybe it's better to move this to the Preprocess, but
Expand Down

0 comments on commit 30ac421

Please sign in to comment.