From cc1f990bacaf596f838fad2728a9951bb5a7a0d8 Mon Sep 17 00:00:00 2001 From: Arenatlx <314806019@qq.com> Date: Thu, 5 Aug 2021 23:41:13 +0800 Subject: [PATCH] planner: fix update panic when update in prepare and execute (#26759) --- executor/builder.go | 3 ++- executor/compiler.go | 4 +++- planner/core/common_plans.go | 4 ++++ planner/core/find_best_task.go | 12 ++++++++++- planner/core/logical_plan_builder.go | 4 ++-- planner/core/preprocess.go | 26 ++++++++++++++++++++++-- planner/optimize.go | 30 +++++++++++++++++----------- 7 files changed, 64 insertions(+), 19 deletions(-) diff --git a/executor/builder.go b/executor/builder.go index 1c6b18dc51d88..10e6b22148a65 100644 --- a/executor/builder.go +++ b/executor/builder.go @@ -1950,7 +1950,8 @@ func (b *executorBuilder) buildUpdate(v *plannercore.Update) Executor { if b.err != nil { return nil } - b.err = plannercore.CheckUpdateList(assignFlag, v) + // should use the new tblID2table, since the update's schema may have been changed in Execstmt. + b.err = plannercore.CheckUpdateList(assignFlag, v, tblID2table) if b.err != nil { return nil } diff --git a/executor/compiler.go b/executor/compiler.go index d3750ba641199..8c310b004f310 100644 --- a/executor/compiler.go +++ b/executor/compiler.go @@ -55,7 +55,9 @@ func (c *Compiler) Compile(ctx context.Context, stmtNode ast.StmtNode) (*ExecStm } ret := &plannercore.PreprocessorReturn{} - if err := plannercore.Preprocess(c.Ctx, stmtNode, plannercore.WithPreprocessorReturn(ret)); err != nil { + pe := &plannercore.PreprocessExecuteISUpdate{ExecuteInfoSchemaUpdate: planner.GetExecuteForUpdateReadIS, Node: stmtNode} + err := plannercore.Preprocess(c.Ctx, stmtNode, plannercore.WithPreprocessorReturn(ret), plannercore.WithExecuteInfoSchemaUpdate(pe)) + if err != nil { return nil, err } stmtNode = plannercore.TryAddExtraLimit(c.Ctx, stmtNode) diff --git a/planner/core/common_plans.go b/planner/core/common_plans.go index 8f82ce46ddda1..2fa0878d5552e 100644 --- a/planner/core/common_plans.go +++ b/planner/core/common_plans.go @@ -282,6 +282,10 @@ func (e *Execute) OptimizePreparedPlan(ctx context.Context, sctx sessionctx.Cont preparedObj.Executor = nil // If the schema version has changed we need to preprocess it again, // if this time it failed, the real reason for the error is schema changed. + // Example: + // When running update in prepared statement's schema version distinguished from the one of execute statement + // We should reset the tableRefs in the prepared update statements, otherwise, the ast nodes still hold the old + // tableRefs columnInfo which will cause chaos in logic of trying point get plan. (should ban non-public column) ret := &PreprocessorReturn{InfoSchema: is} err := Preprocess(sctx, prepared.Stmt, InPrepare, WithPreprocessorReturn(ret)) if err != nil { diff --git a/planner/core/find_best_task.go b/planner/core/find_best_task.go index 9cd9f7cac3f31..305f884ec3085 100644 --- a/planner/core/find_best_task.go +++ b/planner/core/find_best_task.go @@ -649,6 +649,16 @@ func (ds *DataSource) skylinePruning(prop *property.PhysicalProperty) []*candida return candidates } +func (ds *DataSource) isPointGetConvertableSchema() bool { + for _, col := range ds.Columns { + // Only handle tables that all columns are public. + if col.State != model.StatePublic { + return false + } + } + return true +} + // findBestTask implements the PhysicalPlan interface. // It will enumerate all the available indices and choose a plan with least cost. func (ds *DataSource) findBestTask(prop *property.PhysicalProperty, planCounter *PlanCounterTp) (t task, cntPlan int64, err error) { @@ -745,7 +755,7 @@ func (ds *DataSource) findBestTask(prop *property.PhysicalProperty, planCounter p: dual, }, cntPlan, nil } - canConvertPointGet := len(path.Ranges) > 0 && path.StoreType == kv.TiKV + canConvertPointGet := len(path.Ranges) > 0 && path.StoreType == kv.TiKV && ds.isPointGetConvertableSchema() if canConvertPointGet && !path.IsIntHandlePath { // We simply do not build [batch] point get for prefix indexes. This can be optimized. canConvertPointGet = path.Index.Unique && !path.Index.HasPrefixIndex() diff --git a/planner/core/logical_plan_builder.go b/planner/core/logical_plan_builder.go index 838a97bf0359d..7d2dd60520ab4 100644 --- a/planner/core/logical_plan_builder.go +++ b/planner/core/logical_plan_builder.go @@ -4655,10 +4655,10 @@ type tblUpdateInfo struct { } // CheckUpdateList checks all related columns in updatable state. -func CheckUpdateList(assignFlags []int, updt *Update) error { +func CheckUpdateList(assignFlags []int, updt *Update, newTblID2Table map[int64]table.Table) error { updateFromOtherAlias := make(map[int64]tblUpdateInfo) for _, content := range updt.TblColPosInfos { - tbl := updt.tblID2Table[content.TblID] + tbl := newTblID2Table[content.TblID] flags := assignFlags[content.Start:content.End] var update, updatePK bool for i, col := range tbl.WritableCols() { diff --git a/planner/core/preprocess.go b/planner/core/preprocess.go index fff5467ead5cd..2efc7dedf5f3d 100644 --- a/planner/core/preprocess.go +++ b/planner/core/preprocess.go @@ -63,6 +63,13 @@ func WithPreprocessorReturn(ret *PreprocessorReturn) PreprocessOpt { } } +// WithExecuteInfoSchemaUpdate return a PreprocessOpt to update the `Execute` infoSchema under some conditions. +func WithExecuteInfoSchemaUpdate(pe *PreprocessExecuteISUpdate) PreprocessOpt { + return func(p *preprocessor) { + p.PreprocessExecuteISUpdate = pe + } +} + // TryAddExtraLimit trys to add an extra limit for SELECT or UNION statement when sql_select_limit is set. func TryAddExtraLimit(ctx sessionctx.Context, node ast.StmtNode) ast.StmtNode { if ctx.GetSessionVars().SelectLimit == math.MaxUint64 || ctx.GetSessionVars().InRestrictedSQL { @@ -143,6 +150,12 @@ type PreprocessorReturn struct { TxnScope string } +// PreprocessExecuteISUpdate is used to update information schema for special Execute statement in the preprocessor. +type PreprocessExecuteISUpdate struct { + ExecuteInfoSchemaUpdate func(node ast.Node, sctx sessionctx.Context) infoschema.InfoSchema + Node ast.Node +} + // preprocessor is an ast.Visitor that preprocess // ast Nodes parsed from parser. type preprocessor struct { @@ -157,6 +170,7 @@ type preprocessor struct { // values that may be returned *PreprocessorReturn + *PreprocessExecuteISUpdate err error } @@ -1596,9 +1610,17 @@ func (p *preprocessor) handleAsOfAndReadTS(node *ast.AsOfClause) { // - session variable // - transaction context func (p *preprocessor) ensureInfoSchema() infoschema.InfoSchema { - if p.InfoSchema == nil { - p.InfoSchema = p.ctx.GetInfoSchema().(infoschema.InfoSchema) + if p.InfoSchema != nil { + return p.InfoSchema + } + // `Execute` under some conditions need to see the latest information schema. + if p.PreprocessExecuteISUpdate != nil { + if newInfoSchema := p.ExecuteInfoSchemaUpdate(p.Node, p.ctx); newInfoSchema != nil { + p.InfoSchema = newInfoSchema + return p.InfoSchema + } } + p.InfoSchema = p.ctx.GetInfoSchema().(infoschema.InfoSchema) return p.InfoSchema } diff --git a/planner/optimize.go b/planner/optimize.go index 64e3e8fd7ced6..05ba86f3bbce7 100644 --- a/planner/optimize.go +++ b/planner/optimize.go @@ -77,6 +77,24 @@ func IsReadOnly(node ast.Node, vars *variable.SessionVars) bool { return ast.IsReadOnly(node) } +// GetExecuteForUpdateReadIS is used to check whether the statement is `execute` and target statement has a forUpdateRead flag. +// If so, we will return the latest information schema. +func GetExecuteForUpdateReadIS(node ast.Node, sctx sessionctx.Context) infoschema.InfoSchema { + if execStmt, isExecStmt := node.(*ast.ExecuteStmt); isExecStmt { + vars := sctx.GetSessionVars() + execID := execStmt.ExecID + if execStmt.Name != "" { + execID = vars.PreparedStmtNameToID[execStmt.Name] + } + if preparedPointer, ok := vars.PreparedStmts[execID]; ok { + if preparedObj, ok := preparedPointer.(*core.CachedPrepareStmt); ok && preparedObj.ForUpdateRead { + return domain.GetDomain(sctx).InfoSchema() + } + } + } + return nil +} + // Optimize does optimization and creates a Plan. // The node must be prepared first. func Optimize(ctx context.Context, sctx sessionctx.Context, node ast.Node, is infoschema.InfoSchema) (plannercore.Plan, types.NameSlice, error) { @@ -318,18 +336,6 @@ func optimize(ctx context.Context, sctx sessionctx.Context, node ast.Node, is in } sctx.GetSessionVars().RewritePhaseInfo.DurationRewrite = time.Since(beginRewrite) - if execPlan, ok := p.(*plannercore.Execute); ok { - execID := execPlan.ExecID - if execPlan.Name != "" { - execID = sctx.GetSessionVars().PreparedStmtNameToID[execPlan.Name] - } - if preparedPointer, ok := sctx.GetSessionVars().PreparedStmts[execID]; ok { - if preparedObj, ok := preparedPointer.(*core.CachedPrepareStmt); ok && preparedObj.ForUpdateRead { - is = domain.GetDomain(sctx).InfoSchema() - } - } - } - sctx.GetSessionVars().StmtCtx.Tables = builder.GetDBTableInfo() activeRoles := sctx.GetSessionVars().ActiveRoles // Check privilege. Maybe it's better to move this to the Preprocess, but