Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

planner: check schema stale for plan cache when forUpdateRead #22381

Merged
merged 16 commits into from
Mar 19, 2021
1 change: 1 addition & 0 deletions executor/prepared.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,7 @@ func (e *PrepareExec) Next(ctx context.Context, req *chunk.Chunk) error {
VisitInfos: destBuilder.GetVisitInfo(),
NormalizedSQL: normalized,
SQLDigest: digest,
ForUpdateRead: destBuilder.GetIsForUpdateRead(),
}
return vars.AddPreparedStmt(e.ID, preparedObj)
}
Expand Down
1 change: 1 addition & 0 deletions planner/core/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,4 +198,5 @@ type CachedPrepareStmt struct {
NormalizedPlan string
SQLDigest string
PlanDigest string
ForUpdateRead bool
}
10 changes: 9 additions & 1 deletion planner/core/common_plans.go
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,8 @@ func (e *Execute) setFoundInPlanCache(sctx sessionctx.Context, opt bool) error {
}

func (e *Execute) getPhysicalPlan(ctx context.Context, sctx sessionctx.Context, is infoschema.InfoSchema, preparedStmt *CachedPrepareStmt) error {
stmtCtx := sctx.GetSessionVars().StmtCtx
sessVars := sctx.GetSessionVars()
stmtCtx := sessVars.StmtCtx
prepared := preparedStmt.PreparedAst
stmtCtx.UseCache = prepared.UseCache
var cacheKey kvcache.Key
Expand Down Expand Up @@ -375,6 +376,12 @@ REBUILD:
e.Plan = p
_, isTableDual := p.(*PhysicalTableDual)
if !isTableDual && prepared.UseCache && !stmtCtx.OptimDependOnMutableConst {
// rebuild key to exclude kv.TiFlash when stmt is not read only
if _, isolationReadContainTiFlash := sessVars.IsolationReadEngines[kv.TiFlash]; isolationReadContainTiFlash && !IsReadOnly(stmt, sessVars) {
delete(sessVars.IsolationReadEngines, kv.TiFlash)
cacheKey = NewPSTMTPlanCacheKey(sctx.GetSessionVars(), e.ExecID, prepared.SchemaVersion)
sessVars.IsolationReadEngines[kv.TiFlash] = struct{}{}
}
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When use binary protocol and the plan cache is not hit, a new cache will generate inside getPhysicalPlan function, in which use the following codes to exclude TiFlash engine for write statement.

tidb/planner/optimize.go

Lines 85 to 88 in 2364fec

delete(sessVars.IsolationReadEngines, kv.TiFlash)
defer func() {
sessVars.IsolationReadEngines[kv.TiFlash] = struct{}{}
}()

After the plan is builing complete, the TiFlash engine is set back and the it'll still write a plan-cache key with TiFlash engine. As a result, we cache a write plan for TiFlash engine, and it'll never be used correctly.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

😮 @winoros PTAL

cached := NewPSTMTPlanCacheValue(p, names, stmtCtx.TblInfo2UnionScan, tps)
preparedStmt.NormalizedPlan, preparedStmt.PlanDigest = NormalizePlan(p)
stmtCtx.SetPlanDigest(preparedStmt.NormalizedPlan, preparedStmt.PlanDigest)
Expand Down Expand Up @@ -1314,6 +1321,7 @@ func IsPointUpdateByAutoCommit(ctx sessionctx.Context, p Plan) (bool, error) {
if _, isFastSel := updPlan.SelectPlan.(*PointGetPlan); isFastSel {
return true, nil
}

return false, nil
}

Expand Down
4 changes: 4 additions & 0 deletions planner/core/optimizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/pingcap/tidb/planner/property"
"github.com/pingcap/tidb/privilege"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/types"
utilhint "github.com/pingcap/tidb/util/hint"
"github.com/pingcap/tidb/util/set"
Expand All @@ -39,6 +40,9 @@ var OptimizeAstNode func(ctx context.Context, sctx sessionctx.Context, node ast.
// AllowCartesianProduct means whether tidb allows cartesian join without equal conditions.
var AllowCartesianProduct = atomic.NewBool(true)

// IsReadOnly check whether the ast.Node is a read only statement.
var IsReadOnly func(node ast.Node, vars *variable.SessionVars) bool

const (
flagGcSubstitute uint64 = 1 << iota
flagPrunColumns
Expand Down
5 changes: 5 additions & 0 deletions planner/core/planbuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -524,6 +524,11 @@ func (b *PlanBuilder) GetVisitInfo() []visitInfo {
return b.visitInfo
}

// GetIsForUpdateRead gets if the PlanBuilder use forUpdateRead
func (b *PlanBuilder) GetIsForUpdateRead() bool {
return b.isForUpdateRead
}

// GetDBTableInfo gets the accessed dbs and tables info.
func (b *PlanBuilder) GetDBTableInfo() []stmtctx.TableEntry {
var tables []stmtctx.TableEntry
Expand Down
14 changes: 14 additions & 0 deletions planner/optimize.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/metrics"
"github.com/pingcap/tidb/planner/cascades"
"github.com/pingcap/tidb/planner/core"
plannercore "github.com/pingcap/tidb/planner/core"
"github.com/pingcap/tidb/privilege"
"github.com/pingcap/tidb/sessionctx"
Expand Down Expand Up @@ -236,6 +237,18 @@ func optimize(ctx context.Context, sctx sessionctx.Context, node ast.Node, is in
}
sctx.GetSessionVars().RewritePhaseInfo.DurationRewrite = time.Since(beginRewrite)

if execPlan, ok := p.(*plannercore.Execute); ok {
execID := execPlan.ExecID
if execPlan.Name != "" {
execID = sctx.GetSessionVars().PreparedStmtNameToID[execPlan.Name]
}
if preparedPointer, ok := sctx.GetSessionVars().PreparedStmts[execID]; ok {
if preparedObj, ok := preparedPointer.(*core.CachedPrepareStmt); ok && preparedObj.ForUpdateRead {
is = domain.GetDomain(sctx).InfoSchema()
}
}
}

sctx.GetSessionVars().StmtCtx.Tables = builder.GetDBTableInfo()
activeRoles := sctx.GetSessionVars().ActiveRoles
// Check privilege. Maybe it's better to move this to the Preprocess, but
Expand Down Expand Up @@ -566,4 +579,5 @@ func setFoundInBinding(sctx sessionctx.Context, opt bool) error {

func init() {
plannercore.OptimizeAstNode = Optimize
plannercore.IsReadOnly = IsReadOnly
}
60 changes: 60 additions & 0 deletions session/pessimistic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import (
"sync/atomic"
"time"

"github.com/pingcap/tidb/types"

. "github.com/pingcap/check"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
Expand Down Expand Up @@ -2451,3 +2453,61 @@ func (s *testPessimisticSuite) TestIssue21498(c *C) {
tk.MustQuery("select * from t1").Check(testkit.Rows("5 12 100"))
}
}

func (s *testPessimisticSuite) TestPlanCacheSchemaChange(c *C) {
tk := testkit.NewTestKitWithInit(c, s.store)
tk2 := testkit.NewTestKitWithInit(c, s.store)
tk3 := testkit.NewTestKitWithInit(c, s.store)
ctx := context.Background()

tk.MustExec("use test")
tk2.MustExec("use test")
tk3.MustExec("use test")

tk.MustExec("drop table if exists t")
tk.MustExec("create table t (id int primary key, v int, unique index iv (v), vv int)")
tk.MustExec("insert into t values(1, 1, 1), (2, 2, 2), (4, 4, 4)")

tk.MustExec("set tidb_enable_amend_pessimistic_txn = 1")
tk2.MustExec("set tidb_enable_amend_pessimistic_txn = 1")

//generate plan cache
tk.MustExec("prepare update_stmt from 'update t set vv = vv + 1 where v = ?'")
tk.MustExec("set @v = 1")
tk.MustExec("execute update_stmt using @v")

stmtID, _, _, err := tk2.Se.PrepareStmt("update t set vv = vv + 1 where v = ?")
c.Assert(err, IsNil)
_, err = tk2.Se.ExecutePreparedStmt(ctx, stmtID, []types.Datum{types.NewDatum(1)})
c.Assert(err, IsNil)

tk.MustExec("begin pessimistic")
tk2.MustExec("begin pessimistic")

tk3.MustExec("alter table t drop index iv")
tk3.MustExec("update t set v = 3 where v = 2")
tk3.MustExec("update t set v = 5 where v = 4")

tk.MustExec("set @v = 2")
tk.MustExec("execute update_stmt using @v")
tk.CheckExecResult(0, 0)
tk.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("0"))
tk.MustExec("set @v = 3")
tk.MustExec("execute update_stmt using @v")
tk.CheckExecResult(1, 0)
tk.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("1"))

_, err = tk2.Se.ExecutePreparedStmt(ctx, stmtID, []types.Datum{types.NewDatum(4)})
c.Assert(err, IsNil)
tk2.CheckExecResult(0, 0)
tk2.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("0"))
_, err = tk2.Se.ExecutePreparedStmt(ctx, stmtID, []types.Datum{types.NewDatum(5)})
c.Assert(err, IsNil)
tk2.CheckExecResult(1, 0)
tk2.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("1"))

tk.MustExec("commit")
tk2.MustExec("commit")

tk.MustQuery("select * from t").Check(testkit.Rows("1 1 3", "2 3 3", "4 5 5"))
}
16 changes: 14 additions & 2 deletions session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -1573,7 +1573,12 @@ func (s *session) cachedPlanExec(ctx context.Context,
stmtID uint32, prepareStmt *plannercore.CachedPrepareStmt, args []types.Datum) (sqlexec.RecordSet, error) {
prepared := prepareStmt.PreparedAst
// compile ExecStmt
is := infoschema.GetInfoSchema(s)
var is infoschema.InfoSchema
if prepareStmt.ForUpdateRead {
is = domain.GetDomain(s).InfoSchema()
} else {
is = infoschema.GetInfoSchema(s)
}
execAst := &ast.ExecuteStmt{ExecID: stmtID}
if err := executor.ResetContextOfStmt(s, execAst); err != nil {
return nil, err
Expand Down Expand Up @@ -1614,9 +1619,16 @@ func (s *session) cachedPlanExec(ctx context.Context,
s.PrepareTSFuture(ctx)
stmtCtx.Priority = kv.PriorityHigh
resultSet, err = runStmt(ctx, s, stmt)
case nil:
// cache is invalid
if prepareStmt.ForUpdateRead {
s.PrepareTSFuture(ctx)
}
resultSet, err = runStmt(ctx, s, stmt)
default:
err = errors.Errorf("invalid cached plan type %T", prepared.CachedPlan)
prepared.CachedPlan = nil
return nil, errors.Errorf("invalid cached plan type")
return nil, err
}
return resultSet, err
}
Expand Down