diff --git a/pkg/sql/sqlstats/persistedsqlstats/BUILD.bazel b/pkg/sql/sqlstats/persistedsqlstats/BUILD.bazel index a70ab87862fc..e32d3d66af5f 100644 --- a/pkg/sql/sqlstats/persistedsqlstats/BUILD.bazel +++ b/pkg/sql/sqlstats/persistedsqlstats/BUILD.bazel @@ -32,6 +32,8 @@ go_library( "//pkg/sql/appstatspb", "//pkg/sql/catalog/systemschema", "//pkg/sql/isql", + "//pkg/sql/parser", + "//pkg/sql/parser/statements", "//pkg/sql/sem/tree", "//pkg/sql/sessiondata", "//pkg/sql/sqlstats", diff --git a/pkg/sql/sqlstats/persistedsqlstats/flush.go b/pkg/sql/sqlstats/persistedsqlstats/flush.go index 5ef3350f1cb4..69db7a99b372 100644 --- a/pkg/sql/sqlstats/persistedsqlstats/flush.go +++ b/pkg/sql/sqlstats/persistedsqlstats/flush.go @@ -34,7 +34,6 @@ import ( // 3. We have reached the limit of the number of rows in the system table. func (s *PersistedSQLStats) MaybeFlush(ctx context.Context, stopper *stop.Stopper) bool { now := s.getTimeNow() - allowDiscardWhenDisabled := DiscardInMemoryStatsWhenFlushDisabled.Get(&s.cfg.Settings.SV) minimumFlushInterval := MinimumInterval.Get(&s.cfg.Settings.SV) @@ -255,15 +254,6 @@ func (s *PersistedSQLStats) upsertTransactionStats( serializedFingerprintID []byte, stats *appstatspb.CollectedTransactionStatistics, ) error { - const upsertStmt = ` -INSERT INTO system.transaction_statistics as t -VALUES ($1, $2, $3, $4, $5, $6, $7) -ON CONFLICT (crdb_internal_aggregated_ts_app_name_fingerprint_id_node_id_shard_8, aggregated_ts, fingerprint_id, app_name, node_id) -DO UPDATE -SET - statistics = crdb_internal.merge_transaction_stats(ARRAY(t.statistics, EXCLUDED.statistics)) -` - aggInterval := s.GetAggregationInterval() // Prepare data for insertion. 
@@ -280,12 +270,12 @@ SET statistics := tree.NewDJSON(statisticsJSON) nodeID := s.GetEnabledSQLInstanceID() - _, err = txn.ExecEx( + _, err = txn.ExecParsed( ctx, "upsert-txn-stats", txn.KV(), sessiondata.NodeUserWithLowUserPrioritySessionDataOverride, - upsertStmt, + s.upsertTxnStatsStmt, aggregatedTs, // aggregated_ts serializedFingerprintID, // fingerprint_id stats.App, // app_name @@ -346,22 +336,12 @@ func (s *PersistedSQLStats) upsertStatementStats( indexRecommendations, // index_recommendations ) - const upsertStmt = ` -INSERT INTO system.statement_statistics as s -VALUES ($1 ,$2, $3, $4, $5, $6, $7, $8, $9, $10, $11) -ON CONFLICT (crdb_internal_aggregated_ts_app_name_fingerprint_id_node_id_plan_hash_transaction_fingerprint_id_shard_8, - aggregated_ts, fingerprint_id, transaction_fingerprint_id, app_name, plan_hash, node_id) -DO UPDATE -SET - statistics = crdb_internal.merge_statement_stats(ARRAY(s.statistics, EXCLUDED.statistics)), - index_recommendations = EXCLUDED.index_recommendations -` - _, err = txn.ExecEx( + _, err = txn.ExecParsed( ctx, "upsert-stmt-stats", txn.KV(), /* txn */ sessiondata.NodeUserWithLowUserPrioritySessionDataOverride, - upsertStmt, + s.upsertStmtStatsStmt, args..., ) diff --git a/pkg/sql/sqlstats/persistedsqlstats/flush_test.go b/pkg/sql/sqlstats/persistedsqlstats/flush_test.go index 3cd92a3bb120..192d1b15b3ed 100644 --- a/pkg/sql/sqlstats/persistedsqlstats/flush_test.go +++ b/pkg/sql/sqlstats/persistedsqlstats/flush_test.go @@ -1288,3 +1288,37 @@ func TestSQLStatsFlushWorkerDoesntSignalJobOnAbort(t *testing.T) { default: } } + +func BenchmarkSQLStatsFlush(b *testing.B) { + defer leaktest.AfterTest(b)() + defer log.Scope(b).Close(b) + fakeTime := stubTime{ + aggInterval: time.Hour, + } + fakeTime.setTime(timeutil.Now()) + ts, conn, _ := serverutils.StartServer(b, base.TestServerArgs{ + Knobs: base.TestingKnobs{ + SQLStatsKnobs: &sqlstats.TestingKnobs{ + StubTimeNow: fakeTime.Now, + }, + }, + }, + ) + defer 
ts.Stop(context.Background()) + + sqlStats := ts.SQLServer().(*sql.Server).GetSQLStatsProvider().(*persistedsqlstats.PersistedSQLStats) + runner := sqlutils.MakeSQLRunner(conn) + + ctx := context.Background() + const QueryCountScale = int64(5000) + for iter := 0; iter < b.N; iter++ { + for _, tc := range testQueries { + for i := int64(0); i < QueryCountScale; i++ { + runner.Exec(b, tc.query) + } + } + b.StartTimer() + sqlStats.MaybeFlush(ctx, ts.Stopper()) + b.StopTimer() + } +} diff --git a/pkg/sql/sqlstats/persistedsqlstats/provider.go b/pkg/sql/sqlstats/persistedsqlstats/provider.go index a4b75f6e1755..ddaec6f6a72c 100644 --- a/pkg/sql/sqlstats/persistedsqlstats/provider.go +++ b/pkg/sql/sqlstats/persistedsqlstats/provider.go @@ -20,6 +20,9 @@ import ( "github.com/cockroachdb/cockroach/pkg/server/serverpb" "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/sql/isql" + "github.com/cockroachdb/cockroach/pkg/sql/parser" + "github.com/cockroachdb/cockroach/pkg/sql/parser/statements" + "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/sqlstats" "github.com/cockroachdb/cockroach/pkg/sql/sqlstats/sslocal" "github.com/cockroachdb/cockroach/pkg/util/log" @@ -81,6 +84,9 @@ type PersistedSQLStats struct { // The last time the size was checked before doing a flush.
lastSizeCheck time.Time + + upsertTxnStatsStmt statements.Statement[tree.Statement] + upsertStmtStatsStmt statements.Statement[tree.Statement] } var _ sqlstats.Provider = &PersistedSQLStats{} @@ -104,6 +110,34 @@ func New(cfg *Config, memSQLStats *sslocal.SQLStats) *PersistedSQLStats { p.jobMonitor.testingKnobs.updateCheckInterval = cfg.Knobs.JobMonitorUpdateCheckInterval } + upsertTxnStatsStmt, err := parser.ParseOne(` +INSERT INTO system.transaction_statistics as t +VALUES ($1, $2, $3, $4, $5, $6, $7) +ON CONFLICT (crdb_internal_aggregated_ts_app_name_fingerprint_id_node_id_shard_8, aggregated_ts, fingerprint_id, app_name, node_id) +DO UPDATE +SET + statistics = crdb_internal.merge_transaction_stats(ARRAY(t.statistics, EXCLUDED.statistics)) +`) + if err != nil { + panic(err) + } + p.upsertTxnStatsStmt = upsertTxnStatsStmt + + upsertStmtStatsStmt, err := parser.ParseOne(` +INSERT INTO system.statement_statistics as s +VALUES ($1 ,$2, $3, $4, $5, $6, $7, $8, $9, $10, $11) +ON CONFLICT (crdb_internal_aggregated_ts_app_name_fingerprint_id_node_id_plan_hash_transaction_fingerprint_id_shard_8, + aggregated_ts, fingerprint_id, transaction_fingerprint_id, app_name, plan_hash, node_id) +DO UPDATE +SET + statistics = crdb_internal.merge_statement_stats(ARRAY(s.statistics, EXCLUDED.statistics)), + index_recommendations = EXCLUDED.index_recommendations +`) + if err != nil { + panic(err) + } + p.upsertStmtStatsStmt = upsertStmtStatsStmt + return p }