Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

session: make plan replayer capture support prepared stmt #40167

Merged
merged 11 commits into from
Jan 3, 2023
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion domain/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ go_library(
"//meta",
"//metrics",
"//owner",
"//parser",
"//parser/ast",
"//parser/model",
"//parser/mysql",
Expand All @@ -45,7 +46,6 @@ go_library(
"//sessionctx/sessionstates",
"//sessionctx/stmtctx",
"//sessionctx/variable",
"//statistics",
"//statistics/handle",
"//telemetry",
"//ttl/ttlworker",
Expand Down
47 changes: 18 additions & 29 deletions domain/plan_replayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,12 @@ import (
"github.com/pingcap/tidb/bindinfo"
"github.com/pingcap/tidb/domain/infosync"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/parser"
"github.com/pingcap/tidb/parser/ast"
"github.com/pingcap/tidb/parser/terror"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/stmtctx"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/statistics"
"github.com/pingcap/tidb/statistics/handle"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/replayer"
Expand Down Expand Up @@ -101,7 +100,7 @@ func (p *dumpFileGcChecker) gcDumpFilesByPath(path string, t time.Duration) {
logutil.BgLogger().Error("[dumpFileGcChecker] parseTime failed", zap.Error(err), zap.String("filename", fileName))
continue
}
isPlanReplayer := parseType(fileName) == "replayer"
isPlanReplayer := strings.Contains(fileName, "replayer")
if !createTime.After(gcTime) {
err := os.Remove(filepath.Join(path, f.Name()))
if err != nil {
Expand Down Expand Up @@ -410,7 +409,7 @@ func (w *planReplayerTaskDumpWorker) HandleTask(task *PlanReplayerDumpTask) (suc
return true
}

file, fileName, err := replayer.GeneratePlanReplayerFile(task.IsContinuesCapture)
file, fileName, err := replayer.GeneratePlanReplayerFile(task.IsCapture)
if err != nil {
logutil.BgLogger().Warn("[plan-replayer-capture] generate task file failed",
zap.String("sqlDigest", taskKey.SQLDigest),
Expand All @@ -421,29 +420,18 @@ func (w *planReplayerTaskDumpWorker) HandleTask(task *PlanReplayerDumpTask) (suc
task.Zf = file
task.FileName = fileName
task.EncodedPlan, _ = task.EncodePlan(task.SessionVars.StmtCtx, false)
jsStats := make(map[int64]*handle.JSONTable)
is := GetDomain(w.sctx).InfoSchema()
if task.IsCapture && !task.IsContinuesCapture {
for tblID, stat := range task.TblStats {
tbl, ok := is.TableByID(tblID)
if !ok {
return false
}
schema, ok := is.SchemaByTable(tbl.Meta())
if !ok {
return false
}
r, err := handle.GenJSONTableFromStats(schema.Name.String(), tbl.Meta(), stat.(*statistics.Table))
if err != nil {
logutil.BgLogger().Warn("[plan-replayer-capture] generate task json stats failed",
zap.String("sqlDigest", taskKey.SQLDigest),
zap.String("planDigest", taskKey.PlanDigest),
zap.Error(err))
return false
}
jsStats[tblID] = r
if task.InExecute && len(task.NormalizedSQL) > 0 {
p := parser.New()
stmts, _, err := p.ParseSQL(task.NormalizedSQL)
if err != nil {
logutil.BgLogger().Warn("[plan-replayer-capture] parse normalized sql failed",
zap.String("sql", task.NormalizedSQL),
zap.String("sqlDigest", taskKey.SQLDigest),
zap.String("planDigest", taskKey.PlanDigest),
zap.Error(err))
return false
}
task.JSONTblStats = jsStats
task.ExecStmts = stmts
}
err = DumpPlanReplayerInfo(w.ctx, w.sctx, task)
if err != nil {
Expand Down Expand Up @@ -538,15 +526,16 @@ type PlanReplayerDumpTask struct {
replayer.PlanReplayerTaskKey

// tmp variables stored during the query
EncodePlan func(*stmtctx.StatementContext, bool) (string, string)
TblStats map[int64]interface{}
EncodePlan func(*stmtctx.StatementContext, bool) (string, string)
TblStats map[int64]interface{}
InExecute bool
NormalizedSQL string

// variables used to dump the plan
StartTS uint64
SessionBindings []*bindinfo.BindRecord
EncodedPlan string
SessionVars *variable.SessionVars
JSONTblStats map[int64]*handle.JSONTable
ExecStmts []ast.StmtNode
Analyze bool

Expand Down
19 changes: 7 additions & 12 deletions domain/plan_replayer_dump.go
Original file line number Diff line number Diff line change
Expand Up @@ -265,10 +265,10 @@ func DumpPlanReplayerInfo(ctx context.Context, sctx sessionctx.Context,
return err
}

// For continues capture, we don't dump stats
if !task.IsContinuesCapture {
// For capture task, we don't dump stats
if !task.IsCapture {
// Dump stats
if err = dumpStats(zw, pairs, task.JSONTblStats, do); err != nil {
if err = dumpStats(zw, pairs, do); err != nil {
return err
}
}
Expand Down Expand Up @@ -415,12 +415,12 @@ func dumpSchemaMeta(zw *zip.Writer, tables map[tableNamePair]struct{}) error {
return nil
}

func dumpStats(zw *zip.Writer, pairs map[tableNamePair]struct{}, tblJSONStats map[int64]*handle.JSONTable, do *Domain) error {
func dumpStats(zw *zip.Writer, pairs map[tableNamePair]struct{}, do *Domain) error {
for pair := range pairs {
if pair.IsView {
continue
}
jsonTbl, err := getStatsForTable(do, tblJSONStats, pair)
jsonTbl, err := getStatsForTable(do, pair)
if err != nil {
return err
}
Expand Down Expand Up @@ -653,19 +653,14 @@ func extractTableNames(ctx context.Context, sctx sessionctx.Context,
return r, nil
}

func getStatsForTable(do *Domain, tblJSONStats map[int64]*handle.JSONTable, pair tableNamePair) (*handle.JSONTable, error) {
func getStatsForTable(do *Domain, pair tableNamePair) (*handle.JSONTable, error) {
is := do.InfoSchema()
h := do.StatsHandle()
tbl, err := is.TableByName(model.NewCIStr(pair.DBName), model.NewCIStr(pair.TableName))
if err != nil {
return nil, err
}
js, ok := tblJSONStats[tbl.Meta().ID]
if ok && js != nil {
return js, nil
}
js, err = h.DumpStatsToJSON(pair.DBName, tbl.Meta(), nil, true)
return js, err
return h.DumpStatsToJSON(pair.DBName, tbl.Meta(), nil, true)
}

func getShowCreateTable(pair tableNamePair, zw *zip.Writer, ctx sessionctx.Context) error {
Expand Down
97 changes: 97 additions & 0 deletions executor/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/log"
"github.com/pingcap/tidb/bindinfo"
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/ddl/placement"
"github.com/pingcap/tidb/domain"
Expand Down Expand Up @@ -58,6 +59,7 @@ import (
"github.com/pingcap/tidb/util/mathutil"
"github.com/pingcap/tidb/util/memory"
"github.com/pingcap/tidb/util/plancodec"
"github.com/pingcap/tidb/util/replayer"
"github.com/pingcap/tidb/util/sqlexec"
"github.com/pingcap/tidb/util/stmtsummary"
"github.com/pingcap/tidb/util/stringutil"
Expand Down Expand Up @@ -1365,6 +1367,18 @@ func (a *ExecStmt) observePhaseDurations(internal bool, commitDetails *util.Comm
// 4. update the `PrevStmt` in session variable.
// 5. reset `DurationParse` in session variable.
func (a *ExecStmt) FinishExecuteStmt(txnTS uint64, err error, hasMoreResults bool) {
se := a.Ctx
if !se.GetSessionVars().InRestrictedSQL && se.GetSessionVars().IsPlanReplayerCaptureEnabled() {
stmtNode := a.GetStmtNode()
if se.GetSessionVars().EnablePlanReplayedContinuesCapture {
if checkPlanReplayerContinuesCaptureValidStmt(stmtNode) {
checkPlanReplayerContinuesCapture(se, stmtNode, txnTS)
}
} else {
checkPlanReplayerCaptureTask(se, stmtNode, txnTS)
}
}

sessVars := a.Ctx.GetSessionVars()
execDetail := sessVars.StmtCtx.GetExecDetails()
// Attach commit/lockKeys runtime stats to executor runtime stats.
Expand Down Expand Up @@ -1953,3 +1967,86 @@ func convertStatusIntoString(sctx sessionctx.Context, statsLoadStatus map[model.
}
return r
}

// only allow select/delete/update/insert/execute stmt captured by continues capture
func checkPlanReplayerContinuesCaptureValidStmt(stmtNode ast.StmtNode) bool {
switch stmtNode.(type) {
case *ast.SelectStmt, *ast.DeleteStmt, *ast.UpdateStmt, *ast.InsertStmt, *ast.ExecuteStmt:
return true
default:
return false
}
}

func checkPlanReplayerCaptureTask(sctx sessionctx.Context, stmtNode ast.StmtNode, startTS uint64) {
dom := domain.GetDomain(sctx)
if dom == nil {
return
}
handle := dom.GetPlanReplayerHandle()
if handle == nil {
return
}
tasks := handle.GetTasks()
_, sqlDigest := sctx.GetSessionVars().StmtCtx.SQLDigest()
_, planDigest := sctx.GetSessionVars().StmtCtx.GetPlanDigest()
key := replayer.PlanReplayerTaskKey{
SQLDigest: sqlDigest.String(),
PlanDigest: planDigest.String(),
}
for _, task := range tasks {
if task.SQLDigest == sqlDigest.String() {
if task.PlanDigest == "*" || task.PlanDigest == planDigest.String() {
sendPlanReplayerDumpTask(key, sctx, stmtNode, startTS, false)
return
}
}
}
}

func checkPlanReplayerContinuesCapture(sctx sessionctx.Context, stmtNode ast.StmtNode, startTS uint64) {
dom := domain.GetDomain(sctx)
if dom == nil {
return
}
handle := dom.GetPlanReplayerHandle()
if handle == nil {
return
}
_, sqlDigest := sctx.GetSessionVars().StmtCtx.SQLDigest()
_, planDigest := sctx.GetSessionVars().StmtCtx.GetPlanDigest()
key := replayer.PlanReplayerTaskKey{
SQLDigest: sqlDigest.String(),
PlanDigest: planDigest.String(),
}
existed := sctx.GetSessionVars().CheckPlanReplayerFinishedTaskKey(key)
if existed {
return
}
sendPlanReplayerDumpTask(key, sctx, stmtNode, startTS, true)
sctx.GetSessionVars().AddPlanReplayerFinishedTaskKey(key)
}

func sendPlanReplayerDumpTask(key replayer.PlanReplayerTaskKey, sctx sessionctx.Context, stmtNode ast.StmtNode,
startTS uint64, isContinuesCapture bool) {
stmtCtx := sctx.GetSessionVars().StmtCtx
handle := sctx.Value(bindinfo.SessionBindInfoKeyType).(*bindinfo.SessionHandle)
dumpTask := &domain.PlanReplayerDumpTask{
PlanReplayerTaskKey: key,
StartTS: startTS,
EncodePlan: GetEncodedPlan,
TblStats: stmtCtx.TableStats,
SessionBindings: handle.GetAllBindRecord(),
SessionVars: sctx.GetSessionVars(),
ExecStmts: []ast.StmtNode{stmtNode},
Analyze: false,
IsCapture: true,
IsContinuesCapture: isContinuesCapture,
}
if _, ok := stmtNode.(*ast.ExecuteStmt); ok {
nsql, _ := sctx.GetSessionVars().StmtCtx.SQLDigest()
dumpTask.InExecute = true
dumpTask.NormalizedSQL = nsql
}
domain.GetDomain(sctx).GetPlanReplayerHandle().SendTask(dumpTask)
}
2 changes: 1 addition & 1 deletion executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -1724,7 +1724,7 @@ func (b *executorBuilder) buildTableDual(v *plannercore.PhysicalTableDual) Execu

// `getSnapshotTS` returns for-update-ts if in insert/update/delete/lock statement otherwise the isolation read ts
// Please notice that in RC isolation, the above two ts are the same
func (b *executorBuilder) getSnapshotTS() (uint64, error) {
func (b *executorBuilder) getSnapshotTS() (ts uint64, err error) {
if b.forDataReaderBuilder {
return b.dataReaderTS, nil
}
Expand Down
Loading