Skip to content

Commit

Permalink
planner: optimize the performance of PointPlan for Instance Plan Cache (
Browse files Browse the repository at this point in the history
  • Loading branch information
qw4990 authored Nov 11, 2024
1 parent fe6c9b7 commit e16613d
Show file tree
Hide file tree
Showing 7 changed files with 248 additions and 18 deletions.
2 changes: 1 addition & 1 deletion pkg/planner/core/casetest/instanceplancache/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ go_test(
"others_test.go",
],
flaky = True,
shard_count = 32,
shard_count = 34,
deps = [
"//pkg/parser/auth",
"//pkg/testkit",
Expand Down
69 changes: 69 additions & 0 deletions pkg/planner/core/casetest/instanceplancache/concurrency_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -335,6 +335,75 @@ func TestInstancePlanCacheTableIndexScan(t *testing.T) {
wg.Wait()
}

func TestInstancePlanCacheConcurrencyPointPartitioning(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec(`use test`)
tk.MustExec(`create table t1 (a int, primary key(a)) partition by hash(a) partitions 10`)
tk.MustExec(`create table t2 (a int, primary key(a)) partition by range(a) (
partition p0 values less than (10),
partition p1 values less than (20),
partition p2 values less than (30),
partition p3 values less than (40),
partition p4 values less than (50),
partition p5 values less than (60),
partition p6 values less than (70),
partition p7 values less than (80),
partition p8 values less than (90),
partition p9 values less than (100))`)
tk.MustExec(`set global tidb_enable_instance_plan_cache=1`)
for i := 0; i < 100; i++ {
tk.MustExec(fmt.Sprintf("insert into t1 values (%v)", i))
tk.MustExec(fmt.Sprintf("insert into t2 values (%v)", i))
}

var wg sync.WaitGroup
for i := 0; i < 10; i++ {
wg.Add(1)
go func() {
defer wg.Done()
tki := testkit.NewTestKit(t, store)
tki.MustExec(`use test`)
for k := 0; k < 100; k++ {
tName := fmt.Sprintf("t%v", rand.Intn(2)+1)
tki.MustExec(fmt.Sprintf("prepare st from 'select * from %v where a=?'", tName))
a := rand.Intn(100)
tki.MustExec("set @a = ?", a)
tki.MustQuery("execute st using @a").Check(testkit.Rows(fmt.Sprintf("%v", a)))
}
}()
}
wg.Wait()
}

func TestInstancePlanCacheConcurrencyPointMultipleColPKNoTxn(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec(`use test`)
tk.MustExec(`create table t (a int, b int, primary key(a, b))`)
tk.MustExec(`set global tidb_enable_instance_plan_cache=1`)
for i := 0; i < 100; i++ {
tk.MustExec(fmt.Sprintf("insert into t values (%v, %v)", i, i))
}

var wg sync.WaitGroup
for i := 0; i < 10; i++ {
wg.Add(1)
go func() {
defer wg.Done()
tki := testkit.NewTestKit(t, store)
tki.MustExec(`use test`)
tki.MustExec(`prepare st from 'select * from t where a=? and b=?'`)
for k := 0; k < 100; k++ {
a := rand.Intn(100)
tki.MustExec("set @a = ?, @b = ?", a, a)
tki.MustQuery("execute st using @a, @b").Check(testkit.Rows(fmt.Sprintf("%v %v", a, a)))
}
}()
}
wg.Wait()
}

func TestInstancePlanCacheConcurrencyPointNoTxn(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
Expand Down
50 changes: 34 additions & 16 deletions pkg/planner/core/plan_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,9 @@ func GetPlanFromPlanCache(ctx context.Context, sctx sessionctx.Context,
if stmtCtx.UseCache() {
plan, outputCols, stmtHints, hit := lookupPlanCache(ctx, sctx, cacheKey, paramTypes)
skipPrivCheck := stmt.PointGet.Executor != nil // this case is specially handled
if hit && instancePlanCacheEnabled(ctx) {
plan, hit = clonePlanForInstancePlanCache(ctx, sctx, stmt, plan)
}
if hit {
if plan, ok, err := adjustCachedPlan(ctx, sctx, plan, stmtHints, isNonPrepared, skipPrivCheck, binding, is, stmt); err != nil || ok {
return plan, outputCols, err
Expand All @@ -236,6 +239,29 @@ func GetPlanFromPlanCache(ctx context.Context, sctx sessionctx.Context,
return generateNewPlan(ctx, sctx, isNonPrepared, is, stmt, cacheKey, paramTypes)
}

func clonePlanForInstancePlanCache(ctx context.Context, sctx sessionctx.Context,
stmt *PlanCacheStmt, plan base.Plan) (clonedPlan base.Plan, ok bool) {
// TODO: add metrics to record the time cost of this clone operation.
fastPoint := stmt.PointGet.Executor != nil // this case is specially handled
pointPlan, isPoint := plan.(*PointGetPlan)
if fastPoint && isPoint { // special optimization for fast point plans
if stmt.PointGet.FastPlan == nil {
stmt.PointGet.FastPlan = new(PointGetPlan)
}
FastClonePointGetForPlanCache(sctx.GetPlanCtx(), pointPlan, stmt.PointGet.FastPlan)
clonedPlan = stmt.PointGet.FastPlan
} else {
clonedPlan, ok = plan.CloneForPlanCache(sctx.GetPlanCtx())
if !ok { // clone the value to solve concurrency problem
return nil, false
}
}
if intest.InTest && ctx.Value(PlanCacheKeyTestClone{}) != nil {
ctx.Value(PlanCacheKeyTestClone{}).(func(plan, cloned base.Plan))(plan, clonedPlan)
}
return clonedPlan, true
}

func instancePlanCacheEnabled(ctx context.Context) bool {
if intest.InTest && ctx.Value(PlanCacheKeyEnableInstancePlanCache{}) != nil {
return true
Expand All @@ -252,25 +278,17 @@ func lookupPlanCache(ctx context.Context, sctx sessionctx.Context, cacheKey stri
core_metrics.GetPlanCacheLookupDuration(useInstanceCache).Observe(time.Since(begin).Seconds())
}
}(time.Now())
var v any
if useInstanceCache {
if v, hit := domain.GetDomain(sctx).GetInstancePlanCache().Get(cacheKey, paramTypes); hit {
pcv := v.(*PlanCacheValue)
clonedPlan, ok := pcv.Plan.CloneForPlanCache(sctx.GetPlanCtx())
if !ok { // clone the value to solve concurrency problem
return nil, nil, nil, false
}
if intest.InTest && ctx.Value(PlanCacheKeyTestClone{}) != nil {
ctx.Value(PlanCacheKeyTestClone{}).(func(plan, cloned base.Plan))(pcv.Plan, clonedPlan)
}
return clonedPlan, pcv.OutputColumns, pcv.stmtHints, true
}
v, hit = domain.GetDomain(sctx).GetInstancePlanCache().Get(cacheKey, paramTypes)
} else {
if v, hit := sctx.GetSessionPlanCache().Get(cacheKey, paramTypes); hit {
pcv := v.(*PlanCacheValue)
return pcv.Plan, pcv.OutputColumns, pcv.stmtHints, true
}
v, hit = sctx.GetSessionPlanCache().Get(cacheKey, paramTypes)
}
if !hit {
return nil, nil, nil, false
}
return nil, nil, nil, false
pcv := v.(*PlanCacheValue)
return pcv.Plan, pcv.OutputColumns, pcv.stmtHints, true
}

func adjustCachedPlan(ctx context.Context, sctx sessionctx.Context,
Expand Down
87 changes: 87 additions & 0 deletions pkg/planner/core/plan_cache_rebuild_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,19 @@ import (
"context"
"fmt"
"math/rand"
"os"
"reflect"
"strings"
"testing"
"unsafe"

"github.com/pingcap/errors"
"github.com/pingcap/tidb/pkg/expression"
"github.com/pingcap/tidb/pkg/parser"
"github.com/pingcap/tidb/pkg/planner"
"github.com/pingcap/tidb/pkg/planner/core"
"github.com/pingcap/tidb/pkg/planner/core/base"
"github.com/pingcap/tidb/pkg/planner/core/resolve"
"github.com/pingcap/tidb/pkg/planner/util"
"github.com/pingcap/tidb/pkg/testkit"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -441,3 +445,86 @@ type visit struct {
a2 unsafe.Pointer
typ reflect.Type
}

func TestFastPointGetClone(t *testing.T) {
codeFile := "plan_clone_utils.go"
codeData, err := os.ReadFile(codeFile)
require.NoError(t, err)
codeLines := strings.Split(string(codeData), "\n")
beginPrefix := `func FastClonePointGetForPlanCache(`
endPrefix := `}`
beginIdx, endIdx := -1, -1
for i, line := range codeLines {
if strings.HasPrefix(line, beginPrefix) {
beginIdx = i
}
if beginIdx != -1 && strings.HasPrefix(line, endPrefix) {
endIdx = i
break
}
}
cloneFuncCode := strings.Join(codeLines[beginIdx:endIdx+1], "\n")
fieldNoNeedToClone := map[string]struct{}{
"cost": {},
"planCostInit": {},
"planCost": {},
"planCostVer2": {},
"accessCols": {},
}

pointPlan := reflect.TypeOf(core.PointGetPlan{})
for i := 0; i < pointPlan.NumField(); i++ {
fieldName := pointPlan.Field(i).Name
if _, ok := fieldNoNeedToClone[fieldName]; ok {
continue
}
assignFieldCode := fmt.Sprintf("%v =", fieldName)
if !strings.Contains(cloneFuncCode, assignFieldCode) {
errMsg := fmt.Sprintf("field %v might not be set in FastClonePointGetForPlanCache correctly", fieldName)
t.Fatal(errMsg)
}
}
}

func BenchmarkPointGetCloneFast(b *testing.B) {
store, domain := testkit.CreateMockStoreAndDomain(b)
tk := testkit.NewTestKit(b, store)
tk.MustExec(`use test`)
tk.MustExec(`create table t (a int, b int, primary key(a, b))`)

p := parser.New()
stmt, err := p.ParseOneStmt("select a, b from t where a=1 and b=1", "", "")
require.NoError(b, err)
nodeW := resolve.NewNodeW(stmt)
plan, _, err := planner.Optimize(context.TODO(), tk.Session(), nodeW, domain.InfoSchema())
require.NoError(b, err)

b.ResetTimer()
src := plan.(*core.PointGetPlan)
dst := new(core.PointGetPlan)
sctx := tk.Session().GetPlanCtx()
for i := 0; i < b.N; i++ {
core.FastClonePointGetForPlanCache(sctx, src, dst)
}
}

func BenchmarkPointGetClone(b *testing.B) {
store, domain := testkit.CreateMockStoreAndDomain(b)
tk := testkit.NewTestKit(b, store)
tk.MustExec(`use test`)
tk.MustExec(`create table t (a int, b int, primary key(a, b))`)

p := parser.New()
stmt, err := p.ParseOneStmt("select a, b from t where a=1 and b=1", "", "")
require.NoError(b, err)
nodeW := resolve.NewNodeW(stmt)
plan, _, err := planner.Optimize(context.TODO(), tk.Session(), nodeW, domain.InfoSchema())
require.NoError(b, err)

b.ResetTimer()
src := plan.(*core.PointGetPlan)
sctx := tk.Session().GetPlanCtx()
for i := 0; i < b.N; i++ {
src.CloneForPlanCache(sctx)
}
}
5 changes: 5 additions & 0 deletions pkg/planner/core/plan_cache_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -523,6 +523,11 @@ type PointGetExecutorCache struct {
// Notice that we should only cache the PointGetExecutor that have a snapshot with MaxTS in it.
// If the current plan is not PointGet or does not use MaxTS optimization, this value should be nil here.
Executor any

// FastPlan is only used for instance plan cache.
// To ensure thread-safe, we have to clone each plan before reusing if using instance plan cache.
// To reduce the memory allocation and increase performance, we cache the FastPlan here.
FastPlan *PointGetPlan
}

// PlanCacheStmt store prepared ast from PrepareExec and other related fields
Expand Down
49 changes: 49 additions & 0 deletions pkg/planner/core/plan_clone_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package core
import (
"github.com/pingcap/tidb/pkg/expression"
"github.com/pingcap/tidb/pkg/planner/core/base"
"github.com/pingcap/tidb/pkg/types"
)

func clonePhysicalPlansForPlanCache(newCtx base.PlanContext, plans []base.PhysicalPlan) ([]base.PhysicalPlan, bool) {
Expand Down Expand Up @@ -172,3 +173,51 @@ func cloneConstant2DForPlanCache(constants [][]*expression.Constant) [][]*expres
}
return cloned
}

// FastClonePointGetForPlanCache is a fast path to clone a PointGetPlan for plan cache.
func FastClonePointGetForPlanCache(newCtx base.PlanContext, src, dst *PointGetPlan) *PointGetPlan {
if dst == nil {
dst = new(PointGetPlan)
}
dst.Plan = src.Plan
dst.Plan.SetSCtx(newCtx)
dst.probeParents = src.probeParents
dst.PartitionNames = src.PartitionNames
dst.dbName = src.dbName
dst.schema = src.schema
dst.TblInfo = src.TblInfo
dst.IndexInfo = src.IndexInfo
dst.PartitionIdx = nil // partition prune will be triggered during execution phase
dst.Handle = nil // handle will be set during rebuild phase
if src.HandleConstant == nil {
dst.HandleConstant = nil
} else {
if src.HandleConstant.SafeToShareAcrossSession() {
dst.HandleConstant = src.HandleConstant
} else {
dst.HandleConstant = src.HandleConstant.Clone().(*expression.Constant)
}
}
dst.handleFieldType = src.handleFieldType
dst.HandleColOffset = src.HandleColOffset
if len(dst.IndexValues) < len(src.IndexValues) { // actually set during rebuild phase
dst.IndexValues = make([]types.Datum, len(src.IndexValues))
} else {
dst.IndexValues = dst.IndexValues[:len(src.IndexValues)]
}
dst.IndexConstants = cloneConstantsForPlanCache(src.IndexConstants, dst.IndexConstants)
dst.ColsFieldType = src.ColsFieldType
dst.IdxCols = cloneColumnsForPlanCache(src.IdxCols, dst.IdxCols)
dst.IdxColLens = src.IdxColLens
dst.AccessConditions = cloneExpressionsForPlanCache(src.AccessConditions, dst.AccessConditions)
dst.UnsignedHandle = src.UnsignedHandle
dst.IsTableDual = src.IsTableDual
dst.Lock = src.Lock
dst.outputNames = src.outputNames
dst.LockWaitTime = src.LockWaitTime
dst.Columns = src.Columns

// remaining fields are unnecessary to clone:
// cost, planCostInit, planCost, planCostVer2, accessCols
return dst
}
4 changes: 3 additions & 1 deletion pkg/planner/core/point_get_plan.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,14 +105,16 @@ type PointGetPlan struct {
outputNames []*types.FieldName `plan-cache-clone:"shallow"`
LockWaitTime int64
Columns []*model.ColumnInfo `plan-cache-clone:"shallow"`
cost float64

// required by cost model
cost float64
planCostInit bool
planCost float64
planCostVer2 costusage.CostVer2 `plan-cache-clone:"shallow"`
// accessCols represents actual columns the PointGet will access, which are used to calculate row-size
accessCols []*expression.Column

// NOTE: please update FastClonePointGetForPlanCache accordingly if you add new fields here.
}

// GetEstRowCountForDisplay implements PhysicalPlan interface.
Expand Down

0 comments on commit e16613d

Please sign in to comment.