Skip to content

Commit

Permalink
*: fix bug of profiling conflict between TopSQL and pprof profile HTTP API (#30891)
Browse files Browse the repository at this point in the history

close #30890
  • Loading branch information
crazycs520 authored Dec 31, 2021
1 parent 48fce5e commit 25d6301
Show file tree
Hide file tree
Showing 25 changed files with 1,252 additions and 813 deletions.
1 change: 1 addition & 0 deletions metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,7 @@ func RegisterMetrics() {
prometheus.MustRegister(TopSQLReportDurationHistogram)
prometheus.MustRegister(TopSQLReportDataHistogram)
prometheus.MustRegister(PDApiExecutionHistogram)
prometheus.MustRegister(CPUProfileCounter)

tikvmetrics.InitMetrics(TiDB, TiKVClient)
tikvmetrics.RegisterMetrics()
Expand Down
8 changes: 8 additions & 0 deletions metrics/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,14 @@ var (
Help: "Bucketed histogram of all pd api execution time (s)",
Buckets: prometheus.ExponentialBuckets(0.001, 2, 20), // 1ms ~ 524s
}, []string{LblType})

// CPUProfileCounter is a Prometheus counter described by its help text as a
// "Counter of cpu profiling". With the namespace, subsystem and name below it
// is exposed as tidb_server_cpu_profile_total.
// NOTE(review): the code that increments it is not visible here — presumably
// the cpuprofile package bumps it once per profiling round; confirm.
CPUProfileCounter = prometheus.NewCounter(
prometheus.CounterOpts{
Namespace: "tidb",
Subsystem: "server",
Name: "cpu_profile_total",
Help: "Counter of cpu profiling",
})
)

