Skip to content

Commit

Permalink
metric: add some metric by database (#41477)
Browse files Browse the repository at this point in the history
close #37892
  • Loading branch information
srstack authored Feb 18, 2023
1 parent 234b789 commit 79558c0
Show file tree
Hide file tree
Showing 12 changed files with 123 additions and 100 deletions.
2 changes: 2 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -657,6 +657,7 @@ type Status struct {
MetricsInterval uint `toml:"metrics-interval" json:"metrics-interval"`
ReportStatus bool `toml:"report-status" json:"report-status"`
RecordQPSbyDB bool `toml:"record-db-qps" json:"record-db-qps"`
RecordDBLabel bool `toml:"record-db-label" json:"record-db-label"`
// After a duration of this time in seconds if the server doesn't see any activity it pings
// the client to see if the transport is still alive.
GRPCKeepAliveTime uint `toml:"grpc-keepalive-time" json:"grpc-keepalive-time"`
Expand Down Expand Up @@ -942,6 +943,7 @@ var defaultConf = Config{
StatusPort: DefStatusPort,
MetricsInterval: 15,
RecordQPSbyDB: false,
RecordDBLabel: false,
GRPCKeepAliveTime: 10,
GRPCKeepAliveTimeout: 3,
GRPCConcurrentStreams: 1024,
Expand Down
3 changes: 3 additions & 0 deletions config/config.toml.example
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,9 @@ metrics-interval = 15
# Record statements qps by database name if it is enabled.
record-db-qps = false

# Add the database name as a label on statement and query metrics if it is enabled.
record-db-label = false

[performance]
# Max CPUs to use, 0 use number of CPUs in the machine.
max-procs = 0
Expand Down
3 changes: 3 additions & 0 deletions config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -414,6 +414,9 @@ metrics-interval = 15
# Record statements qps by database name if it is enabled.
record-db-qps = false
# Record database name label if it is enabled.
record-db-label = false
[performance]
# Max CPUs to use, 0 use number of CPUs in the machine.
max-procs = 0
Expand Down
61 changes: 14 additions & 47 deletions executor/compiler.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,20 +35,6 @@ import (
"go.uber.org/zap"
)

// Counters of metrics.StmtNodeCounter, each bound to one fixed statement-type
// label value. The label lookup happens once, when these package-level
// variables are initialized.
var (
stmtNodeCounterUse = metrics.StmtNodeCounter.WithLabelValues("Use")
stmtNodeCounterShow = metrics.StmtNodeCounter.WithLabelValues("Show")
stmtNodeCounterBegin = metrics.StmtNodeCounter.WithLabelValues("Begin")
stmtNodeCounterCommit = metrics.StmtNodeCounter.WithLabelValues("Commit")
stmtNodeCounterRollback = metrics.StmtNodeCounter.WithLabelValues("Rollback")
stmtNodeCounterInsert = metrics.StmtNodeCounter.WithLabelValues("Insert")
stmtNodeCounterReplace = metrics.StmtNodeCounter.WithLabelValues("Replace")
stmtNodeCounterDelete = metrics.StmtNodeCounter.WithLabelValues("Delete")
stmtNodeCounterUpdate = metrics.StmtNodeCounter.WithLabelValues("Update")
stmtNodeCounterSelect = metrics.StmtNodeCounter.WithLabelValues("Select")
stmtNodeCounterSavepoint = metrics.StmtNodeCounter.WithLabelValues("Savepoint")
)

// Compiler compiles an ast.StmtNode to a physical plan.
type Compiler struct {
Ctx sessionctx.Context
Expand Down Expand Up @@ -207,40 +193,21 @@ func CountStmtNode(stmtNode ast.StmtNode, inRestrictedSQL bool) {
}

typeLabel := ast.GetStmtLabel(stmtNode)
switch typeLabel {
case "Use":
stmtNodeCounterUse.Inc()
case "Show":
stmtNodeCounterShow.Inc()
case "Begin":
stmtNodeCounterBegin.Inc()
case "Commit":
stmtNodeCounterCommit.Inc()
case "Rollback":
stmtNodeCounterRollback.Inc()
case "Insert":
stmtNodeCounterInsert.Inc()
case "Replace":
stmtNodeCounterReplace.Inc()
case "Delete":
stmtNodeCounterDelete.Inc()
case "Update":
stmtNodeCounterUpdate.Inc()
case "Select":
stmtNodeCounterSelect.Inc()
case "Savepoint":
stmtNodeCounterSavepoint.Inc()
default:
metrics.StmtNodeCounter.WithLabelValues(typeLabel).Inc()
}

if !config.GetGlobalConfig().Status.RecordQPSbyDB {
return
}

dbLabels := getStmtDbLabel(stmtNode)
for dbLabel := range dbLabels {
metrics.DbStmtNodeCounter.WithLabelValues(dbLabel, typeLabel).Inc()
if config.GetGlobalConfig().Status.RecordQPSbyDB || config.GetGlobalConfig().Status.RecordDBLabel {
dbLabels := getStmtDbLabel(stmtNode)
switch {
case config.GetGlobalConfig().Status.RecordQPSbyDB:
for dbLabel := range dbLabels {
metrics.DbStmtNodeCounter.WithLabelValues(dbLabel, typeLabel).Inc()
}
case config.GetGlobalConfig().Status.RecordDBLabel:
for dbLabel := range dbLabels {
metrics.StmtNodeCounter.WithLabelValues(typeLabel, dbLabel).Inc()
}
}
} else {
metrics.StmtNodeCounter.WithLabelValues(typeLabel, "").Inc()
}
}

Expand Down
2 changes: 1 addition & 1 deletion metrics/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ var (
Subsystem: "executor",
Name: "statement_total",
Help: "Counter of StmtNode.",
}, []string{LblType})
}, []string{LblType, LblDb})

// DbStmtNodeCounter records the number of statement with the same type and db.
DbStmtNodeCounter = prometheus.NewCounterVec(
Expand Down
4 changes: 2 additions & 2 deletions metrics/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ var (
Name: "handle_query_duration_seconds",
Help: "Bucketed histogram of processing time (s) of handled queries.",
Buckets: prometheus.ExponentialBuckets(0.0005, 2, 29), // 0.5ms ~ 1.5days
}, []string{LblSQLType})
}, []string{LblSQLType, LblDb})

QueryTotalCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Expand Down Expand Up @@ -89,7 +89,7 @@ var (
Subsystem: "server",
Name: "execute_error_total",
Help: "Counter of execute errors.",
}, []string{LblType})
}, []string{LblType, LblDb})

CriticalErrorCounter = prometheus.NewCounter(
prometheus.CounterOpts{
Expand Down
2 changes: 1 addition & 1 deletion metrics/telemetry.go
Original file line number Diff line number Diff line change
Expand Up @@ -419,7 +419,7 @@ func GetNonTransactionalStmtCounter() NonTransactionalStmtCounter {

// GetSavepointStmtCounter gets the savepoint statement executed counter.
// The value is read from the StmtNodeCounter series whose type label is
// "Savepoint".
func GetSavepointStmtCounter() int64 {
return readCounter(StmtNodeCounter.With(prometheus.Labels{LblType: "Savepoint"}))
// NOTE(review): only the series with an empty db label is read here; if
// Savepoint statements are ever counted under a non-empty db label they
// would not be included in this value — confirm against the call sites
// that increment StmtNodeCounter.
return readCounter(StmtNodeCounter.With(prometheus.Labels{LblType: "Savepoint", LblDb: ""}))
}

// GetLazyPessimisticUniqueCheckSetCounter returns the counter of setting tidb_constraint_check_in_place_pessimistic to false.
Expand Down
55 changes: 15 additions & 40 deletions server/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,20 +133,6 @@ var (
mysql.ComSetOption: metrics.QueryTotalCounter.WithLabelValues("SetOption", "Error"),
}

queryDurationHistogramUse = metrics.QueryDurationHistogram.WithLabelValues("Use")
queryDurationHistogramShow = metrics.QueryDurationHistogram.WithLabelValues("Show")
queryDurationHistogramBegin = metrics.QueryDurationHistogram.WithLabelValues("Begin")
queryDurationHistogramCommit = metrics.QueryDurationHistogram.WithLabelValues("Commit")
queryDurationHistogramRollback = metrics.QueryDurationHistogram.WithLabelValues("Rollback")
queryDurationHistogramInsert = metrics.QueryDurationHistogram.WithLabelValues("Insert")
queryDurationHistogramReplace = metrics.QueryDurationHistogram.WithLabelValues("Replace")
queryDurationHistogramDelete = metrics.QueryDurationHistogram.WithLabelValues("Delete")
queryDurationHistogramUpdate = metrics.QueryDurationHistogram.WithLabelValues("Update")
queryDurationHistogramSelect = metrics.QueryDurationHistogram.WithLabelValues("Select")
queryDurationHistogramExecute = metrics.QueryDurationHistogram.WithLabelValues("Execute")
queryDurationHistogramSet = metrics.QueryDurationHistogram.WithLabelValues("Set")
queryDurationHistogramGeneral = metrics.QueryDurationHistogram.WithLabelValues(metrics.LblGeneral)

disconnectNormal = metrics.DisconnectionCounter.WithLabelValues(metrics.LblOK)
disconnectByClientWithError = metrics.DisconnectionCounter.WithLabelValues(metrics.LblError)
disconnectErrorUndetermined = metrics.DisconnectionCounter.WithLabelValues("undetermined")
Expand Down Expand Up @@ -1172,11 +1158,17 @@ func (cc *clientConn) Run(ctx context.Context) {
metrics.CriticalErrorCounter.Add(1)
logutil.Logger(ctx).Fatal("critical error, stop the server", zap.Error(err))
}
var txnMode string
var (
txnMode string
dbName string
)
if ctx := cc.getCtx(); ctx != nil {
txnMode = ctx.GetSessionVars().GetReadableTxnMode()
if config.GetGlobalConfig().Status.RecordDBLabel {
dbName = ctx.GetSessionVars().CurrentDB
}
}
metrics.ExecuteErrorCounter.WithLabelValues(metrics.ExecuteErrorToLabel(err)).Inc()
metrics.ExecuteErrorCounter.WithLabelValues(metrics.ExecuteErrorToLabel(err), dbName).Inc()
if storeerr.ErrLockAcquireFailAndNoWaitSet.Equal(err) {
logutil.Logger(ctx).Debug("Expected error for FOR UPDATE NOWAIT", zap.Error(err))
} else {
Expand Down Expand Up @@ -1268,40 +1260,23 @@ func (cc *clientConn) addMetrics(cmd byte, startTime time.Time, err error) {
affectedRows := cc.ctx.AffectedRows()
cc.ctx.GetTxnWriteThroughputSLI().FinishExecuteStmt(cost, affectedRows, sessionVar.InTxn())

var dbName string
if config.GetGlobalConfig().Status.RecordDBLabel {
dbName = sessionVar.CurrentDB
}

switch sqlType {
case "Use":
queryDurationHistogramUse.Observe(cost.Seconds())
case "Show":
queryDurationHistogramShow.Observe(cost.Seconds())
case "Begin":
queryDurationHistogramBegin.Observe(cost.Seconds())
case "Commit":
queryDurationHistogramCommit.Observe(cost.Seconds())
case "Rollback":
queryDurationHistogramRollback.Observe(cost.Seconds())
case "Insert":
queryDurationHistogramInsert.Observe(cost.Seconds())
affectedRowsCounterInsert.Add(float64(affectedRows))
case "Replace":
queryDurationHistogramReplace.Observe(cost.Seconds())
affectedRowsCounterReplace.Add(float64(affectedRows))
case "Delete":
queryDurationHistogramDelete.Observe(cost.Seconds())
affectedRowsCounterDelete.Add(float64(affectedRows))
case "Update":
queryDurationHistogramUpdate.Observe(cost.Seconds())
affectedRowsCounterUpdate.Add(float64(affectedRows))
case "Select":
queryDurationHistogramSelect.Observe(cost.Seconds())
case "Execute":
queryDurationHistogramExecute.Observe(cost.Seconds())
case "Set":
queryDurationHistogramSet.Observe(cost.Seconds())
case metrics.LblGeneral:
queryDurationHistogramGeneral.Observe(cost.Seconds())
default:
metrics.QueryDurationHistogram.WithLabelValues(sqlType).Observe(cost.Seconds())
}

metrics.QueryDurationHistogram.WithLabelValues(sqlType, dbName).Observe(cost.Seconds())
}

// dispatch handles client request based on command which is the first byte of the data.
Expand Down
48 changes: 47 additions & 1 deletion server/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2133,6 +2133,41 @@ func (cli *testServerClient) runTestStmtCount(t *testing.T) {
})
}

// runTestDBStmtCount runs a fixed workload on a newly created
// "DBStatementCount" database and checks how far each per-type
// tidb_executor_statement_total counter moved between two metric snapshots.
//
// NOTE(review): the baseline snapshot is parsed with getDBStmtCnt (series
// labelled with this database name) while the snapshot taken after the
// workload is parsed with getStmtCnt — confirm that comparing the two
// different series is intended.
func (cli *testServerClient) runTestDBStmtCount(t *testing.T) {
	const dbName = "DBStatementCount"

	// Executed strictly in this order.
	workload := []string{
		"create table test (a int)",

		"insert into test values(1)",
		"insert into test values(2)",
		"insert into test values(3)",
		"insert into test values(4)",
		"insert into test values(5)",

		"delete from test where a = 3",
		"update test set a = 2 where a = 1",
		"select * from test",
		"select 2",

		"prepare stmt1 from 'update test set a = 1 where a = 2'",
		"execute stmt1",
		"prepare stmt2 from 'select * from test'",
		"execute stmt2",
		"replace into test(a) values(6);",
	}

	// Expected growth of the counter for each statement type label,
	// asserted in this order.
	expected := []struct {
		label string
		delta int
	}{
		{"CreateTable", 1},
		{"Insert", 5},
		{"Delete", 1},
		{"Update", 2},
		{"Select", 3},
		{"Prepare", 2},
		{"Execute", 0},
		{"Replace", 1},
	}

	cli.runTestsOnNewDB(t, nil, dbName, func(dbt *testkit.DBTestKit) {
		before := getDBStmtCnt(string(cli.getMetrics(t)), dbName)

		for _, stmt := range workload {
			dbt.MustExec(stmt)
		}

		after := getStmtCnt(string(cli.getMetrics(t)))
		for _, want := range expected {
			require.Equal(t, before[want.label]+want.delta, after[want.label])
		}
	})
}

func (cli *testServerClient) runTestTLSConnection(t *testing.T, overrider configOverrider) error {
dsn := cli.getDSN(overrider)
db, err := sql.Open("mysql", dsn)
Expand Down Expand Up @@ -2212,7 +2247,18 @@ func (cli *testServerClient) getMetrics(t *testing.T) []byte {

// getStmtCnt extracts tidb_executor_statement_total samples from the metrics
// text in content and returns them as a map from the statement type label to
// the sample's integer value. If a label matches more than once, the last
// match wins.
func getStmtCnt(content string) (stmtCnt map[string]int) {
stmtCnt = make(map[string]int)
// Capture group 1 is the type label and group 2 the count. Inside the
// character class '|' is a literal, so it is accepted alongside letters
// and '-'.
r := regexp.MustCompile("tidb_executor_statement_total{type=\"([A-Z|a-z|-]+)\"} (\\d+)")
// This pattern only matches series whose db label is the empty string.
r := regexp.MustCompile("tidb_executor_statement_total{db=\"\",type=\"([A-Z|a-z|-]+)\"} (\\d+)")
matchResult := r.FindAllStringSubmatch(content, -1)
for _, v := range matchResult {
// The Atoi error is dropped: group 2 is digits only, so the conversion
// can fail only for a value that does not fit in an int.
cnt, _ := strconv.Atoi(v[2])
stmtCnt[v[1]] = cnt
}
return stmtCnt
}

func getDBStmtCnt(content, dbName string) (stmtCnt map[string]int) {
stmtCnt = make(map[string]int)
r := regexp.MustCompile(fmt.Sprintf("tidb_executor_statement_total{db=\"%s\",type=\"([A-Z|a-z|-]+)\"} (\\d+)", dbName))
matchResult := r.FindAllStringSubmatch(content, -1)
for _, v := range matchResult {
cnt, _ := strconv.Atoi(v[2])
Expand Down
12 changes: 12 additions & 0 deletions server/tidb_serial_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,18 @@ func TestStmtCount(t *testing.T) {
ts.runTestStmtCount(t)
}

// TestDBStmtCount builds a test suite whose config has Status.RecordDBLabel
// switched on and runs the per-database statement counter check against it.
func TestDBStmtCount(t *testing.T) {
	conf := newTestConfig()
	conf.Port = 0
	conf.Performance.TCPKeepAlive = true
	conf.Status.StatusPort = 0
	conf.Status.ReportStatus = true
	conf.Status.RecordDBLabel = true

	suite := createTidbTestSuiteWithCfg(t, conf)
	suite.runTestDBStmtCount(t)
}

func TestLoadDataListPartition(t *testing.T) {
ts := createTidbTestSuite(t)

Expand Down
16 changes: 10 additions & 6 deletions server/tidb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,16 @@ type tidbTestSuite struct {
}

// createTidbTestSuite builds a test suite from the standard test config:
// Port and StatusPort set to 0, status reporting and TCP keep-alive turned
// on, and Status.RecordDBLabel turned off. The actual setup is delegated to
// createTidbTestSuiteWithCfg.
func createTidbTestSuite(t *testing.T) *tidbTestSuite {
	conf := newTestConfig()
	conf.Port = 0
	conf.Performance.TCPKeepAlive = true
	conf.Status.StatusPort = 0
	conf.Status.ReportStatus = true
	conf.Status.RecordDBLabel = false

	return createTidbTestSuiteWithCfg(t, conf)
}

func createTidbTestSuiteWithCfg(t *testing.T, cfg *config.Config) *tidbTestSuite {
ts := &tidbTestSuite{testServerClient: newTestServerClient()}

// setup tidbTestSuite
Expand All @@ -92,11 +102,6 @@ func createTidbTestSuite(t *testing.T) *tidbTestSuite {
ts.domain, err = session.BootstrapSession(ts.store)
require.NoError(t, err)
ts.tidbdrv = NewTiDBDriver(ts.store)
cfg := newTestConfig()
cfg.Port = ts.port
cfg.Status.ReportStatus = true
cfg.Status.StatusPort = ts.statusPort
cfg.Performance.TCPKeepAlive = true

server, err := NewServer(cfg, ts.tidbdrv)
require.NoError(t, err)
Expand Down Expand Up @@ -124,7 +129,6 @@ func createTidbTestSuite(t *testing.T) *tidbTestSuite {
}
view.Stop()
})

return ts
}

Expand Down
15 changes: 13 additions & 2 deletions session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -1938,7 +1938,13 @@ func (s *session) ExecRestrictedStmt(ctx context.Context, stmtNode ast.StmtNode,
if err != nil {
return nil, nil, err
}
metrics.QueryDurationHistogram.WithLabelValues(metrics.LblInternal).Observe(time.Since(startTime).Seconds())

var dbName string
if config.GetGlobalConfig().Status.RecordDBLabel {
dbName = se.GetSessionVars().CurrentDB
}

metrics.QueryDurationHistogram.WithLabelValues(metrics.LblInternal, dbName).Observe(time.Since(startTime).Seconds())
return rows, rs.Fields(), err
}

Expand Down Expand Up @@ -2110,7 +2116,12 @@ func (s *session) ExecRestrictedSQL(ctx context.Context, opts []sqlexec.OptionFu
if err != nil {
return nil, nil, err
}
metrics.QueryDurationHistogram.WithLabelValues(metrics.LblInternal).Observe(time.Since(startTime).Seconds())

var dbName string
if config.GetGlobalConfig().Status.RecordDBLabel {
dbName = se.GetSessionVars().CurrentDB
}
metrics.QueryDurationHistogram.WithLabelValues(metrics.LblInternal, dbName).Observe(time.Since(startTime).Seconds())
return rows, rs.Fields(), err
})
}
Expand Down

0 comments on commit 79558c0

Please sign in to comment.