Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

executor: optimize executor runtime stats by avoid unnecessary clone #54004

Merged
merged 6 commits into from
Dec 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions pkg/distsql/distsql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,9 +119,9 @@ func TestSelectResultRuntimeStats(t *testing.T) {
s1.procKeys.Add(100)
s1.procKeys.Add(200)

s2 := *s1
stmtStats.RegisterStats(1, s1)
stmtStats.RegisterStats(1, &s2)
s2 := s1.Clone()
stmtStats.RegisterStats(1, s1.Clone())
stmtStats.RegisterStats(1, s2)
stats := stmtStats.GetRootStats(1)
expect := "time:1s, open:0s, close:0s, loops:1, cop_task: {num: 4, max: 1s, min: 1ms, avg: 500.5ms, p95: 1s, max_proc_keys: 200, p95_proc_keys: 200, tot_proc: 2s, tot_wait: 2s, copr_cache_hit_ratio: 0.00, max_distsql_concurrency: 15}, backoff{RegionMiss: 2ms}"
require.Equal(t, expect, stats.String())
Expand All @@ -134,7 +134,7 @@ func TestSelectResultRuntimeStats(t *testing.T) {
}
s1.reqStat.RecordRPCErrorStats("server_is_busy")
s1.reqStat.RecordRPCErrorStats("server_is_busy")
stmtStats.RegisterStats(2, s1)
stmtStats.RegisterStats(2, s1.Clone())
stats = stmtStats.GetRootStats(2)
expect = "cop_task: {num: 2, max: 1s, min: 1ms, avg: 500.5ms, p95: 1s, max_proc_keys: 200, p95_proc_keys: 200, tot_proc: 1s, tot_wait: 1s, copr_cache_hit_ratio: 0.00, max_distsql_concurrency: 15}, rpc_info:{Cop:{num_rpc:1, total_time:1s}, rpc_errors:{server_is_busy:2}}, backoff{RegionMiss: 1ms}"
require.Equal(t, expect, stats.String())
Expand Down
7 changes: 4 additions & 3 deletions pkg/executor/batch_point_get.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,14 +177,15 @@ func (e *BatchPointGetExec) Close() error {
if e.RuntimeStats() != nil && e.snapshot != nil {
e.snapshot.SetOption(kv.CollectRuntimeStats, nil)
}
if e.indexUsageReporter != nil {
if e.indexUsageReporter != nil && e.stats != nil {
kvReqTotal := e.stats.GetCmdRPCCount(tikvrpc.CmdBatchGet)
// We cannot distinguish how many rows are coming from each partition. Here, we calculate all index usages
// percentage according to the row counts for the whole table.
rows := e.RuntimeStats().GetActRows()
if e.idxInfo != nil {
e.indexUsageReporter.ReportPointGetIndexUsage(e.tblInfo.ID, e.tblInfo.ID, e.idxInfo.ID, e.ID(), kvReqTotal)
e.indexUsageReporter.ReportPointGetIndexUsage(e.tblInfo.ID, e.tblInfo.ID, e.idxInfo.ID, kvReqTotal, rows)
} else {
e.indexUsageReporter.ReportPointGetIndexUsageForHandle(e.tblInfo, e.tblInfo.ID, e.ID(), kvReqTotal)
e.indexUsageReporter.ReportPointGetIndexUsageForHandle(e.tblInfo, e.tblInfo.ID, kvReqTotal, rows)
}
}
e.inited = 0
Expand Down
14 changes: 4 additions & 10 deletions pkg/executor/internal/exec/indexusage.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,30 +89,24 @@ func (e *IndexUsageReporter) ReportCopIndexUsage(tableID int64, physicalTableID

// ReportPointGetIndexUsageForHandle wraps around `ReportPointGetIndexUsage` to get the `indexID` automatically
// from the `table.Table` if the table has a clustered index or integer primary key.
func (e *IndexUsageReporter) ReportPointGetIndexUsageForHandle(tblInfo *model.TableInfo, physicalTableID int64, planID int, kvRequestTotal int64) {
func (e *IndexUsageReporter) ReportPointGetIndexUsageForHandle(tblInfo *model.TableInfo, physicalTableID int64, kvRequestTotal, rows int64) {
idxID, ok := getClusterIndexID(tblInfo)
if !ok {
return
}

e.ReportPointGetIndexUsage(tblInfo.ID, physicalTableID, idxID, planID, kvRequestTotal)
e.ReportPointGetIndexUsage(tblInfo.ID, physicalTableID, idxID, kvRequestTotal, rows)
}

// ReportPointGetIndexUsage reports the index usage of a point get or batch point get
func (e *IndexUsageReporter) ReportPointGetIndexUsage(tableID int64, physicalTableID int64, indexID int64, planID int, kvRequestTotal int64) {
func (e *IndexUsageReporter) ReportPointGetIndexUsage(tableID int64, physicalTableID int64, indexID int64, kvRequestTotal, rows int64) {
tableRowCount, ok := e.getTableRowCount(physicalTableID)
if !ok {
// skip if the table is empty or the stats is not valid
return
}

basic := e.runtimeStatsColl.GetBasicRuntimeStats(planID, false)
if basic == nil {
return
}
accessRows := basic.GetActRows()

sample := indexusage.NewSample(0, uint64(kvRequestTotal), uint64(accessRows), uint64(tableRowCount))
sample := indexusage.NewSample(0, uint64(kvRequestTotal), uint64(rows), uint64(tableRowCount))
e.reporter.Update(tableID, indexID, sample)
}

Expand Down
10 changes: 3 additions & 7 deletions pkg/executor/internal/exec/indexusage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,7 @@ func TestIndexUsageReporter(t *testing.T) {
runtimeStatsColl := sc.RuntimeStatsColl

// For PointGet and BatchPointGet
planID := 3
runtimeStatsColl.GetBasicRuntimeStats(planID, true).Record(time.Second, 2024)
reporter.ReportPointGetIndexUsage(tableID, tableID, indexID, planID, 1)
reporter.ReportPointGetIndexUsage(tableID, tableID, indexID, 1, 2024)

require.Eventually(t, func() bool {
tk.Session().ReportUsageStats()
Expand All @@ -63,7 +61,7 @@ func TestIndexUsageReporter(t *testing.T) {
}, time.Second*5, time.Millisecond)

// For Index Scan
planID = 4
planID := 4
rows := uint64(2024)
zero := uint64(0)
executorID := "test-executor"
Expand All @@ -87,9 +85,7 @@ func TestIndexUsageReporter(t *testing.T) {
Version: statistics.PseudoVersion,
RealtimeCount: 100,
})
planID = 4
runtimeStatsColl.GetBasicRuntimeStats(planID, true).Record(time.Second, 2024)
reporter.ReportPointGetIndexUsage(tableID, tableID, indexID, planID, 1)
reporter.ReportPointGetIndexUsage(tableID, tableID, indexID, 1, 2024)

require.Eventually(t, func() bool {
tk.Session().ReportUsageStats()
Expand Down
5 changes: 3 additions & 2 deletions pkg/executor/point_get.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,10 +273,11 @@ func (e *PointGetExecutor) Close() error {
tableID := e.tblInfo.ID
physicalTableID := GetPhysID(e.tblInfo, e.partitionDefIdx)
kvReqTotal := e.stats.SnapshotRuntimeStats.GetCmdRPCCount(tikvrpc.CmdGet)
rows := e.RuntimeStats().GetActRows()
if e.idxInfo != nil {
e.indexUsageReporter.ReportPointGetIndexUsage(tableID, physicalTableID, e.idxInfo.ID, e.ID(), kvReqTotal)
e.indexUsageReporter.ReportPointGetIndexUsage(tableID, physicalTableID, e.idxInfo.ID, kvReqTotal, rows)
} else {
e.indexUsageReporter.ReportPointGetIndexUsageForHandle(e.tblInfo, physicalTableID, e.ID(), kvReqTotal)
e.indexUsageReporter.ReportPointGetIndexUsageForHandle(e.tblInfo, physicalTableID, kvReqTotal, rows)
}
}
e.done = false
Expand Down
2 changes: 1 addition & 1 deletion pkg/util/execdetails/execdetails.go
Original file line number Diff line number Diff line change
Expand Up @@ -1493,7 +1493,7 @@ func (e *RuntimeStatsColl) RegisterStats(planID int, info RuntimeStats) {
}
}
if !found {
stats.groupRss = append(stats.groupRss, info.Clone())
stats.groupRss = append(stats.groupRss, info)
}
}

Expand Down