Skip to content

Commit

Permalink
Revert "planner: simplify plan cache for fast point get (pingcap#53135)"
Browse files Browse the repository at this point in the history
This reverts commit 58469bb.
  • Loading branch information
qw4990 committed May 29, 2024
1 parent 3978253 commit e169d6f
Show file tree
Hide file tree
Showing 5 changed files with 34 additions and 51 deletions.
2 changes: 1 addition & 1 deletion pkg/executor/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,7 @@ func (a *ExecStmt) PointGet(ctx context.Context) (*recordSet, error) {
a.PsStmt.PointGet.Executor = nil
} else {
// CachedPlan type is already checked in last step
pointGetPlan := a.Plan.(*plannercore.PointGetPlan)
pointGetPlan := a.PsStmt.PointGet.Plan.(*plannercore.PointGetPlan)
exec.Init(pointGetPlan)
a.PsStmt.PointGet.Executor = exec
executor = exec
Expand Down
20 changes: 15 additions & 5 deletions pkg/executor/compiler.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,12 +88,16 @@ func (c *Compiler) Compile(ctx context.Context, stmtNode ast.StmtNode) (_ *ExecS
sessVars := c.Ctx.GetSessionVars()
stmtCtx := sessVars.StmtCtx
// handle the execute statement
var preparedObj *plannercore.PlanCacheStmt
var (
pointGetPlanShortPathOK bool
preparedObj *plannercore.PlanCacheStmt
)

if execStmt, ok := stmtNode.(*ast.ExecuteStmt); ok {
if preparedObj, err = plannercore.GetPreparedStmt(execStmt, sessVars); err != nil {
return nil, err
}
pointGetPlanShortPathOK = plannercore.IsPointGetPlanShortPathOK(c.Ctx, is, preparedObj)
}
// Build the final physical plan.
finalPlan, names, err := planner.Optimize(ctx, c.Ctx, stmtNode, is)
Expand Down Expand Up @@ -126,10 +130,16 @@ func (c *Compiler) Compile(ctx context.Context, stmtNode ast.StmtNode) (_ *ExecS
OutputNames: names,
}
// Use cached plan if possible.
if preparedObj != nil && plannercore.IsSafeToReusePointGetExecutor(c.Ctx, is, preparedObj) {
if exec, isExec := finalPlan.(*plannercore.Execute); isExec {
if pointPlan, isPointPlan := exec.Plan.(*plannercore.PointGetPlan); isPointPlan {
stmt.PsStmt, stmt.Plan = preparedObj, pointPlan // notify to re-use the cached plan
if pointGetPlanShortPathOK {
if ep, ok := stmt.Plan.(*plannercore.Execute); ok {
if pointPlan, ok := ep.Plan.(*plannercore.PointGetPlan); ok {
stmtCtx.SetPlan(stmt.Plan)
stmtCtx.SetPlanDigest(preparedObj.NormalizedPlan, preparedObj.PlanDigest)
stmt.Plan = pointPlan
stmt.PsStmt = preparedObj
} else {
// invalid the previous cached point plan
preparedObj.PointGet.Plan = nil
}
}
}
Expand Down
18 changes: 14 additions & 4 deletions pkg/planner/core/plan_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -787,17 +787,27 @@ func checkPreparedPriv(sctx sessionctx.Context, stmt *PlanCacheStmt, is infosche
return err
}

// IsSafeToReusePointGetExecutor checks whether this is a PointGet Plan and safe to reuse its executor.
func IsSafeToReusePointGetExecutor(sctx sessionctx.Context, is infoschema.InfoSchema, stmt *PlanCacheStmt) bool {
if staleread.IsStmtStaleness(sctx) {
// IsPointGetPlanShortPathOK check if we can execute using plan cached in prepared structure
// Be careful with the short path, current precondition is ths cached plan satisfying
// IsPointGetWithPKOrUniqueKeyByAutoCommit
func IsPointGetPlanShortPathOK(sctx sessionctx.Context, is infoschema.InfoSchema, stmt *PlanCacheStmt) bool {
if stmt.PointGet.Plan == nil || staleread.IsStmtStaleness(sctx) {
return false
}
// check auto commit
if !IsAutoCommitTxn(sctx.GetSessionVars()) {
return false
}
if stmt.SchemaVersion != is.SchemaMetaVersion() {
stmt.PointGet.Plan = nil
stmt.PointGet.ColumnInfos = nil
return false
}
// only support simple PointGet Plan now
switch stmt.PointGet.Plan.(type) {
case *PointGetPlan:
return true
default:
return false
}
return true
}
2 changes: 1 addition & 1 deletion pkg/sessiontxn/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ go_test(
"txn_rc_tso_optimize_test.go",
],
flaky = True,
shard_count = 25,
shard_count = 24,
deps = [
":sessiontxn",
"//pkg/domain",
Expand Down
43 changes: 3 additions & 40 deletions pkg/sessiontxn/txn_context_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -505,12 +505,12 @@ func TestTxnContextForPrepareExecute(t *testing.T) {
doWithCheckPath(t, se, normalPathRecords, func() {
tk.MustExec("prepare s from 'select * from t1 where id=1'")
})
doWithCheckPath(t, se, []string{"assertTxnManagerInCompile", "assertTxnManagerInShortPointGetPlan"}, func() {
doWithCheckPath(t, se, normalPathRecords, func() {
tk.MustQuery("execute s").Check(testkit.Rows("1 10"))
})

// Test ExecutePreparedStmt
doWithCheckPath(t, se, []string{"assertTxnManagerInCompile", "assertTxnManagerInShortPointGetPlan"}, func() {
doWithCheckPath(t, se, normalPathRecords, func() {
rs, err := se.ExecutePreparedStmt(context.TODO(), stmtID, nil)
require.NoError(t, err)
tk.ResultSetToResult(rs, fmt.Sprintf("%v", rs)).Check(testkit.Rows("1 10"))
Expand Down Expand Up @@ -549,42 +549,6 @@ func TestTxnContextForPrepareExecute(t *testing.T) {
tk.MustExec("rollback")
}

func TestStaleReadInPrepare(t *testing.T) {
store, _ := setupTxnContextTest(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
se := tk.Session()

tk.MustExec(`create table tt (id int primary key, v int)`)
tk.MustExec(`insert into tt values(1, 10)`)
tk.MustExec("do sleep(0.1)")
tk.MustExec("set @a=now(6)")
tk.MustExec("do sleep(0.1)")

st, _, _, err := se.PrepareStmt("select v from tt where id=1")
require.NoError(t, err)

tk.MustExec(`update tt set v=11 where id=1`)
rs, err := se.ExecutePreparedStmt(context.TODO(), st, nil)
require.NoError(t, err)
tk.ResultSetToResult(rs, fmt.Sprintf("%v", rs)).Check(testkit.Rows("11"))

tk.MustExec("set @@tx_read_ts=@a")
rs, err = se.ExecutePreparedStmt(context.TODO(), st, nil)
require.NoError(t, err)
tk.ResultSetToResult(rs, fmt.Sprintf("%v", rs)).Check(testkit.Rows("10"))

tk.MustExec("update tt set v=12 where id=1")
tk.MustExec("set @@tx_read_ts=''")
rs, err = se.ExecutePreparedStmt(context.TODO(), st, nil)
require.NoError(t, err)
tk.ResultSetToResult(rs, fmt.Sprintf("%v", rs)).Check(testkit.Rows("12"))
rs, err = se.ExecutePreparedStmt(context.TODO(), st, nil)
require.NoError(t, err)
tk.ResultSetToResult(rs, fmt.Sprintf("%v", rs)).Check(testkit.Rows("12"))
tk.MustQuery(`select @@last_plan_from_cache`).Check(testkit.Rows("1"))
}

func TestTxnContextForStaleReadInPrepare(t *testing.T) {
store, _ := setupTxnContextTest(t)
tk := testkit.NewTestKit(t, store)
Expand Down Expand Up @@ -668,8 +632,7 @@ func TestTxnContextForStaleReadInPrepare(t *testing.T) {
tk.MustExec("do sleep(0.1)")
tk.MustExec("update t1 set v=v+1 where id=1")
se.SetValue(sessiontxn.AssertTxnInfoSchemaKey, is2)
doWithCheckPath(t, se, []string{"assertTxnManagerInCompile", "assertTxnManagerInShortPointGetPlan"}, func() {
// stale-read is not used since `tx_read_ts` is empty, so the plan cache should be used in this case.
doWithCheckPath(t, se, normalPathRecords, func() {
rs, err := se.ExecutePreparedStmt(context.TODO(), stmtID1, nil)
require.NoError(t, err)
tk.ResultSetToResult(rs, fmt.Sprintf("%v", rs)).Check(testkit.Rows("1 12"))
Expand Down

0 comments on commit e169d6f

Please sign in to comment.