Skip to content

Commit

Permalink
Merge branch 'master' into resource-control-privilege
Browse files Browse the repository at this point in the history
  • Loading branch information
hawkingrei authored Jan 17, 2023
2 parents 3d6b57b + bdc6f4b commit ea5aca5
Show file tree
Hide file tree
Showing 30 changed files with 639 additions and 132 deletions.
4 changes: 2 additions & 2 deletions DEPS.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -3582,8 +3582,8 @@ def go_deps():
name = "com_github_tikv_client_go_v2",
build_file_proto_mode = "disable_global",
importpath = "github.com/tikv/client-go/v2",
sum = "h1:RI6bs9TDIIJ96N0lR5uZoGO8QNot4qS/1l+Mobx0InM=",
version = "v2.0.5-0.20230110071533-f313ddf58d73",
sum = "h1:B2FNmPDaGirXpIOgQbqxiukIkT8eOT4tKEahqYE2ers=",
version = "v2.0.5-0.20230112062023-fe5b35c5f5dc",
)
go_repository(
name = "com_github_tikv_pd_client",
Expand Down
21 changes: 20 additions & 1 deletion executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -1393,6 +1393,9 @@ type LimitExec struct {

// columnIdxsUsedByChild keep column indexes of child executor used for inline projection
columnIdxsUsedByChild []int

// Log the close time when opentracing is enabled.
span opentracing.Span
}

// Next implements the Executor Next interface.
Expand Down Expand Up @@ -1470,13 +1473,29 @@ func (e *LimitExec) Open(ctx context.Context) error {
e.childResult = tryNewCacheChunk(e.children[0])
e.cursor = 0
e.meetFirstBatch = e.begin == 0
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
e.span = span
}
return nil
}

// Close implements the Executor Close interface.
func (e *LimitExec) Close() error {
start := time.Now()

e.childResult = nil
return e.baseExecutor.Close()
err := e.baseExecutor.Close()

elapsed := time.Since(start)
if elapsed > time.Millisecond {
logutil.BgLogger().Info("limit executor close takes a long time",
zap.Duration("elapsed", elapsed))
if e.span != nil {
span1 := e.span.Tracer().StartSpan("limitExec.Close", opentracing.ChildOf(e.span.Context()), opentracing.StartTime(start))
defer span1.Finish()
}
}
return err
}

