diff --git a/server/rpc_server.go b/server/rpc_server.go
index 674047781a6bd..3b23539c0bac1 100644
--- a/server/rpc_server.go
+++ b/server/rpc_server.go
@@ -33,6 +33,7 @@ import (
 	"github.com/pingcap/tidb/util"
 	"github.com/pingcap/tidb/util/logutil"
 	"github.com/pingcap/tidb/util/memory"
+	"github.com/pingcap/tidb/util/topsql"
 	"go.uber.org/zap"
 	"google.golang.org/grpc"
 	"google.golang.org/grpc/keepalive"
@@ -64,6 +65,7 @@ func NewRPCServer(config *config.Config, dom *domain.Domain, sm util.SessionMan
 	}
 	diagnosticspb.RegisterDiagnosticsServer(s, rpcSrv)
 	tikvpb.RegisterTikvServer(s, rpcSrv)
+	topsql.RegisterPubSubServer(s)
 	return s
 }
diff --git a/sessionctx/variable/tidb_vars.go b/sessionctx/variable/tidb_vars.go
index ee01348a76441..ccec9e1b5a8fc 100644
--- a/sessionctx/variable/tidb_vars.go
+++ b/sessionctx/variable/tidb_vars.go
@@ -832,5 +832,5 @@ type TopSQL struct {
 // TopSQLEnabled uses to check whether enabled the top SQL feature.
 func TopSQLEnabled() bool {
-	return TopSQLVariable.Enable.Load() && config.GetGlobalConfig().TopSQL.ReceiverAddress != ""
+	return TopSQLVariable.Enable.Load()
 }
diff --git a/util/topsql/reporter/mock/pubsub.go b/util/topsql/reporter/mock/pubsub.go
new file mode 100644
index 0000000000000..493d95c17f827
--- /dev/null
+++ b/util/topsql/reporter/mock/pubsub.go
@@ -0,0 +1,67 @@
+// Copyright 2021 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package mock
+
+import (
+	"fmt"
+	"net"
+
+	"github.com/pingcap/tidb/util/logutil"
+	"go.uber.org/zap"
+	"google.golang.org/grpc"
+)
+
+type mockPubSubServer struct {
+	addr       string
+	listen     net.Listener
+	grpcServer *grpc.Server
+}
+
+// NewMockPubSubServer creates a mock publisher server.
+func NewMockPubSubServer() (*mockPubSubServer, error) {
+	addr := "127.0.0.1:0"
+	lis, err := net.Listen("tcp", addr)
+	if err != nil {
+		return nil, err
+	}
+	server := grpc.NewServer()
+
+	return &mockPubSubServer{
+		addr:       fmt.Sprintf("127.0.0.1:%d", lis.Addr().(*net.TCPAddr).Port),
+		listen:     lis,
+		grpcServer: server,
+	}, nil
+}
+
+func (svr *mockPubSubServer) Serve() {
+	err := svr.grpcServer.Serve(svr.listen)
+	if err != nil {
+		logutil.BgLogger().Warn("[top-sql] mock pubsub server serve failed", zap.Error(err))
+	}
+}
+
+func (svr *mockPubSubServer) Server() *grpc.Server {
+	return svr.grpcServer
+}
+
+func (svr *mockPubSubServer) Address() string {
+	return svr.addr
+}
+
+func (svr *mockPubSubServer) Stop() {
+	if svr.grpcServer != nil {
+		svr.grpcServer.Stop()
+	}
+}
diff --git a/util/topsql/reporter/pubsub.go b/util/topsql/reporter/pubsub.go
new file mode 100644
index 0000000000000..7d01c077e058f
--- /dev/null
+++ b/util/topsql/reporter/pubsub.go
@@ -0,0 +1,267 @@
+// Copyright 2021 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package reporter
+
+import (
+	"context"
+	"errors"
+	"time"
+
+	"github.com/pingcap/tidb/util"
+	"github.com/pingcap/tidb/util/logutil"
+	"github.com/pingcap/tipb/go-tipb"
+	"go.uber.org/zap"
+)
+
+// TopSQLPubSubService implements tipb.TopSQLPubSubServer.
+//
+// If a client subscribes to TopSQL records, the TopSQLPubSubService is responsible
+// for registering an associated DataSink to the reporter. Then the DataSink sends
+// data to the client periodically.
+type TopSQLPubSubService struct {
+	dataSinkRegisterer DataSinkRegisterer
+}
+
+// NewTopSQLPubSubService creates a new TopSQLPubSubService.
+func NewTopSQLPubSubService(dataSinkRegisterer DataSinkRegisterer) *TopSQLPubSubService {
+	return &TopSQLPubSubService{dataSinkRegisterer: dataSinkRegisterer}
+}
+
+var _ tipb.TopSQLPubSubServer = &TopSQLPubSubService{}
+
+// Subscribe registers dataSinks to the reporter and redirects data received from reporter
+// to subscribers associated with those dataSinks.
+func (ps *TopSQLPubSubService) Subscribe(_ *tipb.TopSQLSubRequest, stream tipb.TopSQLPubSub_SubscribeServer) error {
+	ds := newPubSubDataSink(stream, ps.dataSinkRegisterer)
+	if err := ps.dataSinkRegisterer.Register(ds); err != nil {
+		return err
+	}
+	return ds.run()
+}
+
+type pubSubDataSink struct {
+	ctx    context.Context
+	cancel context.CancelFunc
+
+	stream     tipb.TopSQLPubSub_SubscribeServer
+	sendTaskCh chan sendTask
+
+	// for deregister
+	registerer DataSinkRegisterer
+}
+
+func newPubSubDataSink(stream tipb.TopSQLPubSub_SubscribeServer, registerer DataSinkRegisterer) *pubSubDataSink {
+	ctx, cancel := context.WithCancel(stream.Context())
+
+	return &pubSubDataSink{
+		ctx:    ctx,
+		cancel: cancel,
+
+		stream:     stream,
+		sendTaskCh: make(chan sendTask, 1),
+
+		registerer: registerer,
+	}
+}
+
+var _ DataSink = &pubSubDataSink{}
+
+func (ds *pubSubDataSink) TrySend(data *ReportData, deadline time.Time) error {
+	select {
+	case ds.sendTaskCh <- sendTask{data: data, deadline: deadline}:
+		return nil
+	case <-ds.ctx.Done():
+		return ds.ctx.Err()
+	default:
+		ignoreReportChannelFullCounter.Inc()
+		return errors.New("the channel of pubsub dataSink is full")
+	}
+}
+
+func (ds *pubSubDataSink) OnReporterClosing() {
+	ds.cancel()
+}
+
+func (ds *pubSubDataSink) run() error {
+	defer func() {
+		ds.registerer.Deregister(ds)
+		ds.cancel()
+	}()
+
+	for {
+		select {
+		case task := <-ds.sendTaskCh:
+			ctx, cancel := context.WithDeadline(ds.ctx, task.deadline)
+			var err error
+
+			start := time.Now()
+			go util.WithRecovery(func() {
+				defer cancel()
+				err = ds.doSend(ctx, task.data)
+
+				if err != nil {
+					reportAllDurationFailedHistogram.Observe(time.Since(start).Seconds())
+				} else {
+					reportAllDurationSuccHistogram.Observe(time.Since(start).Seconds())
+				}
+			}, nil)
+
+			// When the deadline is exceeded, the closure inside `go util.WithRecovery` above may not notice that
+			// immediately because it can be blocked by `stream.Send`.
+			// In order to clean up resources as quickly as possible, we let that closure run in an individual goroutine,
+			// and wait for timeout here.
+			<-ctx.Done()
+
+			if errors.Is(ctx.Err(), context.DeadlineExceeded) {
+				logutil.BgLogger().Warn(
+					"[top-sql] pubsub datasink failed to send data to subscriber due to deadline exceeded",
+					zap.Time("deadline", task.deadline),
+				)
+				return ctx.Err()
+			}
+
+			if err != nil {
+				logutil.BgLogger().Warn(
+					"[top-sql] pubsub datasink failed to send data to subscriber",
+					zap.Error(err),
+				)
+				return err
+			}
+		case <-ds.ctx.Done():
+			return ds.ctx.Err()
+		}
+	}
+}
+
+func (ds *pubSubDataSink) doSend(ctx context.Context, data *ReportData) error {
+	if err := ds.sendCPUTime(ctx, data.CPUTimeRecords); err != nil {
+		return err
+	}
+	if err := ds.sendSQLMeta(ctx, data.SQLMetas); err != nil {
+		return err
+	}
+	return ds.sendPlanMeta(ctx, data.PlanMetas)
+}
+
+func (ds *pubSubDataSink) sendCPUTime(ctx context.Context, records []tipb.CPUTimeRecord) (err error) {
+	if len(records) == 0 {
+		return
+	}
+
+	start := time.Now()
+	sentCount := 0
+	defer func() {
+		topSQLReportRecordCounterHistogram.Observe(float64(sentCount))
+		if err != nil {
+			reportRecordDurationFailedHistogram.Observe(time.Since(start).Seconds())
+		} else {
+			reportRecordDurationSuccHistogram.Observe(time.Since(start).Seconds())
+		}
+	}()
+
+	cpuRecord := &tipb.TopSQLSubResponse_Record{}
+	r := &tipb.TopSQLSubResponse{RespOneof: cpuRecord}
+
+	for i := range records {
+		cpuRecord.Record = &records[i]
+		if err = ds.stream.Send(r); err != nil {
+			return
+		}
+		sentCount += 1
+
+		select {
+		case <-ctx.Done():
+			err = ctx.Err()
+			return
+		default:
+		}
+	}
+
+	return
+}
+
+func (ds *pubSubDataSink) sendSQLMeta(ctx context.Context, sqlMetas []tipb.SQLMeta) (err error) {
+	if len(sqlMetas) == 0 {
+		return
+	}
+
+	start := time.Now()
+	sentCount := 0
+	defer func() {
+		topSQLReportSQLCountHistogram.Observe(float64(sentCount))
+		if err != nil {
+			reportSQLDurationFailedHistogram.Observe(time.Since(start).Seconds())
+		} else {
+			reportSQLDurationSuccHistogram.Observe(time.Since(start).Seconds())
+		}
+	}()
+
+	sqlMeta := &tipb.TopSQLSubResponse_SqlMeta{}
+	r := &tipb.TopSQLSubResponse{RespOneof: sqlMeta}
+
+	for i := range sqlMetas {
+		sqlMeta.SqlMeta = &sqlMetas[i]
+		if err = ds.stream.Send(r); err != nil {
+			return
+		}
+		sentCount += 1
+
+		select {
+		case <-ctx.Done():
+			err = ctx.Err()
+			return
+		default:
+		}
+	}
+
+	return
+}
+
+func (ds *pubSubDataSink) sendPlanMeta(ctx context.Context, planMetas []tipb.PlanMeta) (err error) {
+	if len(planMetas) == 0 {
+		return
+	}
+
+	start := time.Now()
+	sentCount := 0
+	defer func() {
+		topSQLReportPlanCountHistogram.Observe(float64(sentCount))
+		if err != nil {
+			reportPlanDurationFailedHistogram.Observe(time.Since(start).Seconds())
+		} else {
+			reportPlanDurationSuccHistogram.Observe(time.Since(start).Seconds())
+		}
+	}()
+
+	planMeta := &tipb.TopSQLSubResponse_PlanMeta{}
+	r := &tipb.TopSQLSubResponse{RespOneof: planMeta}
+
+	for i := range planMetas {
+		planMeta.PlanMeta = &planMetas[i]
+		if err = ds.stream.Send(r); err != nil {
+			return
+		}
+		sentCount += 1
+
+		select {
+		case <-ctx.Done():
+			err = ctx.Err()
+			return
+		default:
+		}
+	}
+
+	return
+}
diff --git a/util/topsql/reporter/single_target.go b/util/topsql/reporter/single_target.go
index 3ea61d75f633a..3744702ba26d6 100644
--- a/util/topsql/reporter/single_target.go
+++ b/util/topsql/reporter/single_target.go
@@ -24,6 +24,7 @@ import (
 	"github.com/pingcap/tidb/config"
 	"github.com/pingcap/tidb/util/logutil"
 	"github.com/pingcap/tipb/go-tipb"
+	"go.uber.org/atomic"
 	"go.uber.org/zap"
 	"google.golang.org/grpc"
"google.golang.org/grpc/backoff" @@ -38,7 +39,7 @@ type SingleTargetDataSink struct { conn *grpc.ClientConn sendTaskCh chan sendTask - registered bool + registered *atomic.Bool registerer DataSinkRegisterer } @@ -53,7 +54,7 @@ func NewSingleTargetDataSink(registerer DataSinkRegisterer) *SingleTargetDataSin conn: nil, sendTaskCh: make(chan sendTask, 1), - registered: false, + registered: atomic.NewBool(false), registerer: registerer, } @@ -64,6 +65,7 @@ func NewSingleTargetDataSink(registerer DataSinkRegisterer) *SingleTargetDataSin logutil.BgLogger().Warn("failed to register single target datasink", zap.Error(err)) return nil } + dataSink.registered.Store(true) } go dataSink.recoverRun() @@ -111,25 +113,27 @@ func (ds *SingleTargetDataSink) run() (rerun bool) { targetRPCAddr = config.GetGlobalConfig().TopSQL.ReceiverAddress } - if err := ds.tryRegister(targetRPCAddr); err != nil { - logutil.BgLogger().Warn("failed to register the single target datasink", zap.Error(err)) + if err := ds.trySwitchRegistration(targetRPCAddr); err != nil { return false } } } -func (ds *SingleTargetDataSink) tryRegister(addr string) error { - if addr == "" && ds.registered { +func (ds *SingleTargetDataSink) trySwitchRegistration(addr string) error { + // deregister if `addr` is empty and registered before + if addr == "" && ds.registered.Load() { ds.registerer.Deregister(ds) - ds.registered = false + ds.registered.Store(false) return nil } - if addr != "" && !ds.registered { + // register if `add` is not empty and not registered before + if addr != "" && !ds.registered.Load() { if err := ds.registerer.Register(ds); err != nil { + logutil.BgLogger().Warn("failed to register the single target datasink", zap.Error(err)) return err } - ds.registered = true + ds.registered.Store(true) } return nil } @@ -160,8 +164,9 @@ func (ds *SingleTargetDataSink) OnReporterClosing() { func (ds *SingleTargetDataSink) Close() { ds.cancel() - if ds.registered { + if ds.registered.Load() { ds.registerer.Deregister(ds) + ds.registered.Store(false) } } @@ -172,12 +177,14 @@ func (ds *SingleTargetDataSink) doSend(addr string, task sendTask) { var err error start := time.Now() - if err != nil { - logutil.BgLogger().Warn("[top-sql] single target data sink failed to send data to receiver", zap.Error(err)) - reportAllDurationFailedHistogram.Observe(time.Since(start).Seconds()) - } else { - reportAllDurationSuccHistogram.Observe(time.Since(start).Seconds()) - } + defer func() { + if err != nil { + logutil.BgLogger().Warn("[top-sql] single target data sink failed to send data to receiver", zap.Error(err)) + reportAllDurationFailedHistogram.Observe(time.Since(start).Seconds()) + } else { + reportAllDurationSuccHistogram.Observe(time.Since(start).Seconds()) + } + }() ctx, cancel := context.WithDeadline(context.Background(), task.deadline) defer cancel() @@ -223,8 +230,9 @@ func (ds *SingleTargetDataSink) sendBatchCPUTimeRecord(ctx context.Context, reco topSQLReportRecordCounterHistogram.Observe(float64(sentCount)) if err != nil { reportRecordDurationFailedHistogram.Observe(time.Since(start).Seconds()) + } else { + reportRecordDurationSuccHistogram.Observe(time.Since(start).Seconds()) } - reportRecordDurationSuccHistogram.Observe(time.Since(start).Seconds()) }() client := tipb.NewTopSQLAgentClient(ds.conn) @@ -254,11 +262,11 @@ func (ds *SingleTargetDataSink) sendBatchSQLMeta(ctx context.Context, sqlMetas [ sentCount := 0 defer func() { topSQLReportSQLCountHistogram.Observe(float64(sentCount)) - if err != nil { 
 			reportSQLDurationFailedHistogram.Observe(time.Since(start).Seconds())
+		} else {
+			reportSQLDurationSuccHistogram.Observe(time.Since(start).Seconds())
 		}
-		reportSQLDurationSuccHistogram.Observe(time.Since(start).Seconds())
 	}()
 
 	client := tipb.NewTopSQLAgentClient(ds.conn)
@@ -291,8 +299,9 @@ func (ds *SingleTargetDataSink) sendBatchPlanMeta(ctx context.Context, planMetas
 		topSQLReportPlanCountHistogram.Observe(float64(sentCount))
 		if err != nil {
 			reportPlanDurationFailedHistogram.Observe(time.Since(start).Seconds())
+		} else {
+			reportPlanDurationSuccHistogram.Observe(time.Since(start).Seconds())
 		}
-		reportPlanDurationSuccHistogram.Observe(time.Since(start).Seconds())
 	}()
 
 	client := tipb.NewTopSQLAgentClient(ds.conn)
diff --git a/util/topsql/topsql.go b/util/topsql/topsql.go
index 12a9da430e7c0..ccc1f61eef0a5 100644
--- a/util/topsql/topsql.go
+++ b/util/topsql/topsql.go
@@ -27,7 +27,9 @@ import (
 	"github.com/pingcap/tidb/util/topsql/reporter"
 	"github.com/pingcap/tidb/util/topsql/stmtstats"
 	"github.com/pingcap/tidb/util/topsql/tracecpu"
+	"github.com/pingcap/tipb/go-tipb"
 	"go.uber.org/zap"
+	"google.golang.org/grpc"
 )
 
 const (
@@ -38,7 +40,7 @@ const (
 )
 
 var (
-	globalTopSQLReport   reporter.TopSQLReporter
+	globalTopSQLReport   *reporter.RemoteTopSQLReporter
 	singleTargetDataSink *reporter.SingleTargetDataSink
 )
 
@@ -54,6 +56,14 @@ func SetupTopSQL() {
 	stmtstats.SetupAggregator()
 }
 
+// RegisterPubSubServer registers TopSQLPubSubService to the given gRPC server.
+func RegisterPubSubServer(s *grpc.Server) {
+	if globalTopSQLReport != nil {
+		service := reporter.NewTopSQLPubSubService(globalTopSQLReport)
+		tipb.RegisterTopSQLPubSubServer(s, service)
+	}
+}
+
 // Close uses to close and release the top sql resource.
 func Close() {
 	if singleTargetDataSink != nil {
diff --git a/util/topsql/topsql_test.go b/util/topsql/topsql_test.go
index d4aabc746ce0e..462c6ffb70aa2 100644
--- a/util/topsql/topsql_test.go
+++ b/util/topsql/topsql_test.go
@@ -29,7 +29,10 @@ import (
 	mockServer "github.com/pingcap/tidb/util/topsql/reporter/mock"
 	"github.com/pingcap/tidb/util/topsql/tracecpu"
 	"github.com/pingcap/tidb/util/topsql/tracecpu/mock"
+	"github.com/pingcap/tipb/go-tipb"
 	"github.com/stretchr/testify/require"
+	"google.golang.org/grpc"
+	"google.golang.org/grpc/keepalive"
 )
 
 type collectorWrapper struct {
@@ -213,6 +216,163 @@ func TestMaxSQLAndPlanTest(t *testing.T) {
 	require.Empty(t, cPlan)
 }
 
+func TestTopSQLPubSub(t *testing.T) {
+	variable.TopSQLVariable.MaxStatementCount.Store(200)
+	variable.TopSQLVariable.ReportIntervalSeconds.Store(1)
+
+	report := reporter.NewRemoteTopSQLReporter(mockPlanBinaryDecoderFunc)
+	defer report.Close()
+	tracecpu.GlobalSQLCPUProfiler.SetCollector(&collectorWrapper{report})
+
+	server, err := mockServer.NewMockPubSubServer()
+	require.NoError(t, err)
+	pubsubService := reporter.NewTopSQLPubSubService(report)
+	tipb.RegisterTopSQLPubSubServer(server.Server(), pubsubService)
+	go server.Serve()
+	defer server.Stop()
+
+	conn, err := grpc.Dial(
+		server.Address(),
+		grpc.WithBlock(),
+		grpc.WithInsecure(),
+		grpc.WithKeepaliveParams(keepalive.ClientParameters{
+			Time:    10 * time.Second,
+			Timeout: 3 * time.Second,
+		}),
+	)
+	require.NoError(t, err)
+	defer conn.Close()
+
+	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+	defer cancel()
+	client := tipb.NewTopSQLPubSubClient(conn)
+	stream, err := client.Subscribe(ctx, &tipb.TopSQLSubRequest{})
+	require.NoError(t, err)
+
+	reqs := []struct {
+		sql  string
+		plan string
+	}{
+		{"select * from t where a=?", "point-get"},
+		{"select * from t where a>?", "table-scan"},
+		{"insert into t values (?)", ""},
+	}
+
+	digest2sql := make(map[string]string)
+	sql2plan := make(map[string]string)
+	for _, req := range reqs {
+		sql2plan[req.sql] = req.plan
+		sqlDigest := mock.GenSQLDigest(req.sql)
+		digest2sql[string(sqlDigest.Bytes())] = req.sql
+
+		go func(sql, plan string) {
+			for {
+				select {
+				case <-ctx.Done():
+					return
+				default:
+					mockExecuteSQL(sql, plan)
+				}
+			}
+		}(req.sql, req.plan)
+	}
+
+	sqlMetas := make(map[string]*tipb.SQLMeta)
+	planMetas := make(map[string]string)
+	records := make(map[string]*tipb.CPUTimeRecord)
+
+	for {
+		r, err := stream.Recv()
+		if err != nil {
+			break
+		}
+
+		if r.GetRecord() != nil {
+			rec := r.GetRecord()
+			if _, ok := records[string(rec.SqlDigest)]; !ok {
+				records[string(rec.SqlDigest)] = rec
+			} else {
+				cpu := records[string(rec.SqlDigest)]
+				if rec.PlanDigest != nil {
+					cpu.PlanDigest = rec.PlanDigest
+				}
+				cpu.RecordListTimestampSec = append(cpu.RecordListTimestampSec, rec.RecordListTimestampSec...)
+				cpu.RecordListCpuTimeMs = append(cpu.RecordListCpuTimeMs, rec.RecordListCpuTimeMs...)
+			}
+		} else if r.GetSqlMeta() != nil {
+			sql := r.GetSqlMeta()
+			if _, ok := sqlMetas[string(sql.SqlDigest)]; !ok {
+				sqlMetas[string(sql.SqlDigest)] = sql
+			}
+		} else if r.GetPlanMeta() != nil {
+			plan := r.GetPlanMeta()
+			if _, ok := planMetas[string(plan.PlanDigest)]; !ok {
+				planMetas[string(plan.PlanDigest)] = plan.NormalizedPlan
+			}
+		}
+	}
+
+	checkSQLPlanMap := map[string]struct{}{}
+	for i := range records {
+		record := records[i]
+		require.Greater(t, len(record.RecordListCpuTimeMs), 0)
+		require.Greater(t, record.RecordListCpuTimeMs[0], uint32(0))
+		sqlMeta, exist := sqlMetas[string(record.SqlDigest)]
+		require.True(t, exist)
+		expectedNormalizedSQL, exist := digest2sql[string(record.SqlDigest)]
+		require.True(t, exist)
+		require.Equal(t, expectedNormalizedSQL, sqlMeta.NormalizedSql)
+
+		expectedNormalizedPlan := sql2plan[expectedNormalizedSQL]
+		if expectedNormalizedPlan == "" || len(record.PlanDigest) == 0 {
+			require.Equal(t, len(record.PlanDigest), 0)
+			continue
+		}
+		normalizedPlan, exist := planMetas[string(record.PlanDigest)]
+		require.True(t, exist)
+		require.Equal(t, expectedNormalizedPlan, normalizedPlan)
+		checkSQLPlanMap[expectedNormalizedSQL] = struct{}{}
+	}
+	require.Equal(t, len(checkSQLPlanMap), 2)
+}
+
+func TestPubSubWhenReporterIsStopped(t *testing.T) {
+	report := reporter.NewRemoteTopSQLReporter(mockPlanBinaryDecoderFunc)
+
+	server, err := mockServer.NewMockPubSubServer()
+	require.NoError(t, err)
+
+	pubsubService := reporter.NewTopSQLPubSubService(report)
+	tipb.RegisterTopSQLPubSubServer(server.Server(), pubsubService)
+	go server.Serve()
+	defer server.Stop()
+
+	// stop reporter first
+	report.Close()
+
+	// try to subscribe
+	conn, err := grpc.Dial(
+		server.Address(),
+		grpc.WithBlock(),
+		grpc.WithInsecure(),
+		grpc.WithKeepaliveParams(keepalive.ClientParameters{
+			Time:    10 * time.Second,
+			Timeout: 3 * time.Second,
+		}),
+	)
+	require.NoError(t, err)
+	defer conn.Close()
+
+	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+	defer cancel()
+	client := tipb.NewTopSQLPubSubClient(conn)
+	stream, err := client.Subscribe(ctx, &tipb.TopSQLSubRequest{})
+	require.NoError(t, err)
+
+	_, err = stream.Recv()
+	require.Error(t, err, "reporter is closed")
+}
+
 func setTopSQLEnable(enabled bool) {
 	variable.TopSQLVariable.Enable.Store(enabled)
 }