Cherry-pick: add rpc error stats and refine log for diagnose (pingcap#53335) (pingcap#96)

* *: add rpc error stats and refine log for diagnose (pingcap#53335)

* sessionctx: add sysvar to control exp feats of replica selector

Signed-off-by: zyguan <[email protected]>

* *: add rpc error stats and refine log (pingcap#52810)

Signed-off-by: crazycs520 <[email protected]>

* store: refine coprocessor error log (pingcap#52729)

Signed-off-by: crazycs520 <[email protected]>

* *: improve log about stale-read query (pingcap#52494)

Signed-off-by: crazycs520 <[email protected]>

* *: refine tikv/client-go log to print context information (such as conn id) as much as possible (pingcap#45559)

Signed-off-by: crazycs520 <[email protected]>

* add leader peer id log

Signed-off-by: crazycs520 <[email protected]>

* update log

Signed-off-by: crazycs520 <[email protected]>

* executor: fix issue that some query execution stats were omitted when execution was interrupted (pingcap#51787)

close pingcap#51660

Signed-off-by: crazycs520 <[email protected]>

* update go.mod

Signed-off-by: crazycs520 <[email protected]>

---------

Signed-off-by: zyguan <[email protected]>
Signed-off-by: crazycs520 <[email protected]>
Co-authored-by: zyguan <[email protected]>

* Update go.sum

* Update go.mod

---------

Signed-off-by: zyguan <[email protected]>
Signed-off-by: crazycs520 <[email protected]>
Co-authored-by: crazycs <[email protected]>
Co-authored-by: zyguan <[email protected]>
3 people authored and GitHub Enterprise committed May 29, 2024
1 parent 727a968 commit 5ca1a96
Showing 19 changed files with 160 additions and 59 deletions.
6 changes: 3 additions & 3 deletions distsql/distsql_test.go
@@ -114,7 +114,7 @@ func TestSelectResultRuntimeStats(t *testing.T) {
backoffSleep: map[string]time.Duration{"RegionMiss": time.Millisecond},
totalProcessTime: time.Second,
totalWaitTime: time.Second,
rpcStat: tikv.NewRegionRequestRuntimeStats(),
reqStat: tikv.NewRegionRequestRuntimeStats(),
distSQLConcurrency: 15,
}
s1.copRespTime.Add(execdetails.Duration(time.Second))
@@ -131,7 +131,7 @@ func TestSelectResultRuntimeStats(t *testing.T) {
// Test for idempotence.
require.Equal(t, expect, stats.String())

s1.rpcStat.Stats[tikvrpc.CmdCop] = &tikv.RPCRuntimeStats{
s1.reqStat.RPCStats[tikvrpc.CmdCop] = &tikv.RPCRuntimeStats{
Count: 1,
Consume: int64(time.Second),
}
@@ -146,7 +146,7 @@ func TestSelectResultRuntimeStats(t *testing.T) {
backoffSleep: map[string]time.Duration{"RegionMiss": time.Millisecond},
totalProcessTime: time.Second,
totalWaitTime: time.Second,
rpcStat: tikv.NewRegionRequestRuntimeStats(),
reqStat: tikv.NewRegionRequestRuntimeStats(),
}
s1.copRespTime.Add(execdetails.Duration(time.Second))
s1.procKeys.Add(100)
38 changes: 18 additions & 20 deletions distsql/select_result.go
@@ -43,7 +43,6 @@ import (
"github.com/pingcap/tipb/go-tipb"
tikvmetrics "github.com/tikv/client-go/v2/metrics"
"github.com/tikv/client-go/v2/tikv"
"github.com/tikv/client-go/v2/tikvrpc"
"go.uber.org/zap"
"golang.org/x/exp/maps"
)
@@ -346,7 +345,7 @@ func (r *selectResult) readFromChunk(ctx context.Context, chk *chunk.Chunk) erro

func (r *selectResult) updateCopRuntimeStats(ctx context.Context, copStats *copr.CopRuntimeStats, respTime time.Duration) {
callee := copStats.CalleeAddress
if r.rootPlanID <= 0 || r.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl == nil || callee == "" {
if r.rootPlanID <= 0 || r.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl == nil || (callee == "" && copStats.ReqStats == nil) {
return
}

@@ -360,7 +359,7 @@ func (r *selectResult) updateCopRuntimeStats(ctx context.Context, copStats *copr
if r.stats == nil {
r.stats = &selectResultRuntimeStats{
backoffSleep: make(map[string]time.Duration),
rpcStat: tikv.NewRegionRequestRuntimeStats(),
reqStat: tikv.NewRegionRequestRuntimeStats(),
distSQLConcurrency: r.distSQLConcurrency,
}
}
@@ -453,6 +452,13 @@ func (r *selectResult) Close() error {
if respSize > 0 {
r.memConsume(-respSize)
}
if unconsumed, ok := r.resp.(copr.HasUnconsumedCopRuntimeStats); ok && unconsumed != nil {
unconsumedCopStats := unconsumed.CollectUnconsumedCopRuntimeStats()
for _, copStats := range unconsumedCopStats {
r.updateCopRuntimeStats(context.Background(), copStats, time.Duration(0))
r.ctx.GetSessionVars().StmtCtx.MergeExecDetails(&copStats.ExecDetails, nil)
}
}
if r.stats != nil {
defer r.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(r.rootPlanID, r.stats)
}
@@ -471,7 +477,7 @@ type selectResultRuntimeStats struct {
backoffSleep map[string]time.Duration
totalProcessTime time.Duration
totalWaitTime time.Duration
rpcStat tikv.RegionRequestRuntimeStats
reqStat *tikv.RegionRequestRuntimeStats
distSQLConcurrency int
CoprCacheHitNum int64
}
@@ -486,7 +492,7 @@ func (s *selectResultRuntimeStats) mergeCopRuntimeStats(copStats *copr.CopRuntim
maps.Copy(s.backoffSleep, copStats.BackoffSleep)
s.totalProcessTime += copStats.TimeDetail.ProcessTime
s.totalWaitTime += copStats.TimeDetail.WaitTime
s.rpcStat.Merge(copStats.RegionRequestRuntimeStats)
s.reqStat.Merge(copStats.ReqStats)
if copStats.CoprCacheHit {
s.CoprCacheHitNum++
}
@@ -497,7 +503,7 @@ func (s *selectResultRuntimeStats) Clone() execdetails.RuntimeStats {
copRespTime: execdetails.Percentile[execdetails.Duration]{},
procKeys: execdetails.Percentile[execdetails.Int64]{},
backoffSleep: make(map[string]time.Duration, len(s.backoffSleep)),
rpcStat: tikv.NewRegionRequestRuntimeStats(),
reqStat: tikv.NewRegionRequestRuntimeStats(),
distSQLConcurrency: s.distSQLConcurrency,
CoprCacheHitNum: s.CoprCacheHitNum,
}
@@ -508,7 +514,7 @@ func (s *selectResultRuntimeStats) Clone() execdetails.RuntimeStats {
}
newRs.totalProcessTime += s.totalProcessTime
newRs.totalWaitTime += s.totalWaitTime
maps.Copy(newRs.rpcStat.Stats, s.rpcStat.Stats)
newRs.reqStat = s.reqStat.Clone()
return &newRs
}

@@ -525,13 +531,13 @@ func (s *selectResultRuntimeStats) Merge(rs execdetails.RuntimeStats) {
}
s.totalProcessTime += other.totalProcessTime
s.totalWaitTime += other.totalWaitTime
s.rpcStat.Merge(other.rpcStat)
s.reqStat.Merge(other.reqStat)
s.CoprCacheHitNum += other.CoprCacheHitNum
}

func (s *selectResultRuntimeStats) String() string {
buf := bytes.NewBuffer(nil)
rpcStat := s.rpcStat
reqStat := s.reqStat
if s.copRespTime.Size() > 0 {
size := s.copRespTime.Size()
if size == 1 {
@@ -562,15 +568,6 @@ func (s *selectResultRuntimeStats) String() string {
buf.WriteString(execdetails.FormatDuration(s.totalWaitTime))
}
}
copRPC := rpcStat.Stats[tikvrpc.CmdCop]
if copRPC != nil && copRPC.Count > 0 {
rpcStat = rpcStat.Clone()
delete(rpcStat.Stats, tikvrpc.CmdCop)
buf.WriteString(", rpc_num: ")
buf.WriteString(strconv.FormatInt(copRPC.Count, 10))
buf.WriteString(", rpc_time: ")
buf.WriteString(execdetails.FormatDuration(time.Duration(copRPC.Consume)))
}
if config.GetGlobalConfig().TiKVClient.CoprCache.CapacityMB > 0 {
buf.WriteString(fmt.Sprintf(", copr_cache_hit_ratio: %v",
strconv.FormatFloat(float64(s.CoprCacheHitNum)/float64(s.copRespTime.Size()), 'f', 2, 64)))
@@ -584,10 +581,11 @@ func (s *selectResultRuntimeStats) String() string {
buf.WriteString("}")
}

rpcStatsStr := rpcStat.String()
rpcStatsStr := reqStat.String()
if len(rpcStatsStr) > 0 {
buf.WriteString(", ")
buf.WriteString(", rpc_info:{")
buf.WriteString(rpcStatsStr)
buf.WriteString("}")
}

if len(s.backoffSleep) > 0 {
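A note on the new Close logic above: it drains per-task stats from coprocessor responses that were fetched but never read, so interrupted queries keep their cop execution stats. A minimal sketch of the interface this relies on, inferred from the call sites here rather than copied from store/copr (the real definition may differ):

package copr // sketch: the actual declaration is assumed to live in store/copr

// HasUnconsumedCopRuntimeStats is inferred from its use in selectResult.Close above.
// A response type that still holds per-task stats when the result is closed early
// (for example, because the query was interrupted) exposes them through this
// interface so Close can merge them into the statement's runtime stats.
type HasUnconsumedCopRuntimeStats interface {
	// CollectUnconsumedCopRuntimeStats returns stats of cop tasks whose
	// responses were produced but never consumed by the reader.
	CollectUnconsumedCopRuntimeStats() []*CopRuntimeStats
}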
4 changes: 4 additions & 0 deletions executor/adapter.go
@@ -1559,6 +1559,10 @@ func (a *ExecStmt) LogSlowQuery(txnTS uint64, succ bool, hasMoreResults bool) {
}

resultRows := GetResultRowsCount(stmtCtx, a.Plan)
if txnTS == 0 {
// TODO: txnTS maybe ambiguous, consider logging stale-read-ts with a new field in the slow log.
txnTS = sessVars.TxnCtx.StaleReadTs
}

slowItems := &variable.SlowQueryLogItems{
TxnTS: txnTS,
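With the fallback above, a stale-read statement (whose txnTS argument is 0) is logged with its stale-read ts as Txn_start_ts. A rough testkit-style sketch of how that becomes observable, mirroring the slow-query test added further down in this diff; it assumes the suite already routes the slow log to a file (as TestSlowQueryMisc does) and that the statement is actually recorded:

// Sketch only, not part of the commit; mirrors executor/slow_query_sql_test.go below.
func TestStaleReadTsInSlowLogSketch(t *testing.T) {
	store := testkit.CreateMockStore(t)
	tk := testkit.NewTestKit(t, store)
	tk.MustExec("create table test.t_stale_read (a int)")
	time.Sleep(time.Second) // make sure a ts one second in the past is resolvable

	tk.MustExec("set @@tidb_read_staleness='-1'")
	tk.MustQuery("select a from test.t_stale_read")
	tk.MustExec("set @@tidb_read_staleness='0'")

	// Before this fix the stale-read statement was logged with Txn_start_ts = 0;
	// now it should carry the stale-read ts.
	rows := tk.MustQuery("select txn_start_ts from information_schema.slow_query " +
		"where query = 'select a from test.t_stale_read;' and txn_start_ts > 0").Rows()
	require.NotEmpty(t, rows)
}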
14 changes: 7 additions & 7 deletions executor/distsql_test.go
@@ -444,16 +444,16 @@ func TestCoprocessorPagingSize(t *testing.T) {
// Check 'rpc_num' in the execution information
//
// mysql> explain analyze select * from t_paging;
// +--------------------+----------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
// | id |task | execution info |
// +--------------------+----------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
// | TableReader_5 |root | time:7.27ms, loops:2, cop_task: {num: 10, max: 1.57ms, min: 313.3µs, avg: 675.9µs, p95: 1.57ms, tot_proc: 2ms, rpc_num: 10, rpc_time: 6.69ms, copr_cache_hit_ratio: 0.00, distsql_concurrency: 15} |
// | └─TableFullScan_4 |cop[tikv] | tikv_task:{proc max:1.48ms, min:294µs, avg: 629µs, p80:1.21ms, p95:1.48ms, iters:0, tasks:10} |
// +--------------------+----------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
// +--------------------+----------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
// | id |task | execution info |
// +--------------------+----------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
// | TableReader_5 |root | time:7.27ms, loops:2, cop_task: {num: 10, max: 1.57ms, min: 313.3µs, avg: 675.9µs, p95: 1.57ms, tot_proc: 2ms, copr_cache_hit_ratio: 0.00, distsql_concurrency: 15}, rpc_info:{Cop:{num_rpc:10, total_time:6.69ms}} |
// | └─TableFullScan_4 |cop[tikv] | tikv_task:{proc max:1.48ms, min:294µs, avg: 629µs, p80:1.21ms, p95:1.48ms, iters:0, tasks:10} |
// +--------------------+----------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
// 2 rows in set (0.01 sec)

getRPCNumFromExplain := func(rows [][]interface{}) (res uint64) {
re := regexp.MustCompile("rpc_num: ([0-9]+)")
re := regexp.MustCompile("num_rpc:([0-9]+)")
for _, row := range rows {
buf := bytes.NewBufferString("")
_, _ = fmt.Fprintf(buf, "%s\n", row)
10 changes: 5 additions & 5 deletions executor/executor_failpoint_test.go
@@ -275,7 +275,7 @@ func TestCollectCopRuntimeStats(t *testing.T) {
rows := tk.MustQuery("explain analyze select * from t1").Rows()
require.Len(t, rows, 2)
explain := fmt.Sprintf("%v", rows[0])
require.Regexp(t, ".*rpc_num: .*, .*regionMiss:.*", explain)
require.Regexp(t, ".*num_rpc:.*, .*regionMiss:.*", explain)
require.NoError(t, failpoint.Disable("tikvclient/tikvStoreRespResult"))
}

@@ -586,15 +586,15 @@ func TestTiKVClientReadTimeout(t *testing.T) {
rows = tk.MustQuery("explain analyze select /*+ set_var(tikv_client_read_timeout=1) */ * from t where b > 1").Rows()
require.Len(t, rows, 3)
explain = fmt.Sprintf("%v", rows[0])
require.Regexp(t, ".*TableReader.* root time:.*, loops:.* cop_task: {num: 1, .* rpc_num: 2.*", explain)
require.Regexp(t, ".*TableReader.* root time:.*, loops:.* cop_task: {num: 1, .*num_rpc:2.*", explain)

// Test for stale read.
tk.MustExec("set @a=now(6);")
tk.MustExec("set @@tidb_replica_read='closest-replicas';")
rows = tk.MustQuery("explain analyze select /*+ set_var(tikv_client_read_timeout=1) */ * from t as of timestamp(@a) where b > 1").Rows()
require.Len(t, rows, 3)
explain = fmt.Sprintf("%v", rows[0])
require.Regexp(t, ".*TableReader.* root time:.*, loops:.* cop_task: {num: 1, .* rpc_num: 2.*", explain)
require.Regexp(t, ".*TableReader.* root time:.*, loops:.* cop_task: {num: 1, .*num_rpc:2.*", explain)

// Test for tikv_client_read_timeout session variable.
tk.MustExec("set @@tikv_client_read_timeout=1;")
@@ -614,15 +614,15 @@ func TestTiKVClientReadTimeout(t *testing.T) {
rows = tk.MustQuery("explain analyze select * from t where b > 1").Rows()
require.Len(t, rows, 3)
explain = fmt.Sprintf("%v", rows[0])
require.Regexp(t, ".*TableReader.* root time:.*, loops:.* cop_task: {num: 1, .* rpc_num: 2.*", explain)
require.Regexp(t, ".*TableReader.* root time:.*, loops:.* cop_task: {num: 1, .*num_rpc:2.*", explain)

// Test for stale read.
tk.MustExec("set @a=now(6);")
tk.MustExec("set @@tidb_replica_read='closest-replicas';")
rows = tk.MustQuery("explain analyze select * from t as of timestamp(@a) where b > 1").Rows()
require.Len(t, rows, 3)
explain = fmt.Sprintf("%v", rows[0])
require.Regexp(t, ".*TableReader.* root time:.*, loops:.* cop_task: {num: 1, .* rpc_num: 2.*", explain)
require.Regexp(t, ".*TableReader.* root time:.*, loops:.* cop_task: {num: 1, .*num_rpc:2.*", explain)
}

func TestGetMvccByEncodedKeyRegionError(t *testing.T) {
2 changes: 1 addition & 1 deletion executor/explainfor_test.go
@@ -74,7 +74,7 @@ func TestExplainFor(t *testing.T) {
buf.WriteString(fmt.Sprintf("%v", v))
}
}
require.Regexp(t, "TableReader_5 10000.00 0 root time:.*, loops:1, cop_task: {num:.*, max:.*, proc_keys:.* rpc_num: 1, rpc_time:.*} data:TableFullScan_4 N/A N/A\n"+
require.Regexp(t, "TableReader_5 10000.00 0 root time:.*, loops:1, cop_task: {num:.*, max:.*, proc_keys:.*num_rpc:1, total_time:.*} data:TableFullScan_4 N/A N/A\n"+
"└─TableFullScan_4 10000.00 0 cop.* table:t1 tikv_task:{time:.*, loops:0} keep order:false, stats:pseudo N/A N/A",
buf.String())
}
18 changes: 17 additions & 1 deletion executor/slow_query_sql_test.go
@@ -19,6 +19,7 @@ import (
"math"
"os"
"testing"
"time"

"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/executor"
@@ -79,7 +80,7 @@ func TestSlowQuerySensitiveQuery(t *testing.T) {
))
}

func TestSlowQueryPrepared(t *testing.T) {
func TestSlowQueryMisc(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
originCfg := config.GetGlobalConfig()
@@ -113,6 +114,21 @@ func TestSlowQueryMisc(t *testing.T) {
tk.MustQuery("SELECT Query FROM `information_schema`.`slow_query` " +
"where query like 'select%sleep%' order by time desc limit 1").
Check(testkit.Rows("select `sleep` ( ? ) , ?;"))

// Test 3 kinds of stale-read query.
tk.MustExec("create table test.t_stale_read (a int)")
time.Sleep(time.Second + time.Millisecond*10)
tk.MustExec("set tidb_redact_log=0;")
tk.MustExec("set @@tidb_read_staleness='-1'")
tk.MustQuery("select a from test.t_stale_read")
tk.MustExec("set @@tidb_read_staleness='0'")
t1 := time.Now()
tk.MustQuery(fmt.Sprintf("select a from test.t_stale_read as of timestamp '%s'", t1.Format("2006-1-2 15:04:05")))
tk.MustExec(fmt.Sprintf("start transaction read only as of timestamp '%v'", t1.Format("2006-1-2 15:04:05")))
tk.MustQuery("select a from test.t_stale_read")
tk.MustExec("commit")
require.Len(t, tk.MustQuery("SELECT query, txn_start_ts FROM `information_schema`.`slow_query` "+
"where (query = 'select a from test.t_stale_read;' or query like 'select a from test.t_stale_read as of timestamp %') and Txn_start_ts > 0").Rows(), 3)
}

func TestLogSlowLogIndex(t *testing.T) {
2 changes: 1 addition & 1 deletion go.mod
@@ -90,7 +90,7 @@ require (
github.com/stretchr/testify v1.8.0
github.com/tdakkota/asciicheck v0.1.1
github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2
github.com/tikv/client-go/v2 v2.0.4-0.20231020030327-4ecf7c282e37
github.com/tikv/client-go/v2 v2.0.4-0.20240521070200-f9fbc4c8f578
github.com/tikv/pd/client v0.0.0-20230904040343-947701a32c05
github.com/timakin/bodyclose v0.0.0-20210704033933-f49887972144
github.com/twmb/murmur3 v1.1.3
2 changes: 2 additions & 0 deletions go.sum
@@ -942,6 +942,8 @@ github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2 h1:mbAskLJ0oJf
github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2/go.mod h1:2PfKggNGDuadAa0LElHrByyrz4JPZ9fFx6Gs7nx7ZZU=
github.com/tikv/pd/client v0.0.0-20230904040343-947701a32c05 h1:e4hLUKfgfPeJPZwOfU+/I/03G0sn6IZqVcbX/5o+hvM=
github.com/tikv/pd/client v0.0.0-20230904040343-947701a32c05/go.mod h1:MLIl+d2WbOF4A3U88WKtyXrQQW417wZDDvBcq2IW9bQ=
github.com/tikv/client-go/v2 v2.0.4-0.20240521070200-f9fbc4c8f578 h1:NXhdhxU4dibxCmMChAOpyBXzuqi2VKp17nSx1HUg4HU=
github.com/tikv/client-go/v2 v2.0.4-0.20240521070200-f9fbc4c8f578/go.mod h1:mmVCLP2OqWvQJPOIevQPZvGphzh/oq9vv8J5LDfpadQ=
github.com/timakin/bodyclose v0.0.0-20210704033933-f49887972144 h1:kl4KhGNsJIbDHS9/4U9yQo1UcPQM0kOMJHn29EoH/Ro=
github.com/timakin/bodyclose v0.0.0-20210704033933-f49887972144/go.mod h1:Qimiffbc6q9tBWlVV6x0P9sat/ao1xEkREYPPj9hphk=
github.com/tklauser/go-sysconf v0.3.9/go.mod h1:11DU/5sG7UexIrp/O6g35hrWzu0JxlwQ3LSFUzyeuhs=
1 change: 1 addition & 0 deletions planner/core/preprocess.go
@@ -1754,6 +1754,7 @@ func (p *preprocessor) updateStateFromStaleReadProcessor() error {
if err := txnManager.OnStmtStart(context.TODO(), txnManager.GetCurrentStmt()); err != nil {
return err
}
p.sctx.GetSessionVars().TxnCtx.StaleReadTs = p.LastSnapshotTS
}
}
}
10 changes: 7 additions & 3 deletions server/conn.go
@@ -1200,17 +1200,21 @@ func (cc *clientConn) Run(ctx context.Context) {
if storeerr.ErrLockAcquireFailAndNoWaitSet.Equal(err) {
logutil.Logger(ctx).Debug("Expected error for FOR UPDATE NOWAIT", zap.Error(err))
} else {
var startTS uint64
var timestamp uint64
if ctx := cc.getCtx(); ctx != nil && ctx.GetSessionVars() != nil && ctx.GetSessionVars().TxnCtx != nil {
startTS = ctx.GetSessionVars().TxnCtx.StartTS
timestamp = ctx.GetSessionVars().TxnCtx.StartTS
if timestamp == 0 && ctx.GetSessionVars().TxnCtx.StaleReadTs > 0 {
// for state-read query.
timestamp = ctx.GetSessionVars().TxnCtx.StaleReadTs
}
}
logutil.Logger(ctx).Info("command dispatched failed",
zap.String("connInfo", cc.String()),
zap.String("command", mysql.Command2Str[data[0]]),
zap.String("status", cc.SessionStatusToString()),
zap.Stringer("sql", getLastStmtInConn{cc}),
zap.String("txn_mode", txnMode),
zap.Uint64("timestamp", startTS),
zap.Uint64("timestamp", timestamp),
zap.String("err", errStrForLog(err, cc.ctx.GetSessionVars().EnableRedactLog)),
)
}
12 changes: 12 additions & 0 deletions server/conn_test.go
@@ -46,6 +46,7 @@ import (
"github.com/pingcap/tidb/util"
"github.com/pingcap/tidb/util/arena"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/plancodec"
"github.com/stretchr/testify/require"
tikverr "github.com/tikv/client-go/v2/error"
"github.com/tikv/client-go/v2/testutils"
@@ -761,6 +762,17 @@ func TestConnExecutionTimeout(t *testing.T) {

err = cc.handleQuery(context.Background(), "alter table testTable2 add index idx(age);")
require.NoError(t, err)

// Test executor stats when execution time exceeded.
tk.MustExec("set @@tidb_slow_log_threshold=300")
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/store/mockstore/unistore/unistoreRPCSlowByInjestSleep", `return(150)`))
err = tk.QueryToErr("select /*+ max_execution_time(600), set_var(tikv_client_read_timeout=100) */ * from testTable2")
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/store/mockstore/unistore/unistoreRPCSlowByInjestSleep"))
require.Error(t, err)
require.Equal(t, "[tikv:1317]Query execution was interrupted", err.Error())
planInfo, err := plancodec.DecodePlan(tk.Session().GetSessionVars().StmtCtx.GetEncodedPlan())
require.NoError(t, err)
require.Regexp(t, "TableReader.*cop_task: {num: .*.*num_rpc:.*", planInfo)
}

func TestShutDown(t *testing.T) {
Expand Down
1 change: 1 addition & 0 deletions session/txnmanager.go
@@ -228,6 +228,7 @@ func (m *txnManager) newProviderWithRequest(r *sessiontxn.EnterNewTxnRequest) (s
}

if r.StaleReadTS > 0 {
m.sctx.GetSessionVars().TxnCtx.StaleReadTs = r.StaleReadTS
return staleread.NewStalenessTxnContextProvider(m.sctx, r.StaleReadTS, nil), nil
}

1 change: 1 addition & 0 deletions sessionctx/variable/session.go
@@ -191,6 +191,7 @@ type TxnCtxNoNeedToRestore struct {
InfoSchema interface{}
History interface{}
StartTS uint64
StaleReadTs uint64

// ShardStep indicates the max size of continuous rowid shard in one transaction.
ShardStep int
2 changes: 1 addition & 1 deletion store/copr/batch_request_sender.go
@@ -75,7 +75,7 @@ func (ss *RegionBatchRequestSender) SendReqToAddr(bo *Backoffer, rpcCtx *tikv.RP
start := time.Now()
resp, err = ss.GetClient().SendRequest(ctx, rpcCtx.Addr, req, timout)
if ss.Stats != nil && ss.enableCollectExecutionInfo {
tikv.RecordRegionRequestRuntimeStats(ss.Stats, req.Type, time.Since(start))
ss.Stats.RecordRPCRuntimeStats(req.Type, time.Since(start))
}
if err != nil {
cancel()
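The one-line change above tracks a client-go API move: the package-level helper tikv.RecordRegionRequestRuntimeStats is replaced by a method on the stats object, which now travels as a pointer (see the reqStat field change in distsql/select_result.go). A minimal sketch of the new pattern, using only the signatures visible in this diff; treat the exact client-go API as an assumption:

// Sketch of the per-request stats flow after this commit (signatures inferred from the diff).
func sendWithStats(ctx context.Context, cli tikv.Client, addr string, req *tikvrpc.Request,
	timeout time.Duration, stats *tikv.RegionRequestRuntimeStats) (*tikvrpc.Response, error) {
	start := time.Now()
	resp, err := cli.SendRequest(ctx, addr, req, timeout)
	// Previously: tikv.RecordRegionRequestRuntimeStats(stats, req.Type, time.Since(start)).
	stats.RecordRPCRuntimeStats(req.Type, time.Since(start))
	return resp, err
}

The String() output of these stats is what selectResultRuntimeStats now prints inside the rpc_info:{...} section of EXPLAIN ANALYZE, for example Cop:{num_rpc:10, total_time:6.69ms} in the comment updated in executor/distsql_test.go.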