Skip to content

Commit

Permalink
session: make TxnInfo() return even if process info is empty (pingc…
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-chi-bot authored and tangenta committed Dec 30, 2024
1 parent 7d16cc7 commit 0bb9036
Show file tree
Hide file tree
Showing 13 changed files with 108 additions and 46 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,7 @@ tools/bin/revive:
GOBIN=$(shell pwd)/tools/bin $(GO) install github.com/mgechev/[email protected]

tools/bin/failpoint-ctl:
GOBIN=$(shell pwd)/tools/bin $(GO) install github.com/pingcap/failpoint/failpoint-ctl@2eaa328
GOBIN=$(shell pwd)/tools/bin $(GO) install github.com/pingcap/failpoint/failpoint-ctl@9b3b6e3

tools/bin/errdoc-gen:
GOBIN=$(shell pwd)/tools/bin $(GO) install github.com/pingcap/errors/errdoc-gen@518f63d
Expand Down
12 changes: 7 additions & 5 deletions pkg/ddl/index_cop.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,11 +179,13 @@ func wrapInBeginRollback(se *sess.Session, f func(startTS uint64) error) error {
return errors.Trace(err)
}
defer se.Rollback()
var startTS uint64
sessVars := se.GetSessionVars()
sessVars.TxnCtxMu.Lock()
startTS = sessVars.TxnCtx.StartTS
sessVars.TxnCtxMu.Unlock()

txn, err := se.Txn()
if err != nil {
return err
}
startTS := txn.StartTS()
failpoint.InjectCall("wrapInBeginRollbackStartTS", startTS)
return f(startTS)
}

