Skip to content

Commit

Permalink
executor: support plan replayer capture remove task (pingcap#41258)
Browse files Browse the repository at this point in the history
  • Loading branch information
Yisaer authored and blacktear23 committed Feb 15, 2023
1 parent 7accbf7 commit 6915256
Show file tree
Hide file tree
Showing 11 changed files with 3,750 additions and 3,651 deletions.
10 changes: 5 additions & 5 deletions domain/plan_replayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,8 +147,8 @@ func insertPlanReplayerStatus(ctx context.Context, sctx sessionctx.Context, reco
}

func insertPlanReplayerErrorStatusRecord(ctx context.Context, sctx sessionctx.Context, instance string, record PlanReplayerStatusRecord) {
exec := sctx.(sqlexec.SQLExecutor)
_, err := exec.ExecuteInternal(ctx, fmt.Sprintf(
exec := sctx.(sqlexec.RestrictedSQLExecutor)
_, _, err := exec.ExecRestrictedSQL(ctx, nil, fmt.Sprintf(
"insert into mysql.plan_replayer_status (sql_digest, plan_digest, origin_sql, fail_reason, instance) values ('%s','%s','%s','%s','%s')",
record.SQLDigest, record.PlanDigest, record.OriginSQL, record.FailedReason, instance))
if err != nil {
Expand All @@ -158,16 +158,16 @@ func insertPlanReplayerErrorStatusRecord(ctx context.Context, sctx sessionctx.Co
}

func insertPlanReplayerSuccessStatusRecord(ctx context.Context, sctx sessionctx.Context, instance string, record PlanReplayerStatusRecord) {
exec := sctx.(sqlexec.SQLExecutor)
_, err := exec.ExecuteInternal(ctx, fmt.Sprintf(
exec := sctx.(sqlexec.RestrictedSQLExecutor)
_, _, err := exec.ExecRestrictedSQL(ctx, nil, fmt.Sprintf(
"insert into mysql.plan_replayer_status (sql_digest, plan_digest, origin_sql, token, instance) values ('%s','%s','%s','%s','%s')",
record.SQLDigest, record.PlanDigest, record.OriginSQL, record.Token, instance))
if err != nil {
logutil.BgLogger().Warn("insert mysql.plan_replayer_status record failed",
zap.String("sql", record.OriginSQL),
zap.Error(err))
// try insert record without original sql
_, err = exec.ExecuteInternal(ctx, fmt.Sprintf(
_, _, err = exec.ExecRestrictedSQL(ctx, nil, fmt.Sprintf(
"insert into mysql.plan_replayer_status (sql_digest, plan_digest, token, instance) values ('%s','%s','%s','%s')",
record.SQLDigest, record.PlanDigest, record.Token, instance))
if err != nil {
Expand Down
11 changes: 11 additions & 0 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -1035,6 +1035,17 @@ func (b *executorBuilder) buildPlanReplayer(v *plannercore.PlanReplayer) Executo
}
return e
}
if v.Remove {
e := &PlanReplayerExec{
baseExecutor: newBaseExecutor(b.ctx, nil, v.ID()),
CaptureInfo: &PlanReplayerCaptureInfo{
SQLDigest: v.SQLDigest,
PlanDigest: v.PlanDigest,
Remove: true,
},
}
return e
}

e := &PlanReplayerExec{
baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ID()),
Expand Down
17 changes: 16 additions & 1 deletion executor/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,14 +178,29 @@ func TestPlanReplayer(t *testing.T) {
require.Len(t, rows, 1)
}

func TestPlanReplayerCaptureSEM(t *testing.T) {
originSEM := config.GetGlobalConfig().Security.EnableSEM
defer func() {
config.GetGlobalConfig().Security.EnableSEM = originSEM
}()
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("plan replayer capture '123' '123';")
tk.MustExec("create table t(id int)")
tk.MustQuery("plan replayer dump explain select * from t")
tk.MustQuery("select count(*) from mysql.plan_replayer_status").Check(testkit.Rows("1"))
}

func TestPlanReplayerCapture(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("plan replayer capture '123' '123';")
tk.MustQuery("select sql_digest, plan_digest from mysql.plan_replayer_task;").Check(testkit.Rows("123 123"))
tk.MustGetErrMsg("plan replayer capture '123' '123';", "plan replayer capture task already exists")
tk.MustExec("delete from mysql.plan_replayer_task")
tk.MustExec("plan replayer capture remove '123' '123'")
tk.MustQuery("select count(*) from mysql.plan_replayer_task;").Check(testkit.Rows("0"))
tk.MustExec("create table t(id int)")
tk.MustExec("prepare stmt from 'update t set id = ? where id = ? + 1';")
tk.MustExec("SET @number = 5;")
Expand Down
27 changes: 25 additions & 2 deletions executor/plan_replayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ type PlanReplayerExec struct {
type PlanReplayerCaptureInfo struct {
SQLDigest string
PlanDigest string
Remove bool
}

// PlanReplayerDumpInfo indicates dump info
Expand All @@ -74,6 +75,9 @@ func (e *PlanReplayerExec) Next(ctx context.Context, req *chunk.Chunk) error {
return nil
}
if e.CaptureInfo != nil {
if e.CaptureInfo.Remove {
return e.removeCaptureTask(ctx)
}
return e.registerCaptureTask(ctx)
}
err := e.createFile()
Expand Down Expand Up @@ -102,6 +106,25 @@ func (e *PlanReplayerExec) Next(ctx context.Context, req *chunk.Chunk) error {
return nil
}

func (e *PlanReplayerExec) removeCaptureTask(ctx context.Context) error {
ctx1 := kv.WithInternalSourceType(ctx, kv.InternalTxnStats)
exec := e.ctx.(sqlexec.RestrictedSQLExecutor)
_, _, err := exec.ExecRestrictedSQL(ctx1, nil, fmt.Sprintf("delete from mysql.plan_replayer_task where sql_digest = '%s' and plan_digest = '%s'",
e.CaptureInfo.SQLDigest, e.CaptureInfo.PlanDigest))
if err != nil {
logutil.BgLogger().Warn("remove mysql.plan_replayer_status record failed",
zap.Error(err))
return err
}
err = domain.GetDomain(e.ctx).GetPlanReplayerHandle().CollectPlanReplayerTask()
if err != nil {
logutil.BgLogger().Warn("collect task failed", zap.Error(err))
}
logutil.BgLogger().Info("collect plan replayer task success")
e.endFlag = true
return nil
}

func (e *PlanReplayerExec) registerCaptureTask(ctx context.Context) error {
ctx1 := kv.WithInternalSourceType(ctx, kv.InternalTxnStats)
exists, err := domain.CheckPlanReplayerTaskExists(ctx1, e.ctx, e.CaptureInfo.SQLDigest, e.CaptureInfo.PlanDigest)
Expand All @@ -111,8 +134,8 @@ func (e *PlanReplayerExec) registerCaptureTask(ctx context.Context) error {
if exists {
return errors.New("plan replayer capture task already exists")
}
exec := e.ctx.(sqlexec.SQLExecutor)
_, err = exec.ExecuteInternal(ctx1, fmt.Sprintf("insert into mysql.plan_replayer_task (sql_digest, plan_digest) values ('%s','%s')",
exec := e.ctx.(sqlexec.RestrictedSQLExecutor)
_, _, err = exec.ExecRestrictedSQL(ctx1, nil, fmt.Sprintf("insert into mysql.plan_replayer_task (sql_digest, plan_digest) values ('%s','%s')",
e.CaptureInfo.SQLDigest, e.CaptureInfo.PlanDigest))
if err != nil {
logutil.BgLogger().Warn("insert mysql.plan_replayer_status record failed",
Expand Down
13 changes: 12 additions & 1 deletion parser/ast/misc.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,10 @@ type PlanReplayerStmt struct {
Load bool

// Capture indicates 'plan replayer capture <sql_digest> <plan_digest>'
Capture bool
Capture bool
// Remove indicates `plan replayer capture remove <sql_digest> <plan_digest>
Remove bool

SQLDigest string
PlanDigest string

Expand Down Expand Up @@ -298,6 +301,14 @@ func (n *PlanReplayerStmt) Restore(ctx *format.RestoreCtx) error {
ctx.WriteString(n.PlanDigest)
return nil
}
if n.Remove {
ctx.WriteKeyWord("PLAN REPLAYER CAPTURE REMOVE ")
ctx.WriteString(n.SQLDigest)
ctx.WriteKeyWord(" ")
ctx.WriteString(n.PlanDigest)
return nil
}

ctx.WriteKeyWord("PLAN REPLAYER DUMP EXPLAIN ")
if n.Analyze {
ctx.WriteKeyWord("ANALYZE ")
Expand Down
7,302 changes: 3,661 additions & 3,641 deletions parser/parser.go

Large diffs are not rendered by default.

15 changes: 15 additions & 0 deletions parser/parser.y
Original file line number Diff line number Diff line change
Expand Up @@ -14660,6 +14660,21 @@ PlanReplayerStmt:
Limit: nil,
}

$$ = x
}
| "PLAN" "REPLAYER" "CAPTURE" "REMOVE" stringLit stringLit
{
x := &ast.PlanReplayerStmt{
Stmt: nil,
Analyze: false,
Remove: true,
SQLDigest: $5,
PlanDigest: $6,
Where: nil,
OrderBy: nil,
Limit: nil,
}

$$ = x
}
%%
1 change: 1 addition & 0 deletions parser/parser_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6916,6 +6916,7 @@ func TestPlanReplayer(t *testing.T) {
{"PLAN REPLAYER DUMP EXPLAIN 'sql.txt'", true, "PLAN REPLAYER DUMP EXPLAIN 'sql.txt'"},
{"PLAN REPLAYER DUMP EXPLAIN ANALYZE 'sql.txt'", true, "PLAN REPLAYER DUMP EXPLAIN ANALYZE 'sql.txt'"},
{"PLAN REPLAYER CAPTURE '123' '123'", true, "PLAN REPLAYER CAPTURE '123' '123'"},
{"PLAN REPLAYER CAPTURE REMOVE '123' '123'", true, "PLAN REPLAYER CAPTURE REMOVE '123' '123'"},
}
RunTest(t, table, false)

Expand Down
1 change: 1 addition & 0 deletions planner/core/common_plans.go
Original file line number Diff line number Diff line change
Expand Up @@ -596,6 +596,7 @@ type PlanReplayer struct {
File string

Capture bool
Remove bool
SQLDigest string
PlanDigest string
}
Expand Down
3 changes: 2 additions & 1 deletion planner/core/planbuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -5197,7 +5197,8 @@ func buildShowSchema(s *ast.ShowStmt, isView bool, isSequence bool) (schema *exp
}

func (b *PlanBuilder) buildPlanReplayer(pc *ast.PlanReplayerStmt) Plan {
p := &PlanReplayer{ExecStmt: pc.Stmt, Analyze: pc.Analyze, Load: pc.Load, File: pc.File, Capture: pc.Capture, SQLDigest: pc.SQLDigest, PlanDigest: pc.PlanDigest}
p := &PlanReplayer{ExecStmt: pc.Stmt, Analyze: pc.Analyze, Load: pc.Load, File: pc.File,
Capture: pc.Capture, Remove: pc.Remove, SQLDigest: pc.SQLDigest, PlanDigest: pc.PlanDigest}
schema := newColumnsWithNames(1)
schema.Append(buildColumnWithName("", "File_token", mysql.TypeVarchar, 128))
p.SetSchema(schema.col2Schema())
Expand Down
1 change: 1 addition & 0 deletions privilege/privileges/privileges.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,7 @@ func (p *UserPrivileges) RequestVerification(activeRoles []*auth.RoleIdentity, d
// See https://dev.mysql.com/doc/refman/5.7/en/information-schema.html
dbLowerName := strings.ToLower(db)
tblLowerName := strings.ToLower(table)

// If SEM is enabled and the user does not have the RESTRICTED_TABLES_ADMIN privilege
// There are some hard rules which overwrite system tables and schemas as read-only at most.
semEnabled := sem.IsEnabled()
Expand Down

0 comments on commit 6915256

Please sign in to comment.