From de28a148afef5ce587e9dc76ed3fc8a23e378e51 Mon Sep 17 00:00:00 2001
From: crazycs520 <crazycs520@gmail.com>
Date: Thu, 14 Mar 2024 16:56:42 +0800
Subject: [PATCH 1/3] fix issue that some query execution stats was omitted
 when execution was interrupted

Signed-off-by: crazycs520 <crazycs520@gmail.com>
---
 pkg/distsql/select_result.go        |  9 +++-
 pkg/server/conn_test.go             | 11 +++++
 pkg/store/copr/coprocessor.go       | 65 ++++++++++++++++++++++++-----
 pkg/store/mockstore/unistore/rpc.go |  4 ++
 4 files changed, 78 insertions(+), 11 deletions(-)

diff --git a/pkg/distsql/select_result.go b/pkg/distsql/select_result.go
index 55d3da05d363b..63a71b3779628 100644
--- a/pkg/distsql/select_result.go
+++ b/pkg/distsql/select_result.go
@@ -502,7 +502,7 @@ func recordExecutionSummariesForTiFlashTasks(sctx *stmtctx.StatementContext, exe
 
 func (r *selectResult) updateCopRuntimeStats(ctx context.Context, copStats *copr.CopRuntimeStats, respTime time.Duration) (err error) {
 	callee := copStats.CalleeAddress
-	if r.rootPlanID <= 0 || r.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl == nil || callee == "" {
+	if r.rootPlanID <= 0 || r.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl == nil || (callee == "" && len(copStats.Stats) == 0) {
 		return
 	}
 
@@ -603,6 +603,13 @@ func (r *selectResult) Close() error {
 	if respSize > 0 {
 		r.memConsume(-respSize)
 	}
+	if unconsumed, ok := r.resp.(copr.HasUnconsumedCopRuntimeStats); ok && unconsumed != nil {
+		unconsumedCopStats := unconsumed.CollectUnconsumedCopRuntimeStats()
+		for _, copStats := range unconsumedCopStats {
+			_ = r.updateCopRuntimeStats(context.Background(), copStats, time.Duration(0))
+			r.ctx.GetSessionVars().StmtCtx.MergeExecDetails(&copStats.ExecDetails, nil)
+		}
+	}
 	if r.stats != nil {
 		defer func() {
 			if ci, ok := r.resp.(copr.CopInfo); ok {
diff --git a/pkg/server/conn_test.go b/pkg/server/conn_test.go
index d4353af605020..25161e4536260 100644
--- a/pkg/server/conn_test.go
+++ b/pkg/server/conn_test.go
@@ -52,6 +52,7 @@ import (
 	"github.com/pingcap/tidb/pkg/util/arena"
 	"github.com/pingcap/tidb/pkg/util/chunk"
 	"github.com/pingcap/tidb/pkg/util/dbterror/exeerrors"
+	"github.com/pingcap/tidb/pkg/util/plancodec"
 	"github.com/pingcap/tidb/pkg/util/sqlkiller"
 	"github.com/stretchr/testify/require"
 	tikverr "github.com/tikv/client-go/v2/error"
@@ -720,6 +721,16 @@ func TestConnExecutionTimeout(t *testing.T) {
 	tk.MustQuery("select SLEEP(1);").Check(testkit.Rows("0"))
 	err := tk.QueryToErr("select * FROM testTable2 WHERE SLEEP(1);")
 	require.Equal(t, "[executor:3024]Query execution was interrupted, maximum statement execution time exceeded", err.Error())
+	// Test executor stats when execution time exceeded.
+	tk.MustExec("set @@tidb_slow_log_threshold=300")
+	require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/store/mockstore/unistore/unistoreRPCSlowByInjestSleep", `return(150)`))
+	err = tk.QueryToErr("select /*+ max_execution_time(600), set_var(tikv_client_read_timeout=100) */ * from testTable2")
+	require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/store/mockstore/unistore/unistoreRPCSlowByInjestSleep"))
+	require.Error(t, err)
+	require.Equal(t, "[executor:3024]Query execution was interrupted, maximum statement execution time exceeded", err.Error())
+	planInfo, err := plancodec.DecodePlan(tk.Session().GetSessionVars().StmtCtx.GetEncodedPlan())
+	require.NoError(t, err)
+	require.Regexp(t, "TableReader.*cop_task: {num: .*, rpc_num: .*, rpc_time: .*", planInfo)
 
 	// Killed because of max execution time, reset Killed to 0.
 	tk.Session().GetSessionVars().SQLKiller.SendKillSignal(sqlkiller.MaxExecTimeExceeded)
diff --git a/pkg/store/copr/coprocessor.go b/pkg/store/copr/coprocessor.go
index 3abfba960f186..2ea18eff44655 100644
--- a/pkg/store/copr/coprocessor.go
+++ b/pkg/store/copr/coprocessor.go
@@ -700,7 +700,8 @@ type copIterator struct {
 	storeBatchedNum         atomic.Uint64
 	storeBatchedFallbackNum atomic.Uint64
 
-	runawayChecker *resourcegroup.RunawayChecker
+	runawayChecker  *resourcegroup.RunawayChecker
+	unconsumedStats *unconsumedCopRuntimeStats
 }
 
 // copIteratorWorker receives tasks from copIteratorTaskSender, handles tasks and sends the copResponse to respChan.
@@ -723,6 +724,7 @@ type copIteratorWorker struct {
 
 	storeBatchedNum         *atomic.Uint64
 	storeBatchedFallbackNum *atomic.Uint64
+	unconsumedStats         *unconsumedCopRuntimeStats
 }
 
 // copIteratorTaskSender sends tasks to taskCh then wait for the workers to exit.
@@ -833,6 +835,7 @@ func (worker *copIteratorWorker) run(ctx context.Context) {
 func (it *copIterator) open(ctx context.Context, enabledRateLimitAction, enableCollectExecutionInfo bool) {
 	taskCh := make(chan *copTask, 1)
 	smallTaskCh := make(chan *copTask, 1)
+	it.unconsumedStats = &unconsumedCopRuntimeStats{}
 	it.wg.Add(it.concurrency + it.smallTaskConcurrency)
 	// Start it.concurrency number of workers to handle cop requests.
 	for i := 0; i < it.concurrency+it.smallTaskConcurrency; i++ {
@@ -857,6 +860,7 @@ func (it *copIterator) open(ctx context.Context, enabledRateLimitAction, enableC
 			pagingTaskIdx:              &it.pagingTaskIdx,
 			storeBatchedNum:            &it.storeBatchedNum,
 			storeBatchedFallbackNum:    &it.storeBatchedFallbackNum,
+			unconsumedStats:            it.unconsumedStats,
 		}
 		go worker.run(ctx)
 	}
@@ -1096,6 +1100,22 @@ func (it *copIterator) Next(ctx context.Context) (kv.ResultSubset, error) {
 	return resp, nil
 }
 
+// HasUnconsumedCopRuntimeStats indicate whether has unconsumed CopRuntimeStats.
+type HasUnconsumedCopRuntimeStats interface {
+	CollectUnconsumedCopRuntimeStats() []*CopRuntimeStats
+}
+
+func (it *copIterator) CollectUnconsumedCopRuntimeStats() []*CopRuntimeStats {
+	if it == nil {
+		return nil
+	}
+	it.unconsumedStats.Lock()
+	stats := make([]*CopRuntimeStats, 0, len(it.unconsumedStats.stats))
+	stats = append(stats, it.unconsumedStats.stats...)
+	it.unconsumedStats.Unlock()
+	return stats
+}
+
 // Associate each region with an independent backoffer. In this way, when multiple regions are
 // unavailable, TiDB can execute very quickly without blocking
 func chooseBackoffer(ctx context.Context, backoffermap map[uint64]*Backoffer, task *copTask, worker *copIteratorWorker) *Backoffer {
@@ -1261,6 +1281,7 @@ func (worker *copIteratorWorker) handleTaskOnce(bo *Backoffer, task *copTask, ch
 			err = worker.handleTiDBSendReqErr(err, task, ch)
 			return nil, err
 		}
+		worker.collectUnconsumedCopRuntimeStats(bo, rpcCtx)
 		return nil, errors.Trace(err)
 	}
 
@@ -1758,17 +1779,24 @@ func (worker *copIteratorWorker) handleCollectExecutionInfo(bo *Backoffer, rpcCt
 	if resp.detail == nil {
 		resp.detail = new(CopRuntimeStats)
 	}
-	resp.detail.Stats = worker.kvclient.Stats
+	worker.collectCopRuntimeStats(resp.detail, bo, rpcCtx, resp)
+}
+
+func (worker *copIteratorWorker) collectCopRuntimeStats(copStats *CopRuntimeStats, bo *Backoffer, rpcCtx *tikv.RPCContext, resp *copResponse) {
+	copStats.Stats = worker.kvclient.Stats
 	backoffTimes := bo.GetBackoffTimes()
-	resp.detail.BackoffTime = time.Duration(bo.GetTotalSleep()) * time.Millisecond
-	resp.detail.BackoffSleep = make(map[string]time.Duration, len(backoffTimes))
-	resp.detail.BackoffTimes = make(map[string]int, len(backoffTimes))
+	copStats.BackoffTime = time.Duration(bo.GetTotalSleep()) * time.Millisecond
+	copStats.BackoffSleep = make(map[string]time.Duration, len(backoffTimes))
+	copStats.BackoffTimes = make(map[string]int, len(backoffTimes))
 	for backoff := range backoffTimes {
-		resp.detail.BackoffTimes[backoff] = backoffTimes[backoff]
-		resp.detail.BackoffSleep[backoff] = time.Duration(bo.GetBackoffSleepMS()[backoff]) * time.Millisecond
+		copStats.BackoffTimes[backoff] = backoffTimes[backoff]
+		copStats.BackoffSleep[backoff] = time.Duration(bo.GetBackoffSleepMS()[backoff]) * time.Millisecond
 	}
 	if rpcCtx != nil {
-		resp.detail.CalleeAddress = rpcCtx.Addr
+		copStats.CalleeAddress = rpcCtx.Addr
+	}
+	if resp == nil {
+		return
 	}
 	sd := &util.ScanDetail{}
 	td := util.TimeDetail{}
@@ -1791,8 +1819,20 @@ func (worker *copIteratorWorker) handleCollectExecutionInfo(bo *Backoffer, rpcCt
 			}
 		}
 	}
-	resp.detail.ScanDetail = sd
-	resp.detail.TimeDetail = td
+	copStats.ScanDetail = sd
+	copStats.TimeDetail = td
+}
+
+func (worker *copIteratorWorker) collectUnconsumedCopRuntimeStats(bo *Backoffer, rpcCtx *tikv.RPCContext) {
+	if worker.kvclient.Stats == nil {
+		return
+	}
+	copStats := &CopRuntimeStats{}
+	worker.collectCopRuntimeStats(copStats, bo, rpcCtx, nil)
+	worker.unconsumedStats.Lock()
+	worker.unconsumedStats.stats = append(worker.unconsumedStats.stats, copStats)
+	worker.unconsumedStats.Unlock()
+	worker.kvclient.Stats = nil
 }
 
 // CopRuntimeStats contains execution detail information.
@@ -1803,6 +1843,11 @@ type CopRuntimeStats struct {
 	CoprCacheHit bool
 }
 
+type unconsumedCopRuntimeStats struct {
+	sync.Mutex
+	stats []*CopRuntimeStats
+}
+
 func (worker *copIteratorWorker) handleTiDBSendReqErr(err error, task *copTask, ch chan<- *copResponse) error {
 	errCode := errno.ErrUnknown
 	errMsg := err.Error()
diff --git a/pkg/store/mockstore/unistore/rpc.go b/pkg/store/mockstore/unistore/rpc.go
index c2f7ac78a00e7..2edbc24598338 100644
--- a/pkg/store/mockstore/unistore/rpc.go
+++ b/pkg/store/mockstore/unistore/rpc.go
@@ -93,6 +93,10 @@ func (c *RPCClient) SendRequest(ctx context.Context, addr string, req *tikvrpc.R
 			failpoint.Return(tikvrpc.GenRegionErrorResp(req, &errorpb.Error{Message: "Deadline is exceeded"}))
 		}
 	})
+	failpoint.Inject("unistoreRPCSlowByInjestSleep", func(val failpoint.Value) {
+		time.Sleep(time.Duration(val.(int)) * time.Millisecond)
+		failpoint.Return(tikvrpc.GenRegionErrorResp(req, &errorpb.Error{Message: "Deadline is exceeded"}))
+	})
 
 	select {
 	case <-ctx.Done():

From fa859bba9131ef49e1fb65e67ae603b52c707b16 Mon Sep 17 00:00:00 2001
From: crazycs520 <crazycs520@gmail.com>
Date: Thu, 14 Mar 2024 17:16:39 +0800
Subject: [PATCH 2/3] refine

Signed-off-by: crazycs520 <crazycs520@gmail.com>
---
 pkg/store/copr/coprocessor.go | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/pkg/store/copr/coprocessor.go b/pkg/store/copr/coprocessor.go
index 2ea18eff44655..bdd14548fb45f 100644
--- a/pkg/store/copr/coprocessor.go
+++ b/pkg/store/copr/coprocessor.go
@@ -1106,7 +1106,7 @@ type HasUnconsumedCopRuntimeStats interface {
 }
 
 func (it *copIterator) CollectUnconsumedCopRuntimeStats() []*CopRuntimeStats {
-	if it == nil {
+	if it == nil || it.unconsumedStats == nil {
 		return nil
 	}
 	it.unconsumedStats.Lock()

From 71883b26da6d6ee67efcae3d48dcb73c789ec90b Mon Sep 17 00:00:00 2001
From: crazycs520 <crazycs520@gmail.com>
Date: Thu, 14 Mar 2024 18:19:26 +0800
Subject: [PATCH 3/3] fix ci

Signed-off-by: crazycs520 <crazycs520@gmail.com>
---
 pkg/server/BUILD.bazel              | 1 +
 pkg/store/copr/coprocessor.go       | 1 +
 pkg/store/mockstore/unistore/rpc.go | 2 +-
 3 files changed, 3 insertions(+), 1 deletion(-)

diff --git a/pkg/server/BUILD.bazel b/pkg/server/BUILD.bazel
index ef63c250b22e9..c6b7ad1e527f7 100644
--- a/pkg/server/BUILD.bazel
+++ b/pkg/server/BUILD.bazel
@@ -190,6 +190,7 @@ go_test(
         "//pkg/util/chunk",
         "//pkg/util/context",
         "//pkg/util/dbterror/exeerrors",
+        "//pkg/util/plancodec",
         "//pkg/util/replayer",
         "//pkg/util/sqlkiller",
         "//pkg/util/syncutil",
diff --git a/pkg/store/copr/coprocessor.go b/pkg/store/copr/coprocessor.go
index bdd14548fb45f..d4e6bb045d974 100644
--- a/pkg/store/copr/coprocessor.go
+++ b/pkg/store/copr/coprocessor.go
@@ -1102,6 +1102,7 @@ func (it *copIterator) Next(ctx context.Context) (kv.ResultSubset, error) {
 
 // HasUnconsumedCopRuntimeStats indicate whether has unconsumed CopRuntimeStats.
 type HasUnconsumedCopRuntimeStats interface {
+	// CollectUnconsumedCopRuntimeStats returns unconsumed CopRuntimeStats.
 	CollectUnconsumedCopRuntimeStats() []*CopRuntimeStats
 }
 
diff --git a/pkg/store/mockstore/unistore/rpc.go b/pkg/store/mockstore/unistore/rpc.go
index 2edbc24598338..b255138dff63b 100644
--- a/pkg/store/mockstore/unistore/rpc.go
+++ b/pkg/store/mockstore/unistore/rpc.go
@@ -94,7 +94,7 @@ func (c *RPCClient) SendRequest(ctx context.Context, addr string, req *tikvrpc.R
 		}
 	})
 	failpoint.Inject("unistoreRPCSlowByInjestSleep", func(val failpoint.Value) {
-		time.Sleep(time.Duration(val.(int)) * time.Millisecond)
+		time.Sleep(time.Duration(val.(int) * int(time.Millisecond)))
 		failpoint.Return(tikvrpc.GenRegionErrorResp(req, &errorpb.Error{Message: "Deadline is exceeded"}))
 	})