Expand Down
3 changes: 1 addition & 2 deletions pkg/ddl/ingest/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ go_test(
embed = [":ingest"],
flaky = True,
race = "on",
shard_count = 15,
shard_count = 18,
deps = [
"//pkg/config",
"//pkg/ddl",
Expand All @@ -78,7 +78,6 @@ go_test(
"//pkg/errno",
"//pkg/parser/model",
"//pkg/testkit",
"//tests/realtikvtest",
"@com_github_ngaut_pools//:pools",
"@com_github_pingcap_failpoint//:failpoint",
"@com_github_stretchr_testify//assert",
Expand Down
36 changes: 33 additions & 3 deletions pkg/ddl/ingest/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ import (
"github.com/pingcap/tidb/pkg/errno"
"github.com/pingcap/tidb/pkg/parser/model"
"github.com/pingcap/tidb/pkg/testkit"
"github.com/pingcap/tidb/tests/realtikvtest"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -121,7 +120,7 @@ func TestIngestError(t *testing.T) {
}

func TestAddIndexIngestPanic(t *testing.T) {
store := realtikvtest.CreateMockStoreAndSetup(t)
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test;")
defer ingesttestutil.InjectMockBackendMgr(t, store)()
Expand All @@ -142,8 +141,39 @@ func TestAddIndexIngestPanic(t *testing.T) {
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/ddl/mockLocalWriterPanic"))
}

func TestAddIndexSetInternalSessions(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test;")
defer ingesttestutil.InjectMockBackendMgr(t, store)()

tk.MustExec("set global tidb_enable_dist_task = 1;")
tk.MustExec("set global tidb_ddl_reorg_worker_cnt = 1;")
tk.MustExec("create table t (a int);")
tk.MustExec("insert into t values (1);")
expectInternalTS := []uint64{}
actualInternalTS := []uint64{}
err := failpoint.EnableCall("github.com/pingcap/tidb/pkg/ddl/wrapInBeginRollbackStartTS", func(startTS uint64) {
expectInternalTS = append(expectInternalTS, startTS)
})
require.NoError(t, err)
defer failpoint.Disable("github.com/pingcap/tidb/pkg/ddl/wrapInBeginRollbackStartTS")
err = failpoint.Enable("github.com/pingcap/tidb/pkg/ddl/scanRecordExec", "return")
require.NoError(t, err)
defer failpoint.Disable("github.com/pingcap/tidb/pkg/ddl/scanRecordExec")
ddl.OperatorCallBackForTest = func() {
mgr := tk.Session().GetSessionManager()
actualInternalTS = mgr.GetInternalSessionStartTSList()
}
tk.MustExec("alter table t add index idx(a);")
require.Len(t, expectInternalTS, 1)
for _, ts := range expectInternalTS {
require.Contains(t, actualInternalTS, ts)
}
}

func TestAddIndexIngestCancel(t *testing.T) {
store, dom := realtikvtest.CreateMockStoreAndDomainAndSetup(t)
store, dom := testkit.CreateMockStoreAndDomain(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test;")
defer ingesttestutil.InjectMockBackendMgr(t, store)()
Expand Down
2 changes: 2 additions & 0 deletions pkg/ddl/internal/session/session_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package session

import (
"context"
"fmt"
"sync"

Expand Down Expand Up @@ -84,6 +85,7 @@ func (sg *Pool) Put(ctx sessionctx.Context) {

// no need to protect sg.resPool, even the sg.resPool is closed, the ctx still need to
// Put into resPool, because when resPool is closing, it will wait all the ctx returns, then resPool finish closing.
ctx.RollbackTxn(context.Background())
sg.resPool.Put(ctx.(pools.Resource))
infosync.DeleteInternalSession(ctx)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/executor/infoschema_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -2487,7 +2487,7 @@ func (e *tidbTrxTableRetriever) retrieve(ctx context.Context, sctx sessionctx.Co
for _, info := range infoList {
// If you have the PROCESS privilege, you can see all running transactions.
// Otherwise, you can see only your own transactions.
if !hasProcessPriv && loginUser != nil && info.Username != loginUser.Username {
if !hasProcessPriv && loginUser != nil && info.ProcessInfo.Username != loginUser.Username {
continue
}
e.txnInfo = append(e.txnInfo, info)
Expand Down
16 changes: 10 additions & 6 deletions pkg/infoschema/test/clustertablestest/tables_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1219,9 +1219,11 @@ func TestTiDBTrx(t *testing.T) {
CurrentSQLDigest: digest.String(),
State: txninfo.TxnIdle,
EntriesCount: 1,
ConnectionID: 2,
Username: "root",
CurrentDB: "test",
ProcessInfo: &txninfo.ProcessInfo{
ConnectionID: 2,
Username: "root",
CurrentDB: "test",
},
}

blockTime2 := time.Date(2021, 05, 20, 13, 18, 30, 123456000, time.Local)
Expand All @@ -1230,9 +1232,11 @@ func TestTiDBTrx(t *testing.T) {
CurrentSQLDigest: "",
AllSQLDigests: []string{"sql1", "sql2", digest.String()},
State: txninfo.TxnLockAcquiring,
ConnectionID: 10,
Username: "user1",
CurrentDB: "db1",
ProcessInfo: &txninfo.ProcessInfo{
ConnectionID: 10,
Username: "user1",
CurrentDB: "db1",
},
}
sm.TxnInfo[1].BlockStartTime.Valid = true
sm.TxnInfo[1].BlockStartTime.Time = blockTime2
Expand Down
5 changes: 3 additions & 2 deletions pkg/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -817,15 +817,16 @@ func (s *Server) getUserProcessList() map[uint64]*util.ProcessInfo {
return rs
}

// ShowTxnList shows all txn info for displaying in `TIDB_TRX`
// ShowTxnList shows all txn info for displaying in `TIDB_TRX`.
// Internal sessions are not taken into consideration.
func (s *Server) ShowTxnList() []*txninfo.TxnInfo {
s.rwlock.RLock()
defer s.rwlock.RUnlock()
rs := make([]*txninfo.TxnInfo, 0, len(s.clients))
for _, client := range s.clients {
if client.ctx.Session != nil {
info := client.ctx.Session.TxnInfo()
if info != nil {
if info != nil && info.ProcessInfo != nil {
rs = append(rs, info)
}
}
Expand Down
21 changes: 12 additions & 9 deletions pkg/session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -507,6 +507,7 @@ func (s *session) FieldList(tableName string) ([]*ast.ResultField, error) {
}

// TxnInfo returns a pointer to a *copy* of the internal TxnInfo, thus is *read only*
// Process field may not initialize if this is a session used internally.
func (s *session) TxnInfo() *txninfo.TxnInfo {
s.txn.mu.RLock()
// Copy on read to get a snapshot, this API shouldn't be frequently called.
Expand All @@ -519,17 +520,18 @@ func (s *session) TxnInfo() *txninfo.TxnInfo {

processInfo := s.ShowProcess()
if processInfo == nil {
return nil
return &txnInfo
}
txnInfo.ProcessInfo = &txninfo.ProcessInfo{
ConnectionID: processInfo.ID,
Username: processInfo.User,
CurrentDB: processInfo.DB,
RelatedTableIDs: make(map[int64]struct{}),
}
txnInfo.ConnectionID = processInfo.ID
txnInfo.Username = processInfo.User
txnInfo.CurrentDB = processInfo.DB
txnInfo.RelatedTableIDs = make(map[int64]struct{})
s.GetSessionVars().GetRelatedTableForMDL().Range(func(key, value interface{}) bool {
txnInfo.RelatedTableIDs[key.(int64)] = struct{}{}
txnInfo.ProcessInfo.RelatedTableIDs[key.(int64)] = struct{}{}
return true
})

return &txnInfo
}

Expand Down Expand Up @@ -3857,9 +3859,10 @@ func GetStartTSFromSession(se interface{}) (startTS, processInfoID uint64) {
txnInfo := tmp.TxnInfo()
if txnInfo != nil {
startTS = txnInfo.StartTS
processInfoID = txnInfo.ConnectionID
if txnInfo.ProcessInfo != nil {
processInfoID = txnInfo.ProcessInfo.ConnectionID
}
}

logutil.BgLogger().Debug(
"GetStartTSFromSession getting startTS of internal session",
zap.Uint64("startTS", startTS), zap.Time("start time", oracle.GetTimeFromTS(startTS)))
Expand Down
29 changes: 24 additions & 5 deletions pkg/session/txninfo/txn_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,8 +174,12 @@ type TxnInfo struct {
// How many entries are in MemDB
EntriesCount uint64

// The following fields will be filled in `session` instead of `LazyTxn`
// The following field will be filled in `session` instead of `LazyTxn`
ProcessInfo *ProcessInfo
}

// ProcessInfo is part of fields of txnInfo, which will be filled in `session` instead of `LazyTxn`
type ProcessInfo struct {
// Which session this transaction belongs to
ConnectionID uint64
// The user who open this session
Expand Down Expand Up @@ -219,13 +223,25 @@ var columnValueGetterMap = map[string]func(*TxnInfo) types.Datum{
return types.NewDatum(info.EntriesCount)
},
SessionIDStr: func(info *TxnInfo) types.Datum {
return types.NewDatum(info.ConnectionID)
var connectionID uint64
if info.ProcessInfo != nil {
connectionID = info.ProcessInfo.ConnectionID
}
return types.NewDatum(connectionID)
},
UserStr: func(info *TxnInfo) types.Datum {
return types.NewDatum(info.Username)
var userName string
if info.ProcessInfo != nil {
userName = info.ProcessInfo.Username
}
return types.NewDatum(userName)
},
DBStr: func(info *TxnInfo) types.Datum {
return types.NewDatum(info.CurrentDB)
var currentDB string
if info.ProcessInfo != nil {
currentDB = info.ProcessInfo.CurrentDB
}
return types.NewDatum(currentDB)
},
AllSQLDigestsStr: func(info *TxnInfo) types.Datum {
allSQLDigests := info.AllSQLDigests
Expand All @@ -241,7 +257,10 @@ var columnValueGetterMap = map[string]func(*TxnInfo) types.Datum{
return types.NewDatum(string(res))
},
RelatedTableIDsStr: func(info *TxnInfo) types.Datum {
relatedTableIDs := info.RelatedTableIDs
var relatedTableIDs map[int64]struct{}
if info.ProcessInfo != nil {
relatedTableIDs = info.ProcessInfo.RelatedTableIDs
}
str := strings.Builder{}
first := true
for tblID := range relatedTableIDs {
Expand Down
1 change: 0 additions & 1 deletion pkg/testkit/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ go_library(
"//pkg/resourcemanager",
"//pkg/session",
"//pkg/session/txninfo",
"//pkg/sessionctx",
"//pkg/sessionctx/variable",
"//pkg/store/driver",
"//pkg/store/mockstore",
Expand Down
19 changes: 11 additions & 8 deletions pkg/testkit/mocksessionmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (
"github.com/pingcap/tidb/pkg/planner/core"
"github.com/pingcap/tidb/pkg/session"
"github.com/pingcap/tidb/pkg/session/txninfo"
"github.com/pingcap/tidb/pkg/sessionctx"
"github.com/pingcap/tidb/pkg/util"
)

Expand Down Expand Up @@ -52,7 +51,7 @@ func (msm *MockSessionManager) ShowTxnList() []*txninfo.TxnInfo {
rs := make([]*txninfo.TxnInfo, 0, len(msm.Conn))
for _, se := range msm.Conn {
info := se.TxnInfo()
if info != nil {
if info != nil && info.ProcessInfo != nil {
rs = append(rs, info)
}
}
Expand Down Expand Up @@ -155,12 +154,16 @@ func (msm *MockSessionManager) GetInternalSessionStartTSList() []uint64 {
defer msm.mu.Unlock()
ret := make([]uint64, 0, len(msm.internalSessions))
for internalSess := range msm.internalSessions {
se := internalSess.(sessionctx.Context)
sessVars := se.GetSessionVars()
sessVars.TxnCtxMu.Lock()
startTS := sessVars.TxnCtx.StartTS
sessVars.TxnCtxMu.Unlock()
ret = append(ret, startTS)
// Ref the implementation of `GetInternalSessionStartTSList` on the real session manager. The `TxnInfo` is more
// accurate, because if a session is pending, the `StartTS` in `sessVars.TxnCtx` will not be updated. For example,
// if there is not DDL for a long time, the minimal internal session start ts will not have any progress.
if se, ok := internalSess.(interface{ TxnInfo() *txninfo.TxnInfo }); ok {
txn := se.TxnInfo()
if txn != nil {
ret = append(ret, txn.StartTS)
}
continue
}
}
return ret
}
Expand Down
6 changes: 3 additions & 3 deletions tests/realtikvtest/txntest/txn_state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,9 +74,9 @@ func TestBasicTxnState(t *testing.T) {
require.Equal(t, []string{beginDigest.String(), selectTSDigest.String(), expectedDigest.String()}, info.AllSQLDigests)

// len and size will be covered in TestLenAndSize
require.Equal(t, tk.Session().GetSessionVars().ConnectionID, info.ConnectionID)
require.Equal(t, "", info.Username)
require.Equal(t, "test", info.CurrentDB)
require.Equal(t, tk.Session().GetSessionVars().ConnectionID, info.ProcessInfo.ConnectionID)
require.Equal(t, "", info.ProcessInfo.Username)
require.Equal(t, "test", info.ProcessInfo.CurrentDB)
require.Equal(t, startTS, info.StartTS)

require.NoError(t, failpoint.Enable("tikvclient/beforePrewrite", "pause"))
Expand Down

0 comments on commit 0bb9036

Please sign in to comment.