Skip to content

Commit

Permalink
executor: optimize executor runtime stats by avoid unnecessary clone (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
crazycs520 authored Dec 8, 2024
1 parent 1521bf7 commit 56c07d0
Show file tree
Hide file tree
Showing 6 changed files with 19 additions and 27 deletions.
8 changes: 4 additions & 4 deletions pkg/distsql/distsql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,9 +119,9 @@ func TestSelectResultRuntimeStats(t *testing.T) {
s1.procKeys.Add(100)
s1.procKeys.Add(200)

s2 := *s1
stmtStats.RegisterStats(1, s1)
stmtStats.RegisterStats(1, &s2)
s2 := s1.Clone()
stmtStats.RegisterStats(1, s1.Clone())
stmtStats.RegisterStats(1, s2)
stats := stmtStats.GetRootStats(1)
expect := "time:1s, open:0s, close:0s, loops:1, cop_task: {num: 4, max: 1s, min: 1ms, avg: 500.5ms, p95: 1s, max_proc_keys: 200, p95_proc_keys: 200, tot_proc: 2s, tot_wait: 2s, copr_cache_hit_ratio: 0.00, max_distsql_concurrency: 15}, backoff{RegionMiss: 2ms}"
require.Equal(t, expect, stats.String())
Expand All @@ -134,7 +134,7 @@ func TestSelectResultRuntimeStats(t *testing.T) {
}
s1.reqStat.RecordRPCErrorStats("server_is_busy")
s1.reqStat.RecordRPCErrorStats("server_is_busy")
stmtStats.RegisterStats(2, s1)
stmtStats.RegisterStats(2, s1.Clone())
stats = stmtStats.GetRootStats(2)
expect = "cop_task: {num: 2, max: 1s, min: 1ms, avg: 500.5ms, p95: 1s, max_proc_keys: 200, p95_proc_keys: 200, tot_proc: 1s, tot_wait: 1s, copr_cache_hit_ratio: 0.00, max_distsql_concurrency: 15}, rpc_info:{Cop:{num_rpc:1, total_time:1s}, rpc_errors:{server_is_busy:2}}, backoff{RegionMiss: 1ms}"
require.Equal(t, expect, stats.String())
Expand Down
7 changes: 4 additions & 3 deletions pkg/executor/batch_point_get.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,14 +177,15 @@ func (e *BatchPointGetExec) Close() error {
if e.RuntimeStats() != nil && e.snapshot != nil {
e.snapshot.SetOption(kv.CollectRuntimeStats, nil)
}
if e.indexUsageReporter != nil {
if e.indexUsageReporter != nil && e.stats != nil {
kvReqTotal := e.stats.GetCmdRPCCount(tikvrpc.CmdBatchGet)
// We cannot distinguish how many rows are coming from each partition. Here, we calculate all index usages
// percentage according to the row counts for the whole table.
rows := e.RuntimeStats().GetActRows()
if e.idxInfo != nil {
e.indexUsageReporter.ReportPointGetIndexUsage(e.tblInfo.ID, e.tblInfo.ID, e.idxInfo.ID, e.ID(), kvReqTotal)
e.indexUsageReporter.ReportPointGetIndexUsage(e.tblInfo.ID, e.tblInfo.ID, e.idxInfo.ID, kvReqTotal, rows)
} else {
e.indexUsageReporter.ReportPointGetIndexUsageForHandle(e.tblInfo, e.tblInfo.ID, e.ID(), kvReqTotal)
e.indexUsageReporter.ReportPointGetIndexUsageForHandle(e.tblInfo, e.tblInfo.ID, kvReqTotal, rows)
}
}
e.inited = 0
Expand Down
14 changes: 4 additions & 10 deletions pkg/executor/internal/exec/indexusage.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,30 +89,24 @@ func (e *IndexUsageReporter) ReportCopIndexUsage(tableID int64, physicalTableID

// ReportPointGetIndexUsageForHandle wraps around `ReportPointGetIndexUsage` to get the `indexID` automatically
// from the `table.Table` if the table has a clustered index or integer primary key.
func (e *IndexUsageReporter) ReportPointGetIndexUsageForHandle(tblInfo *model.TableInfo, physicalTableID int64, planID int, kvRequestTotal int64) {
func (e *IndexUsageReporter) ReportPointGetIndexUsageForHandle(tblInfo *model.TableInfo, physicalTableID int64, kvRequestTotal, rows int64) {
idxID, ok := getClusterIndexID(tblInfo)
if !ok {
return
}

e.ReportPointGetIndexUsage(tblInfo.ID, physicalTableID, idxID, planID, kvRequestTotal)
e.ReportPointGetIndexUsage(tblInfo.ID, physicalTableID, idxID, kvRequestTotal, rows)
}

// ReportPointGetIndexUsage reports the index usage of a point get or batch point get
func (e *IndexUsageReporter) ReportPointGetIndexUsage(tableID int64, physicalTableID int64, indexID int64, planID int, kvRequestTotal int64) {
func (e *IndexUsageReporter) ReportPointGetIndexUsage(tableID int64, physicalTableID int64, indexID int64, kvRequestTotal, rows int64) {
tableRowCount, ok := e.getTableRowCount(physicalTableID)
if !ok {
// skip if the table is empty or the stats is not valid
return
}

basic := e.runtimeStatsColl.GetBasicRuntimeStats(planID, false)
if basic == nil {
return
}
accessRows := basic.GetActRows()

sample := indexusage.NewSample(0, uint64(kvRequestTotal), uint64(accessRows), uint64(tableRowCount))
sample := indexusage.NewSample(0, uint64(kvRequestTotal), uint64(rows), uint64(tableRowCount))
e.reporter.Update(tableID, indexID, sample)
}

Expand Down
10 changes: 3 additions & 7 deletions pkg/executor/internal/exec/indexusage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,7 @@ func TestIndexUsageReporter(t *testing.T) {
runtimeStatsColl := sc.RuntimeStatsColl

// For PointGet and BatchPointGet
planID := 3
runtimeStatsColl.GetBasicRuntimeStats(planID, true).Record(time.Second, 2024)
reporter.ReportPointGetIndexUsage(tableID, tableID, indexID, planID, 1)
reporter.ReportPointGetIndexUsage(tableID, tableID, indexID, 1, 2024)

require.Eventually(t, func() bool {
tk.Session().ReportUsageStats()
Expand All @@ -63,7 +61,7 @@ func TestIndexUsageReporter(t *testing.T) {
}, time.Second*5, time.Millisecond)

// For Index Scan
planID = 4
planID := 4
rows := uint64(2024)
zero := uint64(0)
executorID := "test-executor"
Expand All @@ -87,9 +85,7 @@ func TestIndexUsageReporter(t *testing.T) {
Version: statistics.PseudoVersion,
RealtimeCount: 100,
})
planID = 4
runtimeStatsColl.GetBasicRuntimeStats(planID, true).Record(time.Second, 2024)
reporter.ReportPointGetIndexUsage(tableID, tableID, indexID, planID, 1)
reporter.ReportPointGetIndexUsage(tableID, tableID, indexID, 1, 2024)

require.Eventually(t, func() bool {
tk.Session().ReportUsageStats()
Expand Down
5 changes: 3 additions & 2 deletions pkg/executor/point_get.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,10 +273,11 @@ func (e *PointGetExecutor) Close() error {
tableID := e.tblInfo.ID
physicalTableID := GetPhysID(e.tblInfo, e.partitionDefIdx)
kvReqTotal := e.stats.SnapshotRuntimeStats.GetCmdRPCCount(tikvrpc.CmdGet)
rows := e.RuntimeStats().GetActRows()
if e.idxInfo != nil {
e.indexUsageReporter.ReportPointGetIndexUsage(tableID, physicalTableID, e.idxInfo.ID, e.ID(), kvReqTotal)
e.indexUsageReporter.ReportPointGetIndexUsage(tableID, physicalTableID, e.idxInfo.ID, kvReqTotal, rows)
} else {
e.indexUsageReporter.ReportPointGetIndexUsageForHandle(e.tblInfo, physicalTableID, e.ID(), kvReqTotal)
e.indexUsageReporter.ReportPointGetIndexUsageForHandle(e.tblInfo, physicalTableID, kvReqTotal, rows)
}
}
e.done = false
Expand Down
2 changes: 1 addition & 1 deletion pkg/util/execdetails/execdetails.go
Original file line number Diff line number Diff line change
Expand Up @@ -1498,7 +1498,7 @@ func (e *RuntimeStatsColl) RegisterStats(planID int, info RuntimeStats) {
}
}
if !found {
stats.groupRss = append(stats.groupRss, info.Clone())
stats.groupRss = append(stats.groupRss, info)
}
}

Expand Down

0 comments on commit 56c07d0

Please sign in to comment.