Skip to content

Commit

Permalink
Merge branch 'master' into fix-40620
Browse files Browse the repository at this point in the history
  • Loading branch information
hawkingrei authored Jan 17, 2023
2 parents e9d7c8e + 45e85d9 commit 8b51afd
Show file tree
Hide file tree
Showing 12 changed files with 78 additions and 23 deletions.
1 change: 1 addition & 0 deletions planner/core/find_best_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -1136,6 +1136,7 @@ func (ds *DataSource) convertToIndexMergeScan(prop *property.PhysicalProperty, c
cop.tablePlan = ts
cop.idxMergePartPlans = scans
cop.idxMergeIsIntersection = path.IndexMergeIsIntersection
cop.idxMergeAccessMVIndex = path.IndexMergeAccessMVIndex
if remainingFilters != nil {
cop.rootTaskConds = remainingFilters
}
Expand Down
2 changes: 1 addition & 1 deletion planner/core/indexmerge_path.go
Original file line number Diff line number Diff line change
Expand Up @@ -618,7 +618,7 @@ func (ds *DataSource) generateIndexMerge4MVIndex(normalPathCnt int, filters []ex

// buildPartialPathUp4MVIndex builds these partial paths up to a complete index merge path.
func (ds *DataSource) buildPartialPathUp4MVIndex(partialPaths []*util.AccessPath, isIntersection bool, remainingFilters []expression.Expression) *util.AccessPath {
indexMergePath := &util.AccessPath{PartialIndexPaths: partialPaths}
indexMergePath := &util.AccessPath{PartialIndexPaths: partialPaths, IndexMergeAccessMVIndex: true}
indexMergePath.IndexMergeIsIntersection = isIntersection
indexMergePath.TableFilters = remainingFilters

Expand Down
13 changes: 13 additions & 0 deletions planner/core/indexmerge_path_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,3 +136,16 @@ index i_int((cast(j->'$.int' as signed array))))`)
result.Check(testkit.Rows(output[i].Plan...))
}
}

func TestMVIndexIndexMergePlanCache(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec(`create table t(j json, index kj((cast(j as signed array))))`)

tk.MustExec("prepare st from 'select /*+ use_index_merge(t, kj) */ * from t where (1 member of (j))'")
tk.MustQuery("show warnings").Check(testkit.Rows("Warning 1105 skip plan-cache: query accesses generated columns is un-cacheable"))
tk.MustExec("execute st")
tk.MustExec("execute st")
tk.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("0"))
}
20 changes: 0 additions & 20 deletions planner/core/optimizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -1001,26 +1001,6 @@ func propagateProbeParents(plan PhysicalPlan, probeParents []PhysicalPlan) {
}
}

// useTiFlash used to check whether the plan use the TiFlash engine.
func useTiFlash(p PhysicalPlan) bool {
switch x := p.(type) {
case *PhysicalTableReader:
switch x.StoreType {
case kv.TiFlash:
return true
default:
return false
}
default:
if len(p.Children()) > 0 {
for _, plan := range p.Children() {
return useTiFlash(plan)
}
}
}
return false
}

func enableParallelApply(sctx sessionctx.Context, plan PhysicalPlan) PhysicalPlan {
if !sctx.GetSessionVars().EnableParallelApply {
return plan
Expand Down
2 changes: 2 additions & 0 deletions planner/core/physical_plans.go
Original file line number Diff line number Diff line change
Expand Up @@ -551,6 +551,8 @@ type PhysicalIndexMergeReader struct {
// IsIntersectionType means whether it's intersection type or union type.
// Intersection type is for expressions connected by `AND` and union type is for `OR`.
IsIntersectionType bool
// AccessMVIndex indicates whether this IndexMergeReader access a MVIndex.
AccessMVIndex bool

// PartialPlans flats the partialPlans to construct executor pb.
PartialPlans [][]PhysicalPlan
Expand Down
44 changes: 42 additions & 2 deletions planner/core/plan_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,9 @@ func generateNewPlan(ctx context.Context, sctx sessionctx.Context, isNonPrepared
}

// check whether this plan is cacheable.
checkPlanCacheability(sctx, p, len(paramTypes))
if stmtCtx.UseCache {
checkPlanCacheability(sctx, p, len(paramTypes))
}

// put this plan into the plan cache.
if stmtCtx.UseCache {
Expand Down Expand Up @@ -341,7 +343,10 @@ func checkPlanCacheability(sctx sessionctx.Context, p Plan, paramNum int) {
return
}

// TODO: plans accessing MVIndex are un-cacheable
if accessMVIndexWithIndexMerge(pp) {
stmtCtx.SetSkipPlanCache(errors.New("skip plan-cache: the plan with IndexMerge accessing Multi-Valued Index is un-cacheable"))
return
}
}

// RebuildPlan4CachedPlan will rebuild this plan under current user parameters.
Expand Down Expand Up @@ -725,6 +730,41 @@ func containTableDual(p PhysicalPlan) bool {
return childContainTableDual
}

func accessMVIndexWithIndexMerge(p PhysicalPlan) bool {
if idxMerge, ok := p.(*PhysicalIndexMergeReader); ok {
if idxMerge.AccessMVIndex {
return true
}
}

for _, c := range p.Children() {
if accessMVIndexWithIndexMerge(c) {
return true
}
}
return false
}

// useTiFlash used to check whether the plan use the TiFlash engine.
func useTiFlash(p PhysicalPlan) bool {
switch x := p.(type) {
case *PhysicalTableReader:
switch x.StoreType {
case kv.TiFlash:
return true
default:
return false
}
default:
if len(p.Children()) > 0 {
for _, plan := range p.Children() {
return useTiFlash(plan)
}
}
}
return false
}

// GetBindSQL4PlanCache used to get the bindSQL for plan cache to build the plan cache key.
func GetBindSQL4PlanCache(sctx sessionctx.Context, stmt *PlanCacheStmt) (string, bool) {
useBinding := sctx.GetSessionVars().UsePlanBaselines
Expand Down
2 changes: 2 additions & 0 deletions planner/core/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ type copTask struct {

idxMergePartPlans []PhysicalPlan
idxMergeIsIntersection bool
idxMergeAccessMVIndex bool

// rootTaskConds stores select conditions containing virtual columns.
// These conditions can't push to TiKV, so we have to add a selection for rootTask
Expand Down Expand Up @@ -688,6 +689,7 @@ func (t *copTask) convertToRootTaskImpl(ctx sessionctx.Context) *rootTask {
partialPlans: t.idxMergePartPlans,
tablePlan: t.tablePlan,
IsIntersectionType: t.idxMergeIsIntersection,
AccessMVIndex: t.idxMergeAccessMVIndex,
}.Init(ctx, t.idxMergePartPlans[0].SelectBlockOffset())
p.PartitionInfo = t.partitionInfo
setTableScanToTableRowIDScan(p.tablePlan)
Expand Down
2 changes: 2 additions & 0 deletions planner/util/path.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ type AccessPath struct {
// It's only valid for a IndexMerge path.
// Intersection type is for expressions connected by `AND` and union type is for `OR`.
IndexMergeIsIntersection bool
// IndexMergeAccessMVIndex indicates whether this IndexMerge path accesses a MVIndex.
IndexMergeAccessMVIndex bool

StoreType kv.StoreType

Expand Down
1 change: 1 addition & 0 deletions resourcemanager/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ go_library(
"//resourcemanager/util",
"//util",
"//util/cpu",
"@com_github_google_uuid//:uuid",
"@com_github_pingcap_log//:log",
"@org_uber_go_zap//:zap",
],
Expand Down
11 changes: 11 additions & 0 deletions resourcemanager/rm.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package resourcemanager
import (
"time"

"github.com/google/uuid"
"github.com/pingcap/tidb/resourcemanager/scheduler"
"github.com/pingcap/tidb/resourcemanager/util"
tidbutil "github.com/pingcap/tidb/util"
Expand All @@ -26,6 +27,11 @@ import (
// GlobalResourceManager is a global resource manager
var GlobalResourceManager = NewResourceManger()

// RandomName is to get a random name for register pool. It is just for test.
func RandomName() string {
return uuid.New().String()
}

// ResourceManager is a resource manager
type ResourceManager struct {
poolMap *util.ShardPoolMap
Expand Down Expand Up @@ -85,3 +91,8 @@ func (r *ResourceManager) registerPool(name string, pool *util.PoolContainer) er
func (r *ResourceManager) Unregister(name string) {
r.poolMap.Del(name)
}

// Reset is to Reset resource manager. it is just for test.
func (r *ResourceManager) Reset() {
r.poolMap = util.NewShardPoolMap()
}
1 change: 1 addition & 0 deletions testkit/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ go_library(
"//parser/ast",
"//parser/terror",
"//planner/core",
"//resourcemanager",
"//session",
"//session/txninfo",
"//sessionctx/variable",
Expand Down
2 changes: 2 additions & 0 deletions testkit/mockstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/pingcap/tidb/ddl/schematracker"
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/resourcemanager"
"github.com/pingcap/tidb/session"
"github.com/pingcap/tidb/store/driver"
"github.com/pingcap/tidb/store/mockstore"
Expand Down Expand Up @@ -91,6 +92,7 @@ func bootstrap(t testing.TB, store kv.Storage, lease time.Duration) *domain.Doma
err := store.Close()
require.NoError(t, err)
view.Stop()
resourcemanager.GlobalResourceManager.Reset()
})
return dom
}
Expand Down

0 comments on commit 8b51afd

Please sign in to comment.