Skip to content

Commit

Permalink
infoschema, executor: Support getting current executing SQL text in T…
Browse files Browse the repository at this point in the history
…IDB_TRX table (pingcap#26357)
  • Loading branch information
MyonKeminta authored Jul 28, 2021
1 parent c585a76 commit 89e5ce0
Show file tree
Hide file tree
Showing 9 changed files with 617 additions and 89 deletions.
12 changes: 10 additions & 2 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -1592,8 +1592,6 @@ func (b *executorBuilder) buildMemTable(v *plannercore.PhysicalMemTable) Executo
strings.ToLower(infoschema.TableClientErrorsSummaryGlobal),
strings.ToLower(infoschema.TableClientErrorsSummaryByUser),
strings.ToLower(infoschema.TableClientErrorsSummaryByHost),
strings.ToLower(infoschema.TableTiDBTrx),
strings.ToLower(infoschema.ClusterTableTiDBTrx),
strings.ToLower(infoschema.TableDeadlocks),
strings.ToLower(infoschema.ClusterTableDeadlocks),
strings.ToLower(infoschema.TableDataLockWaits):
Expand All @@ -1605,6 +1603,16 @@ func (b *executorBuilder) buildMemTable(v *plannercore.PhysicalMemTable) Executo
columns: v.Columns,
},
}
case strings.ToLower(infoschema.TableTiDBTrx),
strings.ToLower(infoschema.ClusterTableTiDBTrx):
return &MemTableReaderExec{
baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ID()),
table: v.Table,
retriever: &tidbTrxTableRetriever{
table: v.Table,
columns: v.Columns,
},
}
case strings.ToLower(infoschema.TableStatementsSummary),
strings.ToLower(infoschema.TableStatementsSummaryHistory),
strings.ToLower(infoschema.ClusterTableStatementsSummaryHistory),
Expand Down
25 changes: 25 additions & 0 deletions executor/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6670,6 +6670,31 @@ select 10;`
}
}

func (s *testClusterTableSuite) TestSQLDigestTextRetriever(c *C) {
tkInit := testkit.NewTestKitWithInit(c, s.store)
tkInit.MustExec("set global tidb_enable_stmt_summary = 1")
tkInit.MustQuery("select @@global.tidb_enable_stmt_summary").Check(testkit.Rows("1"))
tkInit.MustExec("drop table if exists test_sql_digest_text_retriever")
tkInit.MustExec("create table test_sql_digest_text_retriever (id int primary key, v int)")

tk := testkit.NewTestKitWithInit(c, s.store)
c.Assert(tk.Se.Auth(&auth.UserIdentity{Username: "root", Hostname: "%"}, nil, nil), IsTrue)
tk.MustExec("insert into test_sql_digest_text_retriever values (1, 1)")

insertNormalized, insertDigest := parser.NormalizeDigest("insert into test_sql_digest_text_retriever values (1, 1)")
_, updateDigest := parser.NormalizeDigest("update test_sql_digest_text_retriever set v = v + 1 where id = 1")
r := &executor.SQLDigestTextRetriever{
SQLDigestsMap: map[string]string{
insertDigest.String(): "",
updateDigest.String(): "",
},
}
err := r.RetrieveLocal(context.Background(), tk.Se)
c.Assert(err, IsNil)
c.Assert(r.SQLDigestsMap[insertDigest.String()], Equals, insertNormalized)
c.Assert(r.SQLDigestsMap[updateDigest.String()], Equals, "")
}

func prepareLogs(c *C, logData []string, fileNames []string) {
writeFile := func(file string, data string) {
f, err := os.OpenFile(file, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0644)
Expand Down
140 changes: 107 additions & 33 deletions executor/infoschema_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ import (
"github.com/pingcap/tidb/meta/autoid"
plannercore "github.com/pingcap/tidb/planner/core"
"github.com/pingcap/tidb/privilege"
"github.com/pingcap/tidb/session/txninfo"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/statistics"
Expand Down Expand Up @@ -156,10 +157,6 @@ func (e *memtableRetriever) retrieve(ctx context.Context, sctx sessionctx.Contex
infoschema.TableClientErrorsSummaryByUser,
infoschema.TableClientErrorsSummaryByHost:
err = e.setDataForClientErrorsSummary(sctx, e.table.Name.O)
case infoschema.TableTiDBTrx:
e.setDataForTiDBTrx(sctx)
case infoschema.ClusterTableTiDBTrx:
err = e.setDataForClusterTiDBTrx(sctx)
case infoschema.TableDeadlocks:
err = e.setDataForDeadlock(sctx)
case infoschema.ClusterTableDeadlocks:
Expand Down Expand Up @@ -2047,35 +2044,6 @@ func (e *memtableRetriever) setDataForClientErrorsSummary(ctx sessionctx.Context
return nil
}

func (e *memtableRetriever) setDataForTiDBTrx(ctx sessionctx.Context) {
sm := ctx.GetSessionManager()
if sm == nil {
return
}

loginUser := ctx.GetSessionVars().User
hasProcessPriv := hasPriv(ctx, mysql.ProcessPriv)
infoList := sm.ShowTxnList()
for _, info := range infoList {
// If you have the PROCESS privilege, you can see all running transactions.
// Otherwise, you can see only your own transactions.
if !hasProcessPriv && loginUser != nil && info.Username != loginUser.Username {
continue
}
e.rows = append(e.rows, info.ToDatum())
}
}

func (e *memtableRetriever) setDataForClusterTiDBTrx(ctx sessionctx.Context) error {
e.setDataForTiDBTrx(ctx)
rows, err := infoschema.AppendHostInfoToRows(ctx, e.rows)
if err != nil {
return err
}
e.rows = rows
return nil
}

func (e *memtableRetriever) setDataForDeadlock(ctx sessionctx.Context) error {
if !hasPriv(ctx, mysql.ProcessPriv) {
return plannercore.ErrSpecificAccessDenied.GenWithStackByArgs("PROCESS")
Expand Down Expand Up @@ -2182,6 +2150,112 @@ func (e *stmtSummaryTableRetriever) retrieve(ctx context.Context, sctx sessionct
return rows, nil
}

// tidbTrxTableRetriever is the memtable retriever for the TIDB_TRX and CLUSTER_TIDB_TRX table.
type tidbTrxTableRetriever struct {
dummyCloser
batchRetrieverHelper
table *model.TableInfo
columns []*model.ColumnInfo
txnInfo []*txninfo.TxnInfo
initialized bool
}

func (e *tidbTrxTableRetriever) retrieve(ctx context.Context, sctx sessionctx.Context) ([][]types.Datum, error) {
if e.retrieved {
return nil, nil
}

if !e.initialized {
e.initialized = true

sm := sctx.GetSessionManager()
if sm == nil {
e.retrieved = true
return nil, nil
}

loginUser := sctx.GetSessionVars().User
hasProcessPriv := hasPriv(sctx, mysql.ProcessPriv)
infoList := sm.ShowTxnList()
e.txnInfo = make([]*txninfo.TxnInfo, 0, len(infoList))
for _, info := range infoList {
// If you have the PROCESS privilege, you can see all running transactions.
// Otherwise, you can see only your own transactions.
if !hasProcessPriv && loginUser != nil && info.Username != loginUser.Username {
continue
}
e.txnInfo = append(e.txnInfo, info)
}

e.batchRetrieverHelper.totalRows = len(e.txnInfo)
e.batchRetrieverHelper.batchSize = 1024
}

// The current TiDB node's address is needed by the CLUSTER_TIDB_TRX table.
var err error
var instanceAddr string
switch e.table.Name.O {
case infoschema.ClusterTableTiDBTrx:
instanceAddr, err = infoschema.GetInstanceAddr(sctx)
if err != nil {
return nil, err
}
}

var res [][]types.Datum
err = e.nextBatch(func(start, end int) error {
// Before getting rows, collect the SQL digests that needs to be retrieved first.
var sqlRetriever *SQLDigestTextRetriever
for _, c := range e.columns {
if c.Name.O == txninfo.CurrentSQLDigestTextStr {
if sqlRetriever == nil {
sqlRetriever = NewSQLDigestTextRetriever()
}

for i := start; i < end; i++ {
sqlRetriever.SQLDigestsMap[e.txnInfo[i].CurrentSQLDigest] = ""
}
}
}
// Retrieve the SQL texts if necessary.
if sqlRetriever != nil {
err1 := sqlRetriever.RetrieveLocal(ctx, sctx)
if err1 != nil {
return errors.Trace(err1)
}
}

res = make([][]types.Datum, 0, end-start)

// Calculate rows.
for i := start; i < end; i++ {
row := make([]types.Datum, 0, len(e.columns))
for _, c := range e.columns {
if c.Name.O == util.ClusterTableInstanceColumnName {
row = append(row, types.NewDatum(instanceAddr))
} else if c.Name.O == txninfo.CurrentSQLDigestTextStr {
if text, ok := sqlRetriever.SQLDigestsMap[e.txnInfo[i].CurrentSQLDigest]; ok && len(text) != 0 {
row = append(row, types.NewDatum(text))
} else {
row = append(row, types.NewDatum(nil))
}
} else {
row = append(row, e.txnInfo[i].ToDatum(c.Name.O))
}
}
res = append(res, row)
}

return nil
})

if err != nil {
return nil, err
}

return res, nil
}

type hugeMemTableRetriever struct {
dummyCloser
table *model.TableInfo
Expand Down
Loading

0 comments on commit 89e5ce0

Please sign in to comment.