func (e *LimitExec) adjustRequiredRows(chk *chunk.Chunk) *chunk.Chunk {
Expand Down
6 changes: 4 additions & 2 deletions executor/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5557,6 +5557,8 @@ func TestAdmin(t *testing.T) {
}))
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk2 := testkit.NewTestKit(t, store)
tk2.MustExec("use test")
tk.MustExec("drop table if exists admin_test")
tk.MustExec("create table admin_test (c1 int, c2 int, c3 int default 1, index (c1))")
tk.MustExec("insert admin_test (c1) values (1),(2),(NULL)")
Expand Down Expand Up @@ -5681,7 +5683,7 @@ func TestAdmin(t *testing.T) {
// check that the result set has no duplication
defer wg.Done()
for i := 0; i < 10; i++ {
result := tk.MustQuery(`admin show ddl job queries 20`)
result := tk2.MustQuery(`admin show ddl job queries 20`)
rows := result.Rows()
rowIDs := make(map[string]struct{})
for _, row := range rows {
Expand Down Expand Up @@ -5712,7 +5714,7 @@ func TestAdmin(t *testing.T) {
// check that the result set has no duplication
defer wg2.Done()
for i := 0; i < 10; i++ {
result := tk.MustQuery(`admin show ddl job queries limit 3 offset 2`)
result := tk2.MustQuery(`admin show ddl job queries limit 3 offset 2`)
rows := result.Rows()
rowIDs := make(map[string]struct{})
for _, row := range rows {
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ require (
github.com/stretchr/testify v1.8.0
github.com/tdakkota/asciicheck v0.1.1
github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2
github.com/tikv/client-go/v2 v2.0.5-0.20230110071533-f313ddf58d73
github.com/tikv/client-go/v2 v2.0.5-0.20230112062023-fe5b35c5f5dc
github.com/tikv/pd/client v0.0.0-20221031025758-80f0d8ca4d07
github.com/timakin/bodyclose v0.0.0-20210704033933-f49887972144
github.com/twmb/murmur3 v1.1.3
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -936,8 +936,8 @@ github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2 h1:mbAskLJ0oJf
github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2/go.mod h1:2PfKggNGDuadAa0LElHrByyrz4JPZ9fFx6Gs7nx7ZZU=
github.com/tiancaiamao/gp v0.0.0-20221230034425-4025bc8a4d4a h1:J/YdBZ46WKpXsxsW93SG+q0F8KI+yFrcIDT4c/RNoc4=
github.com/tiancaiamao/gp v0.0.0-20221230034425-4025bc8a4d4a/go.mod h1:h4xBhSNtOeEosLJ4P7JyKXX7Cabg7AVkWCK5gV2vOrM=
github.com/tikv/client-go/v2 v2.0.5-0.20230110071533-f313ddf58d73 h1:RI6bs9TDIIJ96N0lR5uZoGO8QNot4qS/1l+Mobx0InM=
github.com/tikv/client-go/v2 v2.0.5-0.20230110071533-f313ddf58d73/go.mod h1:dO/2a/xi/EO3eVv9xN5G1VFtd/hythzgTeeCbW5SWuI=
github.com/tikv/client-go/v2 v2.0.5-0.20230112062023-fe5b35c5f5dc h1:B2FNmPDaGirXpIOgQbqxiukIkT8eOT4tKEahqYE2ers=
github.com/tikv/client-go/v2 v2.0.5-0.20230112062023-fe5b35c5f5dc/go.mod h1:dO/2a/xi/EO3eVv9xN5G1VFtd/hythzgTeeCbW5SWuI=
github.com/tikv/pd/client v0.0.0-20221031025758-80f0d8ca4d07 h1:ckPpxKcl75mO2N6a4cJXiZH43hvcHPpqc9dh1TmH1nc=
github.com/tikv/pd/client v0.0.0-20221031025758-80f0d8ca4d07/go.mod h1:CipBxPfxPUME+BImx9MUYXCnAVLS3VJUr3mnSJwh40A=
github.com/timakin/bodyclose v0.0.0-20210704033933-f49887972144 h1:kl4KhGNsJIbDHS9/4U9yQo1UcPQM0kOMJHn29EoH/Ro=
Expand Down
3 changes: 3 additions & 0 deletions planner/core/encode.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,9 @@ type planDigester struct {

// NormalizeFlatPlan normalizes a FlatPhysicalPlan and generates plan digest.
func NormalizeFlatPlan(flat *FlatPhysicalPlan) (normalized string, digest *parser.Digest) {
if flat == nil {
return "", parser.NewDigest(nil)
}
selectPlan, selectPlanOffset := flat.Main.GetSelectPlan()
if len(selectPlan) == 0 || !selectPlan[0].IsPhysicalPlan {
return "", parser.NewDigest(nil)
Expand Down
12 changes: 6 additions & 6 deletions planner/core/logical_plan_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -2017,7 +2017,7 @@ func getUintFromNode(ctx sessionctx.Context, n ast.Node, mustInt64orUint64 bool)
return 0, false, true
}
if mustInt64orUint64 {
if expected := checkParamTypeInt64orUint64(v); !expected {
if expected, _ := CheckParamTypeInt64orUint64(v); !expected {
return 0, false, false
}
}
Expand Down Expand Up @@ -2054,19 +2054,19 @@ func getUintFromNode(ctx sessionctx.Context, n ast.Node, mustInt64orUint64 bool)
return 0, false, false
}

// check param type for plan cache limit, only allow int64 and uint64 now
// CheckParamTypeInt64orUint64 check param type for plan cache limit, only allow int64 and uint64 now
// eg: set @a = 1;
func checkParamTypeInt64orUint64(param *driver.ParamMarkerExpr) bool {
func CheckParamTypeInt64orUint64(param *driver.ParamMarkerExpr) (bool, uint64) {
val := param.GetValue()
switch v := val.(type) {
case int64:
if v >= 0 {
return true
return true, uint64(v)
}
case uint64:
return true
return true, v
}
return false
return false, 0
}

func extractLimitCountOffset(ctx sessionctx.Context, limit *ast.Limit) (count uint64,
Expand Down
11 changes: 0 additions & 11 deletions planner/core/optimizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -393,7 +393,6 @@ func postOptimize(ctx context.Context, sctx sessionctx.Context, plan PhysicalPla
plan = eliminateUnionScanAndLock(sctx, plan)
plan = enableParallelApply(sctx, plan)
handleFineGrainedShuffle(ctx, sctx, plan)
checkPlanCacheable(sctx, plan)
propagateProbeParents(plan, nil)
countStarRewrite(plan)
return plan, nil
Expand Down Expand Up @@ -966,16 +965,6 @@ func setupFineGrainedShuffleInternal(ctx context.Context, sctx sessionctx.Contex
}
}

// checkPlanCacheable used to check whether a plan can be cached. Plans that
// meet the following characteristics cannot be cached:
// 1. Use the TiFlash engine.
// Todo: make more careful check here.
func checkPlanCacheable(sctx sessionctx.Context, plan PhysicalPlan) {
if sctx.GetSessionVars().StmtCtx.UseCache && useTiFlash(plan) {
sctx.GetSessionVars().StmtCtx.SetSkipPlanCache(errors.Errorf("skip plan-cache: TiFlash plan is un-cacheable"))
}
}

// propagateProbeParents doesn't affect the execution plan, it only sets the probeParents field of a PhysicalPlan.
// It's for handling the inconsistency between row count in the statsInfo and the recorded actual row count. Please
// see comments in PhysicalPlan for details.
Expand Down
78 changes: 56 additions & 22 deletions planner/core/plan_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,27 +158,29 @@ func GetPlanFromSessionPlanCache(ctx context.Context, sctx sessionctx.Context,
}
}

paramNum, paramTypes := parseParamTypes(sctx, params)
paramTypes := parseParamTypes(sctx, params)

if stmtCtx.UseCache && stmtAst.CachedPlan != nil { // for point query plan
if plan, names, ok, err := getCachedPointPlan(stmtAst, sessVars, stmtCtx); ok {
return plan, names, err
}
}

limitCountAndOffset, paramErr := ExtractLimitFromAst(stmt.PreparedAst.Stmt, sctx)
if paramErr != nil {
return nil, nil, paramErr
}
if stmtCtx.UseCache { // for non-point plans
if plan, names, ok, err := getCachedPlan(sctx, isNonPrepared, cacheKey, bindSQL, is, stmt,
paramTypes); err != nil || ok {
paramTypes, limitCountAndOffset); err != nil || ok {
return plan, names, err
}
}

return generateNewPlan(ctx, sctx, isNonPrepared, is, stmt, cacheKey, latestSchemaVersion, paramNum, paramTypes, bindSQL)
return generateNewPlan(ctx, sctx, isNonPrepared, is, stmt, cacheKey, latestSchemaVersion, paramTypes, bindSQL, limitCountAndOffset)
}

// parseParamTypes get parameters' types in PREPARE statement
func parseParamTypes(sctx sessionctx.Context, params []expression.Expression) (paramNum int, paramTypes []*types.FieldType) {
paramNum = len(params)
func parseParamTypes(sctx sessionctx.Context, params []expression.Expression) (paramTypes []*types.FieldType) {
for _, param := range params {
if c, ok := param.(*expression.Constant); ok { // from binary protocol
paramTypes = append(paramTypes, c.GetType())
Expand Down Expand Up @@ -221,12 +223,12 @@ func getCachedPointPlan(stmt *ast.Prepared, sessVars *variable.SessionVars, stmt
}

func getCachedPlan(sctx sessionctx.Context, isNonPrepared bool, cacheKey kvcache.Key, bindSQL string,
is infoschema.InfoSchema, stmt *PlanCacheStmt, paramTypes []*types.FieldType) (Plan,
is infoschema.InfoSchema, stmt *PlanCacheStmt, paramTypes []*types.FieldType, limitParams []uint64) (Plan,
[]*types.FieldName, bool, error) {
sessVars := sctx.GetSessionVars()
stmtCtx := sessVars.StmtCtx

candidate, exist := sctx.GetPlanCache(isNonPrepared).Get(cacheKey, paramTypes)
candidate, exist := sctx.GetPlanCache(isNonPrepared).Get(cacheKey, paramTypes, limitParams)
if !exist {
return nil, nil, false, nil
}
Expand Down Expand Up @@ -264,8 +266,9 @@ func getCachedPlan(sctx sessionctx.Context, isNonPrepared bool, cacheKey kvcache

// generateNewPlan call the optimizer to generate a new plan for current statement
// and try to add it to cache
func generateNewPlan(ctx context.Context, sctx sessionctx.Context, isNonPrepared bool, is infoschema.InfoSchema, stmt *PlanCacheStmt, cacheKey kvcache.Key, latestSchemaVersion int64, paramNum int,
paramTypes []*types.FieldType, bindSQL string) (Plan, []*types.FieldName, error) {
func generateNewPlan(ctx context.Context, sctx sessionctx.Context, isNonPrepared bool, is infoschema.InfoSchema,
stmt *PlanCacheStmt, cacheKey kvcache.Key, latestSchemaVersion int64, paramTypes []*types.FieldType,
bindSQL string, limitParams []uint64) (Plan, []*types.FieldName, error) {
stmtAst := stmt.PreparedAst
sessVars := sctx.GetSessionVars()
stmtCtx := sessVars.StmtCtx
Expand All @@ -282,10 +285,10 @@ func generateNewPlan(ctx context.Context, sctx sessionctx.Context, isNonPrepared
return nil, nil, err
}

// We only cache the tableDual plan when the number of parameters are zero.
if containTableDual(p) && paramNum > 0 {
stmtCtx.SetSkipPlanCache(errors.New("skip plan-cache: get a TableDual plan"))
}
// check whether this plan is cacheable.
checkPlanCacheability(sctx, p, len(paramTypes))

// put this plan into the plan cache.
if stmtCtx.UseCache {
// rebuild key to exclude kv.TiFlash when stmt is not read only
if _, isolationReadContainTiFlash := sessVars.IsolationReadEngines[kv.TiFlash]; isolationReadContainTiFlash && !IsReadOnly(stmtAst.Stmt, sessVars) {
Expand All @@ -296,16 +299,51 @@ func generateNewPlan(ctx context.Context, sctx sessionctx.Context, isNonPrepared
}
sessVars.IsolationReadEngines[kv.TiFlash] = struct{}{}
}
cached := NewPlanCacheValue(p, names, stmtCtx.TblInfo2UnionScan, paramTypes)
cached := NewPlanCacheValue(p, names, stmtCtx.TblInfo2UnionScan, paramTypes, limitParams)
stmt.NormalizedPlan, stmt.PlanDigest = NormalizePlan(p)
stmtCtx.SetPlan(p)
stmtCtx.SetPlanDigest(stmt.NormalizedPlan, stmt.PlanDigest)
sctx.GetPlanCache(isNonPrepared).Put(cacheKey, cached, paramTypes)
sctx.GetPlanCache(isNonPrepared).Put(cacheKey, cached, paramTypes, limitParams)
}
sessVars.FoundInPlanCache = false
return p, names, err
}

// checkPlanCacheability checks whether this plan is cacheable and set to skip plan cache if it's uncacheable.
func checkPlanCacheability(sctx sessionctx.Context, p Plan, paramNum int) {
stmtCtx := sctx.GetSessionVars().StmtCtx
var pp PhysicalPlan
switch x := p.(type) {
case *Insert:
pp = x.SelectPlan
case *Update:
pp = x.SelectPlan
case *Delete:
pp = x.SelectPlan
case PhysicalPlan:
pp = x
default:
stmtCtx.SetSkipPlanCache(errors.Errorf("skip plan-cache: unexpected un-cacheable plan %v", p.ExplainID().String()))
return
}
if pp == nil { // simple DML statements
return
}

if useTiFlash(pp) {
stmtCtx.SetSkipPlanCache(errors.Errorf("skip plan-cache: TiFlash plan is un-cacheable"))
return
}

// We only cache the tableDual plan when the number of parameters are zero.
if containTableDual(pp) && paramNum > 0 {
stmtCtx.SetSkipPlanCache(errors.New("skip plan-cache: get a TableDual plan"))
return
}

// TODO: plans accessing MVIndex are un-cacheable
}

// RebuildPlan4CachedPlan will rebuild this plan under current user parameters.
func RebuildPlan4CachedPlan(p Plan) error {
sc := p.SCtx().GetSessionVars().StmtCtx
Expand Down Expand Up @@ -675,17 +713,13 @@ func tryCachePointPlan(_ context.Context, sctx sessionctx.Context,
return err
}

func containTableDual(p Plan) bool {
func containTableDual(p PhysicalPlan) bool {
_, isTableDual := p.(*PhysicalTableDual)
if isTableDual {
return true
}
physicalPlan, ok := p.(PhysicalPlan)
if !ok {
return false
}
childContainTableDual := false
for _, child := range physicalPlan.Children() {
for _, child := range p.Children() {
childContainTableDual = childContainTableDual || containTableDual(child)
}
return childContainTableDual
Expand Down
Loading

0 comments on commit ea5aca5

Please sign in to comment.