Skip to content

Commit

Permalink
planner: check schema stale for plan cache when forUpdateRead (pingca…
Browse files Browse the repository at this point in the history
  • Loading branch information
you06 authored and SabaPing committed Mar 25, 2021
1 parent 21f9840 commit 04f9696
Show file tree
Hide file tree
Showing 8 changed files with 114 additions and 3 deletions.
1 change: 1 addition & 0 deletions executor/prepared.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,7 @@ func (e *PrepareExec) Next(ctx context.Context, req *chunk.Chunk) error {
VisitInfos: destBuilder.GetVisitInfo(),
NormalizedSQL: normalized,
SQLDigest: digest,
ForUpdateRead: destBuilder.GetIsForUpdateRead(),
}
return vars.AddPreparedStmt(e.ID, preparedObj)
}
Expand Down
1 change: 1 addition & 0 deletions planner/core/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,4 +195,5 @@ type CachedPrepareStmt struct {
NormalizedPlan string
SQLDigest string
PlanDigest string
ForUpdateRead bool
}
10 changes: 9 additions & 1 deletion planner/core/common_plans.go
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,8 @@ func (e *Execute) setFoundInPlanCache(sctx sessionctx.Context, opt bool) error {
}

func (e *Execute) getPhysicalPlan(ctx context.Context, sctx sessionctx.Context, is infoschema.InfoSchema, preparedStmt *CachedPrepareStmt) error {
stmtCtx := sctx.GetSessionVars().StmtCtx
sessVars := sctx.GetSessionVars()
stmtCtx := sessVars.StmtCtx
prepared := preparedStmt.PreparedAst
stmtCtx.UseCache = prepared.UseCache
var cacheKey kvcache.Key
Expand Down Expand Up @@ -397,6 +398,12 @@ REBUILD:
e.Plan = p
_, isTableDual := p.(*PhysicalTableDual)
if !isTableDual && prepared.UseCache && !stmtCtx.OptimDependOnMutableConst {
// rebuild key to exclude kv.TiFlash when stmt is not read only
if _, isolationReadContainTiFlash := sessVars.IsolationReadEngines[kv.TiFlash]; isolationReadContainTiFlash && !IsReadOnly(stmt, sessVars) {
delete(sessVars.IsolationReadEngines, kv.TiFlash)
cacheKey = NewPSTMTPlanCacheKey(sctx.GetSessionVars(), e.ExecID, prepared.SchemaVersion)
sessVars.IsolationReadEngines[kv.TiFlash] = struct{}{}
}
cached := NewPSTMTPlanCacheValue(p, names, stmtCtx.TblInfo2UnionScan, tps)
preparedStmt.NormalizedPlan, preparedStmt.PlanDigest = NormalizePlan(p)
stmtCtx.SetPlanDigest(preparedStmt.NormalizedPlan, preparedStmt.PlanDigest)
Expand Down Expand Up @@ -1348,5 +1355,6 @@ func IsPointUpdateByAutoCommit(ctx sessionctx.Context, p Plan) (bool, error) {
if _, isFastSel := updPlan.SelectPlan.(*PointGetPlan); isFastSel {
return true, nil
}

return false, nil
}
4 changes: 4 additions & 0 deletions planner/core/optimizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/pingcap/tidb/planner/property"
"github.com/pingcap/tidb/privilege"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/types"
utilhint "github.com/pingcap/tidb/util/hint"
"github.com/pingcap/tidb/util/set"
Expand All @@ -39,6 +40,9 @@ var OptimizeAstNode func(ctx context.Context, sctx sessionctx.Context, node ast.
// AllowCartesianProduct means whether tidb allows cartesian join without equal conditions.
var AllowCartesianProduct = atomic.NewBool(true)

// IsReadOnly check whether the ast.Node is a read only statement.
var IsReadOnly func(node ast.Node, vars *variable.SessionVars) bool

const (
flagGcSubstitute uint64 = 1 << iota
flagPrunColumns
Expand Down
5 changes: 5 additions & 0 deletions planner/core/planbuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -521,6 +521,11 @@ func (b *PlanBuilder) GetVisitInfo() []visitInfo {
return b.visitInfo
}

// GetIsForUpdateRead gets if the PlanBuilder use forUpdateRead
func (b *PlanBuilder) GetIsForUpdateRead() bool {
return b.isForUpdateRead
}

// GetDBTableInfo gets the accessed dbs and tables info.
func (b *PlanBuilder) GetDBTableInfo() []stmtctx.TableEntry {
var tables []stmtctx.TableEntry
Expand Down
14 changes: 14 additions & 0 deletions planner/optimize.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/metrics"
"github.com/pingcap/tidb/planner/cascades"
"github.com/pingcap/tidb/planner/core"
plannercore "github.com/pingcap/tidb/planner/core"
"github.com/pingcap/tidb/privilege"
"github.com/pingcap/tidb/sessionctx"
Expand Down Expand Up @@ -236,6 +237,18 @@ func optimize(ctx context.Context, sctx sessionctx.Context, node ast.Node, is in
}
sctx.GetSessionVars().RewritePhaseInfo.DurationRewrite = time.Since(beginRewrite)

if execPlan, ok := p.(*plannercore.Execute); ok {
execID := execPlan.ExecID
if execPlan.Name != "" {
execID = sctx.GetSessionVars().PreparedStmtNameToID[execPlan.Name]
}
if preparedPointer, ok := sctx.GetSessionVars().PreparedStmts[execID]; ok {
if preparedObj, ok := preparedPointer.(*core.CachedPrepareStmt); ok && preparedObj.ForUpdateRead {
is = domain.GetDomain(sctx).InfoSchema()
}
}
}

sctx.GetSessionVars().StmtCtx.Tables = builder.GetDBTableInfo()
activeRoles := sctx.GetSessionVars().ActiveRoles
// Check privilege. Maybe it's better to move this to the Preprocess, but
Expand Down Expand Up @@ -556,4 +569,5 @@ func setFoundInBinding(sctx sessionctx.Context, opt bool) error {

func init() {
plannercore.OptimizeAstNode = Optimize
plannercore.IsReadOnly = IsReadOnly
}
66 changes: 66 additions & 0 deletions session/pessimistic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,13 @@ import (
"github.com/pingcap/parser/terror"
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/kv"
plannercore "github.com/pingcap/tidb/planner/core"
"github.com/pingcap/tidb/session"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/store/tikv"
"github.com/pingcap/tidb/store/tikv/oracle"
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/codec"
"github.com/pingcap/tidb/util/testkit"
)
Expand Down Expand Up @@ -2459,6 +2461,70 @@ func (s *testPessimisticSuite) TestIssue21498(c *C) {
}
}

func (s *testPessimisticSuite) TestPlanCacheSchemaChange(c *C) {
orgEnable := plannercore.PreparedPlanCacheEnabled()
defer func() {
plannercore.SetPreparedPlanCache(orgEnable)
}()
plannercore.SetPreparedPlanCache(true)

tk := testkit.NewTestKitWithInit(c, s.store)
tk2 := testkit.NewTestKitWithInit(c, s.store)
tk3 := testkit.NewTestKitWithInit(c, s.store)
ctx := context.Background()

tk.MustExec("use test")
tk2.MustExec("use test")
tk3.MustExec("use test")

tk.MustExec("drop table if exists t")
tk.MustExec("create table t (id int primary key, v int, unique index iv (v), vv int)")
tk.MustExec("insert into t values(1, 1, 1), (2, 2, 2), (4, 4, 4)")

tk.MustExec("set tidb_enable_amend_pessimistic_txn = 1")
tk2.MustExec("set tidb_enable_amend_pessimistic_txn = 1")

//generate plan cache
tk.MustExec("prepare update_stmt from 'update t set vv = vv + 1 where v = ?'")
tk.MustExec("set @v = 1")
tk.MustExec("execute update_stmt using @v")

stmtID, _, _, err := tk2.Se.PrepareStmt("update t set vv = vv + 1 where v = ?")
c.Assert(err, IsNil)
_, err = tk2.Se.ExecutePreparedStmt(ctx, stmtID, []types.Datum{types.NewDatum(1)})
c.Assert(err, IsNil)

tk.MustExec("begin pessimistic")
tk2.MustExec("begin pessimistic")

tk3.MustExec("alter table t drop index iv")
tk3.MustExec("update t set v = 3 where v = 2")
tk3.MustExec("update t set v = 5 where v = 4")

tk.MustExec("set @v = 2")
tk.MustExec("execute update_stmt using @v")
tk.CheckExecResult(0, 0)
tk.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("0"))
tk.MustExec("set @v = 3")
tk.MustExec("execute update_stmt using @v")
tk.CheckExecResult(1, 0)
tk.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("1"))

_, err = tk2.Se.ExecutePreparedStmt(ctx, stmtID, []types.Datum{types.NewDatum(4)})
c.Assert(err, IsNil)
tk2.CheckExecResult(0, 0)
tk2.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("0"))
_, err = tk2.Se.ExecutePreparedStmt(ctx, stmtID, []types.Datum{types.NewDatum(5)})
c.Assert(err, IsNil)
tk2.CheckExecResult(1, 0)
tk2.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("1"))

tk.MustExec("commit")
tk2.MustExec("commit")

tk.MustQuery("select * from t").Check(testkit.Rows("1 1 3", "2 3 3", "4 5 5"))
}

func (s *testPessimisticSuite) TestAsyncCommitCalTSFail(c *C) {
atomic.StoreUint64(&tikv.ManagedLockTTL, 5000)
defer func() {
Expand Down
16 changes: 14 additions & 2 deletions session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -1623,7 +1623,12 @@ func (s *session) cachedPlanExec(ctx context.Context,
stmtID uint32, prepareStmt *plannercore.CachedPrepareStmt, args []types.Datum) (sqlexec.RecordSet, error) {
prepared := prepareStmt.PreparedAst
// compile ExecStmt
is := infoschema.GetInfoSchema(s)
var is infoschema.InfoSchema
if prepareStmt.ForUpdateRead {
is = domain.GetDomain(s).InfoSchema()
} else {
is = infoschema.GetInfoSchema(s)
}
execAst := &ast.ExecuteStmt{ExecID: stmtID}
if err := executor.ResetContextOfStmt(s, execAst); err != nil {
return nil, err
Expand Down Expand Up @@ -1664,9 +1669,16 @@ func (s *session) cachedPlanExec(ctx context.Context,
s.PrepareTSFuture(ctx)
stmtCtx.Priority = kv.PriorityHigh
resultSet, err = runStmt(ctx, s, stmt)
case nil:
// cache is invalid
if prepareStmt.ForUpdateRead {
s.PrepareTSFuture(ctx)
}
resultSet, err = runStmt(ctx, s, stmt)
default:
err = errors.Errorf("invalid cached plan type %T", prepared.CachedPlan)
prepared.CachedPlan = nil
return nil, errors.Errorf("invalid cached plan type")
return nil, err
}
return resultSet, err
}
Expand Down

0 comments on commit 04f9696

Please sign in to comment.