Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

infoschema, executor, txn: implement DATA_LOCK_WAITS table #24750

Merged
merged 24 commits into from
May 27, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -1537,7 +1537,8 @@ func (b *executorBuilder) buildMemTable(v *plannercore.PhysicalMemTable) Executo
strings.ToLower(infoschema.TableTiDBTrx),
strings.ToLower(infoschema.ClusterTableTiDBTrx),
strings.ToLower(infoschema.TableDeadlocks),
strings.ToLower(infoschema.ClusterTableDeadlocks):
strings.ToLower(infoschema.ClusterTableDeadlocks),
strings.ToLower(infoschema.TableDataLockWaits):
return &MemTableReaderExec{
baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ID()),
table: v.Table,
Expand Down
69 changes: 44 additions & 25 deletions executor/infoschema_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package executor
import (
"bytes"
"context"
"encoding/hex"
"encoding/json"
"fmt"
"io"
Expand Down Expand Up @@ -53,13 +54,16 @@ import (
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/collate"
"github.com/pingcap/tidb/util/deadlockhistory"
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/pdapi"
"github.com/pingcap/tidb/util/resourcegrouptag"
"github.com/pingcap/tidb/util/sem"
"github.com/pingcap/tidb/util/set"
"github.com/pingcap/tidb/util/sqlexec"
"github.com/pingcap/tidb/util/stmtsummary"
"github.com/pingcap/tidb/util/stringutil"
"go.etcd.io/etcd/clientv3"
"go.uber.org/zap"
)

type memtableRetriever struct {
Expand Down Expand Up @@ -158,6 +162,8 @@ func (e *memtableRetriever) retrieve(ctx context.Context, sctx sessionctx.Contex
err = e.setDataForDeadlock(sctx)
case infoschema.ClusterTableDeadlocks:
err = e.setDataForClusterDeadlock(sctx)
case infoschema.TableDataLockWaits:
err = e.setDataForTableDataLockWaits(sctx)
}
if err != nil {
return nil, err
Expand Down Expand Up @@ -1005,6 +1011,40 @@ func (e *memtableRetriever) dataForTiKVStoreStatus(ctx sessionctx.Context) (err
return nil
}

// hasPriv reports whether the current session has been granted the given
// global privilege, evaluated against the session's active roles.
// It returns false when no privilege manager is attached to the session context.
func hasPriv(ctx sessionctx.Context, priv mysql.PrivilegeType) bool {
	pm := privilege.GetPrivilegeManager(ctx)
	if pm == nil {
		return false
	}
	activeRoles := ctx.GetSessionVars().ActiveRoles
	return pm.RequestVerification(activeRoles, "", "", "", priv)
}

// setDataForTableDataLockWaits fills e.rows with one row per lock wait entry
// reported by the session's store: the key, the waiting transaction, the
// transaction being waited for, and the hex-encoded digest decoded from the
// entry's resource group tag.
//
// The caller must hold the PROCESS privilege; otherwise an access-denied error
// is returned. An error from the store's GetLockWaits is returned unchanged.
func (e *memtableRetriever) setDataForTableDataLockWaits(ctx sessionctx.Context) error {
	if !hasPriv(ctx, mysql.ProcessPriv) {
		return plannercore.ErrSpecificAccessDenied.GenWithStackByArgs("PROCESS")
	}
	entries, err := ctx.GetStore().GetLockWaits()
	if err != nil {
		return err
	}
	for _, entry := range entries {
		// sqlDigest stays nil when the tag cannot be decoded, so the row is
		// still emitted with a nil digest datum instead of failing the query.
		var sqlDigest interface{}
		if rawDigest, decodeErr := resourcegrouptag.DecodeResourceGroupTag(entry.ResourceGroupTag); decodeErr == nil {
			sqlDigest = hex.EncodeToString(rawDigest)
		} else {
			logutil.BgLogger().Warn("failed to decode resource group tag", zap.Error(decodeErr))
		}
		row := types.MakeDatums(entry.Key, entry.Txn, entry.WaitForTxn, sqlDigest)
		e.rows = append(e.rows, row)
	}
	return nil
}

// DDLJobsReaderExec executes DDLJobs information retrieving.
type DDLJobsReaderExec struct {
baseExecutor
Expand Down Expand Up @@ -1189,13 +1229,7 @@ func (e *memtableRetriever) setDataForProcessList(ctx sessionctx.Context) {
}

loginUser := ctx.GetSessionVars().User
var hasProcessPriv bool
if pm := privilege.GetPrivilegeManager(ctx); pm != nil {
if pm.RequestVerification(ctx.GetSessionVars().ActiveRoles, "", "", "", mysql.ProcessPriv) {
hasProcessPriv = true
}
}

hasProcessPriv := hasPriv(ctx, mysql.ProcessPriv)
pl := sm.ShowProcessList()

records := make([][]types.Datum, 0, len(pl))
Expand Down Expand Up @@ -1946,13 +1980,8 @@ func (e *memtableRetriever) setDataForPlacementPolicy(ctx sessionctx.Context) er
func (e *memtableRetriever) setDataForClientErrorsSummary(ctx sessionctx.Context, tableName string) error {
// Seeing client errors should require the PROCESS privilege, with the exception of errors for your own user.
// This is similar to information_schema.processlist, which is the closest comparison.
var hasProcessPriv bool
hasProcessPriv := hasPriv(ctx, mysql.ProcessPriv)
loginUser := ctx.GetSessionVars().User
if pm := privilege.GetPrivilegeManager(ctx); pm != nil {
if pm.RequestVerification(ctx.GetSessionVars().ActiveRoles, "", "", "", mysql.ProcessPriv) {
hasProcessPriv = true
}
}

var rows [][]types.Datum
switch tableName {
Expand Down Expand Up @@ -2026,12 +2055,7 @@ func (e *memtableRetriever) setDataForTiDBTrx(ctx sessionctx.Context) {
}

loginUser := ctx.GetSessionVars().User
var hasProcessPriv bool
if pm := privilege.GetPrivilegeManager(ctx); pm != nil {
if pm.RequestVerification(ctx.GetSessionVars().ActiveRoles, "", "", "", mysql.ProcessPriv) {
hasProcessPriv = true
}
}
hasProcessPriv := hasPriv(ctx, mysql.ProcessPriv)
infoList := sm.ShowTxnList()
for _, info := range infoList {
// If you have the PROCESS privilege, you can see all running transactions.
Expand All @@ -2054,12 +2078,7 @@ func (e *memtableRetriever) setDataForClusterTiDBTrx(ctx sessionctx.Context) err
}

func (e *memtableRetriever) setDataForDeadlock(ctx sessionctx.Context) error {
hasPriv := false
if pm := privilege.GetPrivilegeManager(ctx); pm != nil {
hasPriv = pm.RequestVerification(ctx.GetSessionVars().ActiveRoles, "", "", "", mysql.ProcessPriv)
}

if !hasPriv {
if !hasPriv(ctx, mysql.ProcessPriv) {
return plannercore.ErrSpecificAccessDenied.GenWithStackByArgs("PROCESS")
}

Expand Down
11 changes: 11 additions & 0 deletions infoschema/tables.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,8 @@ const (
TableTiDBTrx = "TIDB_TRX"
// TableDeadlocks is the string constant of deadlock table.
TableDeadlocks = "DEADLOCKS"
// TableDataLockWaits is current lock waiting status table.
TableDataLockWaits = "DATA_LOCK_WAITS"
)

var tableIDMap = map[string]int64{
Expand Down Expand Up @@ -243,6 +245,7 @@ var tableIDMap = map[string]int64{
ClusterTableTiDBTrx: autoid.InformationSchemaDBID + 71,
TableDeadlocks: autoid.InformationSchemaDBID + 72,
ClusterTableDeadlocks: autoid.InformationSchemaDBID + 73,
TableDataLockWaits: autoid.InformationSchemaDBID + 74,
}

type columnInfo struct {
Expand Down Expand Up @@ -1368,6 +1371,13 @@ var tableDeadlocksCols = []columnInfo{
{name: "TRX_HOLDING_LOCK", tp: mysql.TypeLonglong, size: 21, flag: mysql.NotNullFlag, comment: "The transaction ID (start ts) of the transaction that's currently holding the lock"},
}

var tableDataLockWaitsCols = []columnInfo{
youjiali1995 marked this conversation as resolved.
Show resolved Hide resolved
{name: "KEY", tp: mysql.TypeVarchar, size: 64, flag: mysql.NotNullFlag, comment: "The key that's being waiting on"},
{name: "TRX_ID", tp: mysql.TypeLonglong, size: 21, flag: mysql.NotNullFlag | mysql.UnsignedFlag, comment: "Current transaction that's waiting for the lock"},
{name: "CURRENT_HOLDING_TRX_ID", tp: mysql.TypeLonglong, size: 21, flag: mysql.NotNullFlag | mysql.UnsignedFlag, comment: "The transaction that's holding the lock and blocks the current transaction"},
{name: "SQL_DIGEST", tp: mysql.TypeVarchar, size: 64, comment: "Digest of the SQL that's trying to acquire the lock"},
}

// GetShardingInfo returns a nil or description string for the sharding information of given TableInfo.
// The returned description string may be:
// - "NOT_SHARDED": for tables that SHARD_ROW_ID_BITS is not specified.
Expand Down Expand Up @@ -1739,6 +1749,7 @@ var tableNameToColumns = map[string][]columnInfo{
TableClientErrorsSummaryByHost: tableClientErrorsSummaryByHostCols,
TableTiDBTrx: tableTiDBTrxCols,
TableDeadlocks: tableDeadlocksCols,
TableDataLockWaits: tableDataLockWaitsCols,
}

func createInfoSchemaTable(_ autoid.Allocators, meta *model.TableInfo) (table.Table, error) {
Expand Down
58 changes: 58 additions & 0 deletions infoschema/tables_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
. "github.com/pingcap/check"
"github.com/pingcap/failpoint"
"github.com/pingcap/fn"
"github.com/pingcap/kvproto/pkg/deadlock"
"github.com/pingcap/parser"
"github.com/pingcap/parser/auth"
"github.com/pingcap/parser/model"
Expand All @@ -46,9 +47,13 @@ import (
"github.com/pingcap/tidb/session/txninfo"
"github.com/pingcap/tidb/store/helper"
"github.com/pingcap/tidb/store/mockstore"
"github.com/pingcap/tidb/store/mockstore/mockstorage"
"github.com/pingcap/tidb/store/mockstore/unistore"
"github.com/pingcap/tidb/store/tikv"
"github.com/pingcap/tidb/util"
"github.com/pingcap/tidb/util/kvcache"
"github.com/pingcap/tidb/util/pdapi"
"github.com/pingcap/tidb/util/resourcegrouptag"
"github.com/pingcap/tidb/util/set"
"github.com/pingcap/tidb/util/testkit"
"github.com/pingcap/tidb/util/testleak"
Expand All @@ -57,12 +62,17 @@ import (
)

var _ = Suite(&testTableSuite{&testTableSuiteBase{}})
var _ = Suite(&testDataLockWaitSuite{&testTableSuiteBase{}})
var _ = SerialSuites(&testClusterTableSuite{testTableSuiteBase: &testTableSuiteBase{}})

type testTableSuite struct {
*testTableSuiteBase
}

// testDataLockWaitSuite holds the tests for the DATA_LOCK_WAITS table.
// It embeds testTableSuiteBase but defines its own SetUpSuite, which backs the
// suite with a mock storage that reports a fixed set of lock wait entries.
type testDataLockWaitSuite struct {
*testTableSuiteBase
}

type testTableSuiteBase struct {
store kv.Storage
dom *domain.Domain
Expand Down Expand Up @@ -1552,3 +1562,51 @@ func (s *testTableSuite) TestInfoschemaDeadlockPrivilege(c *C) {
}, nil, nil), IsTrue)
_ = tk.MustQuery("select * from information_schema.deadlocks")
}

func (s *testDataLockWaitSuite) SetUpSuite(c *C) {
testleak.BeforeTest()

client, pdClient, cluster, err := unistore.New("")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems you also need to implement a TearDownSuite method to close these resources.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the more proper way to write the unit tests is to implement the GetLockWaitInfo api for the unistore, and construct a real lock waiting here. Since finishing that kind of tests would be too complicated and we may not have so much time, it's ok to me that we do not do that in this PR. But I think the tests need to be improved in the future.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think there should be a unit test on unistore for testing GetLockWaitInfo api. (Will implement in the future)
And an integration test for the whole DATA_LOCK_WAITS feature across the whole TiDB-TiKV/unistore system. (But I'm not very sure where I should put that test.)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can look at my tests for the deadlocks table; they may help you. If some integration tests are hard to implement in the TiDB repo, or you rely on some specific configuration of the cluster, you can do it in UTF.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you reuse the SetUpSuite and TearDownSuite of the testTableSuiteBase, which means, invoking s.testTableSuiteBase.SetUpSuite and only do additional work in your own code?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SetUpSuite is very different from testTableSuiteBase's, but we can just use testTableSuiteBase's TearDownSuite here, so there is no need to implement TearDownSuite.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If they are totally different, I think it's better not to nest the testTableSuiteBase here, but to write the similar fields yourself. But let's merge the PR first and you can do code refinements later.

c.Assert(err, IsNil)
unistore.BootstrapWithSingleStore(cluster)
kvstore, err := tikv.NewTestTiKVStore(client, pdClient, nil, nil, 0)
c.Assert(err, IsNil)
_, digest1 := parser.NormalizeDigest("select * from t1 for update;")
_, digest2 := parser.NormalizeDigest("update t1 set f1=1 where id=2;")
s.store, err = mockstorage.NewMockStorageWithLockWaits(kvstore, []*deadlock.WaitForEntry{
{Txn: 1, WaitForTxn: 2, KeyHash: 3, Key: []byte("a"), ResourceGroupTag: resourcegrouptag.EncodeResourceGroupTag(digest1, nil)},
{Txn: 4, WaitForTxn: 5, KeyHash: 6, Key: []byte("b"), ResourceGroupTag: resourcegrouptag.EncodeResourceGroupTag(digest2, nil)},
})
c.Assert(err, IsNil)
session.DisableStats4Test()
s.dom, err = session.BootstrapSession(s.store)
c.Assert(err, IsNil)
}

// TestDataLockWait checks that DATA_LOCK_WAITS exposes the two lock wait
// entries installed by SetUpSuite, with each SQL digest rendered as the string
// form of the digest computed from the original statement text.
func (s *testDataLockWaitSuite) TestDataLockWait(c *C) {
	_, forUpdateDigest := parser.NormalizeDigest("select * from t1 for update;")
	_, updateDigest := parser.NormalizeDigest("update t1 set f1=1 where id=2;")
	expected := testkit.Rows(
		"a 1 2 "+forUpdateDigest.String(),
		"b 4 5 "+updateDigest.String(),
	)

	tk := s.newTestKitWithRoot(c)
	result := tk.MustQuery("select * from information_schema.DATA_LOCK_WAITS;")
	result.Check(expected)
}

// TestDataLockPrivilege verifies the access control of DATA_LOCK_WAITS:
// a user without the PROCESS privilege is rejected with error 1227, while a
// user that has been granted PROCESS can query the table.
func (s *testDataLockWaitSuite) TestDataLockPrivilege(c *C) {
	// Case 1: a freshly created user with no grants must be denied.
	deniedTk := s.newTestKitWithRoot(c)
	deniedTk.MustExec("create user 'testuser'@'localhost'")
	plainUser := &auth.UserIdentity{
		Username: "testuser",
		Hostname: "localhost",
	}
	c.Assert(deniedTk.Se.Auth(plainUser, nil, nil), IsTrue)
	queryErr := deniedTk.QueryToErr("select * from information_schema.DATA_LOCK_WAITS")
	c.Assert(queryErr, NotNil)
	c.Assert(queryErr.Error(), Equals, "[planner:1227]Access denied; you need (at least one of) the PROCESS privilege(s) for this operation")

	// Case 2: a user granted PROCESS must be able to read the table.
	grantedTk := s.newTestKitWithRoot(c)
	grantedTk.MustExec("create user 'testuser2'@'localhost'")
	grantedTk.MustExec("grant process on *.* to 'testuser2'@'localhost'")
	processUser := &auth.UserIdentity{
		Username: "testuser2",
		Hostname: "localhost",
	}
	c.Assert(grantedTk.Se.Auth(processUser, nil, nil), IsTrue)
	_ = grantedTk.MustQuery("select * from information_schema.DATA_LOCK_WAITS")
}
13 changes: 13 additions & 0 deletions kv/interface_mock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package kv
import (
"context"

deadlockpb "github.com/pingcap/kvproto/pkg/deadlock"
"github.com/pingcap/parser/model"
"github.com/pingcap/tidb/store/tikv"
"github.com/pingcap/tidb/store/tikv/oracle"
Expand Down Expand Up @@ -103,6 +104,10 @@ func (t *mockTxn) GetSnapshot() Snapshot {
return nil
}

// GetUnionStore is a stub: the mock transaction has no union store and always returns nil.
func (t *mockTxn) GetUnionStore() UnionStore {
return nil
}

func (t *mockTxn) NewStagingBuffer() MemBuffer {
return nil
}
Expand Down Expand Up @@ -210,6 +215,10 @@ func (s *mockStorage) GetMemCache() MemManager {
return nil
}

// GetLockWaits is a stub: the mock storage reports no lock wait entries and never fails.
func (s *mockStorage) GetLockWaits() ([]*deadlockpb.WaitForEntry, error) {
return nil, nil
}

func (s *mockStorage) GetMinSafeTS(txnScope string) uint64 {
return 0
}
Expand Down Expand Up @@ -255,3 +264,7 @@ func (s *mockSnapshot) IterReverse(k Key) (Iterator, error) {
}

func (s *mockSnapshot) SetOption(opt int, val interface{}) {}

// GetLockWaits is a stub that always returns nil.
// NOTE(review): this signature (value slice, no error) differs from
// Storage.GetLockWaits; confirm whether any interface still requires it on a snapshot.
func (s *mockSnapshot) GetLockWaits() []deadlockpb.WaitForEntry {
return nil
}
5 changes: 4 additions & 1 deletion kv/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"crypto/tls"
"time"

deadlockpb "github.com/pingcap/kvproto/pkg/deadlock"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/parser/model"
"github.com/pingcap/tidb/config"
Expand Down Expand Up @@ -340,7 +341,7 @@ type Driver interface {
type Storage interface {
// Begin a global transaction
Begin() (Transaction, error)
// Begin a transaction with given option
// BeginWithOption begins a transaction with given option
BeginWithOption(option tikv.StartTSOption) (Transaction, error)
// GetSnapshot gets a snapshot that is able to read any data which data is <= ver.
// if ver is MaxVersion or > current max committed version, we will use current version for this snapshot.
Expand Down Expand Up @@ -369,6 +370,8 @@ type Storage interface {
GetMemCache() MemManager
// GetMinSafeTS return the minimal SafeTS of the storage with given txnScope.
GetMinSafeTS(txnScope string) uint64
// GetLockWaits returns all lock wait information.
longfangsong marked this conversation as resolved.
Show resolved Hide resolved
GetLockWaits() ([]*deadlockpb.WaitForEntry, error)
}

// EtcdBackend is used for judging a storage is a real TiKV.
Expand Down
23 changes: 23 additions & 0 deletions store/driver/tikv_driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,16 @@ import (
"time"

"github.com/pingcap/errors"
deadlockpb "github.com/pingcap/kvproto/pkg/deadlock"
"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/store/copr"
derr "github.com/pingcap/tidb/store/driver/error"
txn_driver "github.com/pingcap/tidb/store/driver/txn"
"github.com/pingcap/tidb/store/gcworker"
"github.com/pingcap/tidb/store/tikv"
"github.com/pingcap/tidb/store/tikv/config"
"github.com/pingcap/tidb/store/tikv/tikvrpc"
"github.com/pingcap/tidb/store/tikv/util"
"github.com/pingcap/tidb/util/logutil"
pd "github.com/tikv/pd/client"
Expand Down Expand Up @@ -331,3 +334,23 @@ func (s *tikvStore) CurrentVersion(txnScope string) (kv.Version, error) {
func (s *tikvStore) ShowStatus(ctx context.Context, key string) (interface{}, error) {
return nil, kv.ErrNotImplemented
}

// GetLockWaits returns the lock wait entries reported by the TiKV stores known
// to the region cache.
//
// Collection is best-effort: a store whose request fails, or whose response is
// missing or of an unexpected type, is logged and skipped. The returned error
// is therefore always nil, and the result may be partial or empty.
func (s *tikvStore) GetLockWaits() ([]*deadlockpb.WaitForEntry, error) {
	stores := s.GetRegionCache().GetStoresByType(tikvrpc.TiKV)
	var result []*deadlockpb.WaitForEntry
	for _, store := range stores {
		req := tikvrpc.NewRequest(tikvrpc.CmdLockWaitInfo, &kvrpcpb.GetLockWaitInfoRequest{})
		resp, err := s.GetTiKVClient().SendRequest(context.TODO(), store.GetAddr(), req, time.Second*30)
		if err != nil {
			logutil.BgLogger().Warn("query lock wait info failed", zap.Error(err))
			continue
		}
		if resp == nil || resp.Resp == nil {
			logutil.BgLogger().Warn("lock wait info from store is nil")
			continue
		}
		// Use the comma-ok form so that an unexpected response type from one
		// store is skipped instead of panicking the whole query.
		waitInfo, ok := resp.Resp.(*kvrpcpb.GetLockWaitInfoResponse)
		if !ok || waitInfo == nil {
			logutil.BgLogger().Warn("unexpected lock wait info response", zap.String("store", store.GetAddr()))
			continue
		}
		result = append(result, waitInfo.Entries...)
	}
	return result, nil
}
2 changes: 2 additions & 0 deletions store/helper/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"time"

"github.com/pingcap/errors"
deadlockpb "github.com/pingcap/kvproto/pkg/deadlock"
"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/log"
"github.com/pingcap/parser/model"
Expand Down Expand Up @@ -72,6 +73,7 @@ type Storage interface {
GetTiKVClient() tikv.Client
Closed() <-chan struct{}
GetMinSafeTS(txnScope string) uint64
GetLockWaits() ([]*deadlockpb.WaitForEntry, error)
}

// Helper is a middleware to get some information from tikv/pd. It can be used for TiDB's http api or mem table.
Expand Down
Loading