// ExecuteErrorToLabel converts an execute error to label.
Expand Down
12 changes: 7 additions & 5 deletions server/http_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,10 @@ import (
"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tidb/parser/terror"
"github.com/pingcap/tidb/util"
"github.com/pingcap/tidb/util/cpuprofile"
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/memory"
"github.com/pingcap/tidb/util/printer"
"github.com/pingcap/tidb/util/topsql/tracecpu"
"github.com/pingcap/tidb/util/versioninfo"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/soheilhy/cmux"
Expand Down Expand Up @@ -278,7 +278,7 @@ func (s *Server) startHTTPServer() {

serverMux.HandleFunc("/debug/pprof/", pprof.Index)
serverMux.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline)
serverMux.HandleFunc("/debug/pprof/profile", tracecpu.ProfileHTTPHandler)
serverMux.HandleFunc("/debug/pprof/profile", cpuprofile.ProfileHTTPHandler)
serverMux.HandleFunc("/debug/pprof/symbol", pprof.Symbol)
serverMux.HandleFunc("/debug/pprof/trace", pprof.Trace)

Expand Down Expand Up @@ -355,7 +355,8 @@ func (s *Server) startHTTPServer() {
serveError(w, http.StatusInternalServerError, fmt.Sprintf("Create zipped %s fail: %v", "profile", err))
return
}
if err := tracecpu.StartCPUProfile(fw); err != nil {
pc := cpuprofile.NewCollector()
if err := pc.StartCPUProfile(fw); err != nil {
serveError(w, http.StatusInternalServerError,
fmt.Sprintf("Could not enable CPU profiling: %s", err))
return
Expand All @@ -365,9 +366,10 @@ func (s *Server) startHTTPServer() {
sec = 10
}
sleepWithCtx(r.Context(), time.Duration(sec)*time.Second)
err = tracecpu.StopCPUProfile()
err = pc.StopCPUProfile()
if err != nil {
serveError(w, http.StatusInternalServerError, fmt.Sprintf("Create zipped %s fail: %v", "config", err))
serveError(w, http.StatusInternalServerError,
fmt.Sprintf("Could not enable CPU profiling: %s", err))
return
}

Expand Down
1 change: 0 additions & 1 deletion server/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,6 @@ func TestMain(m *testing.M) {
goleak.IgnoreTopFunction("go.opencensus.io/stats/view.(*worker).start"),
goleak.IgnoreTopFunction("go.etcd.io/etcd/pkg/logutil.(*MergeLogger).outputLoop"),
goleak.IgnoreTopFunction("github.com/go-sql-driver/mysql.(*mysqlConn).startWatcher.func1"),
goleak.IgnoreTopFunction("github.com/pingcap/tidb/util/topsql/tracecpu.(*sqlCPUProfiler).startAnalyzeProfileWorker"),
}

goleak.VerifyTestMain(m, opts...)
Expand Down
202 changes: 23 additions & 179 deletions server/tidb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,14 +48,14 @@ import (
"github.com/pingcap/tidb/store/mockstore"
"github.com/pingcap/tidb/testkit"
"github.com/pingcap/tidb/util"
"github.com/pingcap/tidb/util/cpuprofile"
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/plancodec"
"github.com/pingcap/tidb/util/topsql/reporter"
mockTopSQLReporter "github.com/pingcap/tidb/util/topsql/reporter/mock"
"github.com/pingcap/tidb/util/topsql"
"github.com/pingcap/tidb/util/topsql/collector"
mockTopSQLTraceCPU "github.com/pingcap/tidb/util/topsql/collector/mock"
topsqlstate "github.com/pingcap/tidb/util/topsql/state"
"github.com/pingcap/tidb/util/topsql/stmtstats"
"github.com/pingcap/tidb/util/topsql/tracecpu"
mockTopSQLTraceCPU "github.com/pingcap/tidb/util/topsql/tracecpu/mock"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -134,9 +134,13 @@ func createTidbTestTopSQLSuite(t *testing.T) (*tidbTestTopSQLSuite, func()) {
dbt.MustExec("set @@global.tidb_top_sql_report_interval_seconds=2;")
dbt.MustExec("set @@global.tidb_top_sql_max_statement_count=5;")

tracecpu.GlobalSQLCPUProfiler.Run()

return ts, cleanup
err = cpuprofile.StartCPUProfiler()
require.NoError(t, err)
cleanFn := func() {
cleanup()
cpuprofile.StopCPUProfiler()
}
return ts, cleanFn
}

func TestRegression(t *testing.T) {
Expand Down Expand Up @@ -1261,10 +1265,6 @@ func TestPessimisticInsertSelectForUpdate(t *testing.T) {
require.Nil(t, rs) // should be no delay
}

type collectorWrapper struct {
reporter.TopSQLReporter
}

func TestTopSQLCPUProfile(t *testing.T) {
ts, cleanup := createTidbTestTopSQLSuite(t)
defer cleanup()
Expand All @@ -1285,8 +1285,12 @@ func TestTopSQLCPUProfile(t *testing.T) {
require.NoError(t, err)
}()

collector := mockTopSQLTraceCPU.NewTopSQLCollector()
tracecpu.GlobalSQLCPUProfiler.SetCollector(&collectorWrapper{collector})
topsqlstate.EnableTopSQL()
mc := mockTopSQLTraceCPU.NewTopSQLCollector()
topsql.SetupTopSQLForTest(mc)
sqlCPUCollector := collector.NewSQLCPUCollector(mc)
sqlCPUCollector.Start()
defer sqlCPUCollector.Stop()

dbt := testkit.NewDBTestKit(t, db)
dbt.MustExec("drop database if exists topsql")
Expand Down Expand Up @@ -1340,13 +1344,13 @@ func TestTopSQLCPUProfile(t *testing.T) {
defer cancel()
checkFn := func(sql, planRegexp string) {
require.NoError(t, timeoutCtx.Err())
stats := collector.GetSQLStatsBySQLWithRetry(sql, len(planRegexp) > 0)
stats := mc.GetSQLStatsBySQLWithRetry(sql, len(planRegexp) > 0)
// Since one SQL statement may have many plans, check `len(stats) > 0` instead of `len(stats) == 1`.
require.Greaterf(t, len(stats), 0, "sql: %v", sql)

for _, s := range stats {
sqlStr := collector.GetSQL(s.SQLDigest)
encodedPlan := collector.GetPlan(s.PlanDigest)
sqlStr := mc.GetSQL(s.SQLDigest)
encodedPlan := mc.GetPlan(s.PlanDigest)
// Normalize the user SQL before check.
normalizedSQL := parser.Normalize(sql)
require.Equalf(t, normalizedSQL, sqlStr, "sql: %v", sql)
Expand All @@ -1360,7 +1364,7 @@ func TestTopSQLCPUProfile(t *testing.T) {
}
}
// Wait for the Top SQL collector to collect profile data.
collector.WaitCollectCnt(1)
mc.WaitCollectCnt(1)
// Check result of test case 1.
for _, ca := range cases1 {
checkFn(ca.sql, ca.planRegexp)
Expand Down Expand Up @@ -1408,7 +1412,7 @@ func TestTopSQLCPUProfile(t *testing.T) {
})
}
// Wait for the Top SQL collector to collect profile data.
collector.WaitCollectCnt(1)
mc.WaitCollectCnt(1)
// Check result of test case 2.
for _, ca := range cases2 {
checkFn(ca.prepare, ca.planRegexp)
Expand Down Expand Up @@ -1470,7 +1474,7 @@ func TestTopSQLCPUProfile(t *testing.T) {
}

// Wait for the Top SQL collector to collect profile data.
collector.WaitCollectCnt(1)
mc.WaitCollectCnt(1)
// Check result of test case 3.
for _, ca := range cases3 {
checkFn(ca.prepare, ca.planRegexp)
Expand Down Expand Up @@ -1772,166 +1776,6 @@ func TestTopSQLStatementStats(t *testing.T) {
require.Equal(t, 20, found)
}

// TestTopSQLAgent is an end-to-end test of Top SQL reporting against a mock
// agent server: it runs hash-join workloads while changing the configured
// receiver address and checks how many records the agent received.
//
// The test is currently skipped unconditionally by the t.Skip call on its
// first line, so nothing below that call executes.
func TestTopSQLAgent(t *testing.T) {
t.Skip("unstable, skip it and fix it before 20210702")

// Bring up the TiDB test suite, a client connection to it, and the mock agent.
ts, cleanup := createTidbTestTopSQLSuite(t)
defer cleanup()
db, err := sql.Open("mysql", ts.getDSN())
require.NoError(t, err, "Error connecting")
defer func() {
err := db.Close()
require.NoError(t, err)
}()
agentServer, err := mockTopSQLReporter.StartMockAgentServer()
require.NoError(t, err)
// The closure reads the agentServer variable when it runs, so after the
// reassignment in case 3 it stops the restarted server.
defer func() {
agentServer.Stop()
}()

// Enable the failpoints this test relies on; they are disabled again on exit.
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/util/topsql/reporter/resetTimeoutForTest", `return(true)`))
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/domain/skipLoadSysVarCacheLoop", `return(true)`))
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/util/topsql/mockHighLoadForEachSQL", `return(true)`))
defer func() {
err := failpoint.Disable("github.com/pingcap/tidb/util/topsql/reporter/resetTimeoutForTest")
require.NoError(t, err)
err = failpoint.Disable("github.com/pingcap/tidb/domain/skipLoadSysVarCacheLoop")
require.NoError(t, err)
err = failpoint.Disable("github.com/pingcap/tidb/util/topsql/mockHighLoadForEachSQL")
require.NoError(t, err)
}()

// Prepare 20 tables (t0..t19) with 100 rows each for the join workloads.
dbt := testkit.NewDBTestKit(t, db)
dbt.MustExec("drop database if exists topsql")
dbt.MustExec("create database topsql")
dbt.MustExec("use topsql;")
for i := 0; i < 20; i++ {
dbt.MustExec(fmt.Sprintf("create table t%v (a int auto_increment, b int, unique index idx(a));", i))
for j := 0; j < 100; j++ {
dbt.MustExec(fmt.Sprintf("insert into t%v (b) values (%v);", i, j))
}
}
// setTopSQLReceiverAddress overwrites TopSQL.ReceiverAddress in the global config.
setTopSQLReceiverAddress := func(addr string) {
config.UpdateGlobal(func(conf *config.Config) {
conf.TopSQL.ReceiverAddress = addr
})
}
// Turn Top SQL on with an empty receiver address and short intervals.
dbt.MustExec("set @@global.tidb_enable_top_sql='On';")
setTopSQLReceiverAddress("")
dbt.MustExec("set @@global.tidb_top_sql_precision_seconds=1;")
dbt.MustExec("set @@global.tidb_top_sql_report_interval_seconds=2;")
dbt.MustExec("set @@global.tidb_top_sql_max_statement_count=5;")

// Build the remote reporter and its single-target data sink; both are closed on exit.
r := reporter.NewRemoteTopSQLReporter(plancodec.DecodeNormalizedPlan)
s := reporter.NewSingleTargetDataSink(r)
defer func() {
r.Close()
s.Close()
}()

// Install the remote reporter as the collector of the global SQL CPU profiler.
tracecpu.GlobalSQLCPUProfiler.SetCollector(&collectorWrapper{r})

// TODO: change to ensure that the right sql statements are reported, not just counts
// checkFn asserts that the agent's latest batch holds exactly n records and
// that each record's normalized SQL (and its plan, when a plan digest is
// present) looks like the join workload.
checkFn := func(n int) {
records := agentServer.GetLatestRecords()
require.Len(t, records, n)
for _, r := range records {
sqlMeta, exist := agentServer.GetSQLMetaByDigestBlocking(r.SqlDigest, time.Second)
require.True(t, exist)
require.Regexp(t, "^select.*from.*join", sqlMeta.NormalizedSql)
if len(r.PlanDigest) == 0 {
continue
}
plan, exist := agentServer.GetPlanMetaByDigestBlocking(r.PlanDigest, time.Second)
require.True(t, exist)
// Flatten the plan text onto one line before matching.
plan = strings.Replace(plan, "\n", " ", -1)
plan = strings.Replace(plan, "\t", " ", -1)
require.Regexp(t, "Join.*Select", plan)
}
}
// runWorkload starts one goroutine per table index in [start, end), each
// running a self hash join on that table through ts.loopExec, and returns
// the cancel func of the context shared by those goroutines.
// NOTE(review): presumably loopExec repeats fn until ctx is cancelled — confirm.
runWorkload := func(start, end int) context.CancelFunc {
ctx, cancel := context.WithCancel(context.Background())
for i := start; i < end; i++ {
query := fmt.Sprintf("select /*+ HASH_JOIN(ta, tb) */ * from t%[1]v ta join t%[1]v tb on ta.a=tb.a where ta.b is not null;", i)
go ts.loopExec(ctx, t, func(db *sql.DB) {
dbt := testkit.NewDBTestKit(t, db)
rows := dbt.MustQuery(query)
require.NoError(t, rows.Close())
})
}
return cancel
}

// case 1: dynamically change agent endpoint
cancel := runWorkload(0, 10)
// Test with null agent address, the agent server can't receive any record.
setTopSQLReceiverAddress("")
agentServer.WaitCollectCnt(1, time.Second*4)
checkFn(0)
// Test after setting the agent address; eviction should cap the records at 5.
dbt.MustExec("set @@global.tidb_top_sql_max_statement_count=5;")
setTopSQLReceiverAddress(agentServer.Address())
agentServer.WaitCollectCnt(1, time.Second*4)
checkFn(5)
// Test with wrong agent address, the agent server can't receive any record.
dbt.MustExec("set @@global.tidb_top_sql_max_statement_count=8;")
setTopSQLReceiverAddress("127.0.0.1:65530")

agentServer.WaitCollectCnt(1, time.Second*4)
checkFn(0)
// Test after restoring the agent address; the new limit of 8 should apply.
setTopSQLReceiverAddress(agentServer.Address())
agentServer.WaitCollectCnt(1, time.Second*4)
checkFn(8)
cancel() // cancel case 1

// case 2: agent hangs for a while
cancel2 := runWorkload(0, 10)
// empty agent address, should not collect records
dbt.MustExec("set @@global.tidb_top_sql_max_statement_count=5;")
setTopSQLReceiverAddress("")
agentServer.WaitCollectCnt(1, time.Second*4)
checkFn(0)
// set correct address, should collect records
setTopSQLReceiverAddress(agentServer.Address())
agentServer.WaitCollectCnt(1, time.Second*4)
checkFn(5)
// agent server hangs for a while
agentServer.HangFromNow(time.Second * 6)
// run another set of SQL queries
cancel2()

// NOTE(review): this range starts at 11, so table t10 is never queried by
// any workload — confirm whether that is intentional.
cancel3 := runWorkload(11, 20)
agentServer.WaitCollectCnt(1, time.Second*8)
checkFn(5)
cancel3()

// case 3: agent restart
cancel4 := runWorkload(0, 10)
// empty agent address, should not collect records
setTopSQLReceiverAddress("")
agentServer.WaitCollectCnt(1, time.Second*4)
checkFn(0)
// set correct address, should collect records
setTopSQLReceiverAddress(agentServer.Address())
agentServer.WaitCollectCnt(1, time.Second*8)
checkFn(5)
// run another set of SQL queries
cancel4()

cancel5 := runWorkload(11, 20)
// agent server shutdown
agentServer.Stop()
// agent server restart
agentServer, err = mockTopSQLReporter.StartMockAgentServer()
require.NoError(t, err)
setTopSQLReceiverAddress(agentServer.Address())
// check result
agentServer.WaitCollectCnt(2, time.Second*8)
checkFn(5)
cancel5()
}

func (ts *tidbTestTopSQLSuite) loopExec(ctx context.Context, t *testing.T, fn func(db *sql.DB)) {
db, err := sql.Open("mysql", ts.getDSN())
require.NoError(t, err, "Error connecting")
Expand Down
5 changes: 5 additions & 0 deletions tidb-server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ import (
"github.com/pingcap/tidb/store/driver"
"github.com/pingcap/tidb/store/mockstore"
"github.com/pingcap/tidb/util"
"github.com/pingcap/tidb/util/cpuprofile"
"github.com/pingcap/tidb/util/deadlockhistory"
"github.com/pingcap/tidb/util/disk"
"github.com/pingcap/tidb/util/domainutil"
Expand Down Expand Up @@ -180,6 +181,9 @@ func main() {
terror.MustNil(err)
checkTempStorageQuota()
}
err := cpuprofile.StartCPUProfiler()
terror.MustNil(err)

// Enable failpoints in tikv/client-go if the test API is enabled.
// It appears in the main function to be set before any use of client-go to prevent data race.
if _, err := failpoint.Status("github.com/pingcap/tidb/server/enableTestAPI"); err == nil {
Expand Down Expand Up @@ -207,6 +211,7 @@ func main() {
signal.SetupSignalHandler(func(graceful bool) {
svr.Close()
cleanup(svr, storage, dom, graceful)
cpuprofile.StopCPUProfiler()
close(exited)
})
topsql.SetupTopSQL()
Expand Down
Loading

0 comments on commit 25d6301

Please sign in to comment.