diff --git a/collector/collector.go b/collector/collector.go index d50e1e72a..c1bf2af9a 100644 --- a/collector/collector.go +++ b/collector/collector.go @@ -15,7 +15,6 @@ package collector import ( "context" - "database/sql" "errors" "fmt" "sync" @@ -59,7 +58,7 @@ var ( ) type Collector interface { - Update(ctx context.Context, db *sql.DB, ch chan<- prometheus.Metric) error + Update(ctx context.Context, instance *instance, ch chan<- prometheus.Metric) error } type collectorConfig struct { @@ -92,7 +91,7 @@ type PostgresCollector struct { Collectors map[string]Collector logger log.Logger - db *sql.DB + instance *instance } type Option func(*PostgresCollector) error @@ -149,14 +148,11 @@ func NewPostgresCollector(logger log.Logger, excludeDatabases []string, dsn stri return nil, errors.New("empty dsn") } - db, err := sql.Open("postgres", dsn) + instance, err := newInstance(dsn) if err != nil { return nil, err } - db.SetMaxOpenConns(1) - db.SetMaxIdleConns(1) - - p.db = db + p.instance = instance return p, nil } @@ -174,16 +170,16 @@ func (p PostgresCollector) Collect(ch chan<- prometheus.Metric) { wg.Add(len(p.Collectors)) for name, c := range p.Collectors { go func(name string, c Collector) { - execute(ctx, name, c, p.db, ch, p.logger) + execute(ctx, name, c, p.instance, ch, p.logger) wg.Done() }(name, c) } wg.Wait() } -func execute(ctx context.Context, name string, c Collector, db *sql.DB, ch chan<- prometheus.Metric, logger log.Logger) { +func execute(ctx context.Context, name string, c Collector, instance *instance, ch chan<- prometheus.Metric, logger log.Logger) { begin := time.Now() - err := c.Update(ctx, db, ch) + err := c.Update(ctx, instance, ch) duration := time.Since(begin) var success float64 diff --git a/collector/instance.go b/collector/instance.go new file mode 100644 index 000000000..9b2bbf47f --- /dev/null +++ b/collector/instance.go @@ -0,0 +1,85 @@ +// Copyright 2023 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package collector + +import ( + "database/sql" + "fmt" + "regexp" + + "github.com/blang/semver/v4" +) + +type instance struct { + db *sql.DB + version semver.Version +} + +func newInstance(dsn string) (*instance, error) { + i := &instance{} + db, err := sql.Open("postgres", dsn) + if err != nil { + return nil, err + } + db.SetMaxOpenConns(1) + db.SetMaxIdleConns(1) + i.db = db + + version, err := queryVersion(db) + if err != nil { + db.Close() + return nil, err + } + + i.version = version + + return i, nil +} + +func (i *instance) getDB() *sql.DB { + return i.db +} + +func (i *instance) Close() error { + return i.db.Close() +} + +// Regex used to get the "short-version" from the postgres version field. +// The result of SELECT version() is something like "PostgreSQL 9.6.2 on x86_64-pc-linux-gnu, compiled by gcc (GCC) 6.2.1 20160830, 64-bit" +var versionRegex = regexp.MustCompile(`^\w+ ((\d+)(\.\d+)?(\.\d+)?)`) +var serverVersionRegex = regexp.MustCompile(`^((\d+)(\.\d+)?(\.\d+)?)`) + +func queryVersion(db *sql.DB) (semver.Version, error) { + var version string + err := db.QueryRow("SELECT version();").Scan(&version) + if err != nil { + return semver.Version{}, err + } + submatches := versionRegex.FindStringSubmatch(version) + if len(submatches) > 1 { + return semver.ParseTolerant(submatches[1]) + } + + // We could also try to parse the version from the server_version field. + // This is of the format 13.3 (Debian 13.3-1.pgdg100+1) + err = db.QueryRow("SHOW server_version;").Scan(&version) + if err != nil { + return semver.Version{}, err + } + submatches = serverVersionRegex.FindStringSubmatch(version) + if len(submatches) > 1 { + return semver.ParseTolerant(submatches[1]) + } + return semver.Version{}, fmt.Errorf("could not parse version from %q", version) +} diff --git a/collector/pg_database.go b/collector/pg_database.go index 661f84cd8..a4ea50d0d 100644 --- a/collector/pg_database.go +++ b/collector/pg_database.go @@ -15,7 +15,6 @@ package collector import ( "context" - "database/sql" "github.com/go-kit/log" "github.com/prometheus/client_golang/prometheus" @@ -66,7 +65,8 @@ var ( // each database individually. This is because we can't filter the // list of databases in the query because the list of excluded // databases is dynamic. -func (c PGDatabaseCollector) Update(ctx context.Context, db *sql.DB, ch chan<- prometheus.Metric) error { +func (c PGDatabaseCollector) Update(ctx context.Context, instance *instance, ch chan<- prometheus.Metric) error { + db := instance.getDB() // Query the list of databases rows, err := db.QueryContext(ctx, pgDatabaseQuery, diff --git a/collector/pg_database_test.go b/collector/pg_database_test.go index bb108bb86..058a6d252 100644 --- a/collector/pg_database_test.go +++ b/collector/pg_database_test.go @@ -29,6 +29,8 @@ func TestPGDatabaseCollector(t *testing.T) { } defer db.Close() + inst := &instance{db: db} + mock.ExpectQuery(sanitizeQuery(pgDatabaseQuery)).WillReturnRows(sqlmock.NewRows([]string{"datname"}). AddRow("postgres")) @@ -39,7 +41,7 @@ func TestPGDatabaseCollector(t *testing.T) { go func() { defer close(ch) c := PGDatabaseCollector{} - if err := c.Update(context.Background(), db, ch); err != nil { + if err := c.Update(context.Background(), inst, ch); err != nil { t.Errorf("Error calling PGDatabaseCollector.Update: %s", err) } }() diff --git a/collector/pg_postmaster.go b/collector/pg_postmaster.go index 4a0cec6d4..eae82d567 100644 --- a/collector/pg_postmaster.go +++ b/collector/pg_postmaster.go @@ -15,7 +15,6 @@ package collector import ( "context" - "database/sql" "github.com/prometheus/client_golang/prometheus" ) @@ -47,7 +46,8 @@ var ( pgPostmasterQuery = "SELECT pg_postmaster_start_time from pg_postmaster_start_time();" ) -func (c *PGPostmasterCollector) Update(ctx context.Context, db *sql.DB, ch chan<- prometheus.Metric) error { +func (c *PGPostmasterCollector) Update(ctx context.Context, instance *instance, ch chan<- prometheus.Metric) error { + db := instance.getDB() row := db.QueryRowContext(ctx, pgPostmasterQuery) diff --git a/collector/pg_postmaster_test.go b/collector/pg_postmaster_test.go index 9b93a5c91..c40fe03ad 100644 --- a/collector/pg_postmaster_test.go +++ b/collector/pg_postmaster_test.go @@ -29,6 +29,8 @@ func TestPgPostmasterCollector(t *testing.T) { } defer db.Close() + inst := &instance{db: db} + mock.ExpectQuery(sanitizeQuery(pgPostmasterQuery)).WillReturnRows(sqlmock.NewRows([]string{"pg_postmaster_start_time"}). AddRow(1685739904)) @@ -37,7 +39,7 @@ func TestPgPostmasterCollector(t *testing.T) { defer close(ch) c := PGPostmasterCollector{} - if err := c.Update(context.Background(), db, ch); err != nil { + if err := c.Update(context.Background(), inst, ch); err != nil { t.Errorf("Error calling PGPostmasterCollector.Update: %s", err) } }() diff --git a/collector/pg_process_idle.go b/collector/pg_process_idle.go index 8ee65a436..06244975b 100644 --- a/collector/pg_process_idle.go +++ b/collector/pg_process_idle.go @@ -15,7 +15,6 @@ package collector import ( "context" - "database/sql" "github.com/go-kit/log" "github.com/prometheus/client_golang/prometheus" @@ -42,7 +41,8 @@ var pgProcessIdleSeconds = prometheus.NewDesc( prometheus.Labels{}, ) -func (PGProcessIdleCollector) Update(ctx context.Context, db *sql.DB, ch chan<- prometheus.Metric) error { +func (PGProcessIdleCollector) Update(ctx context.Context, instance *instance, ch chan<- prometheus.Metric) error { + db := instance.getDB() row := db.QueryRowContext(ctx, `WITH metrics AS ( diff --git a/collector/pg_replication_slot.go b/collector/pg_replication_slot.go index 8f105ff49..4278923f8 100644 --- a/collector/pg_replication_slot.go +++ b/collector/pg_replication_slot.go @@ -15,7 +15,6 @@ package collector import ( "context" - "database/sql" "github.com/go-kit/log" "github.com/prometheus/client_golang/prometheus" @@ -73,7 +72,8 @@ var ( pg_replication_slots;` ) -func (PGReplicationSlotCollector) Update(ctx context.Context, db *sql.DB, ch chan<- prometheus.Metric) error { +func (PGReplicationSlotCollector) Update(ctx context.Context, instance *instance, ch chan<- prometheus.Metric) error { + db := instance.getDB() rows, err := db.QueryContext(ctx, pgReplicationSlotQuery) if err != nil { diff --git a/collector/pg_replication_slot_test.go b/collector/pg_replication_slot_test.go index 53bafafad..cb25b755a 100644 --- a/collector/pg_replication_slot_test.go +++ b/collector/pg_replication_slot_test.go @@ -29,6 +29,8 @@ func TestPgReplicationSlotCollectorActive(t *testing.T) { } defer db.Close() + inst := &instance{db: db} + columns := []string{"slot_name", "current_wal_lsn", "confirmed_flush_lsn", "active"} rows := sqlmock.NewRows(columns). AddRow("test_slot", 5, 3, true) @@ -39,7 +41,7 @@ func TestPgReplicationSlotCollectorActive(t *testing.T) { defer close(ch) c := PGReplicationSlotCollector{} - if err := c.Update(context.Background(), db, ch); err != nil { + if err := c.Update(context.Background(), inst, ch); err != nil { t.Errorf("Error calling PGPostmasterCollector.Update: %s", err) } }() @@ -68,6 +70,8 @@ func TestPgReplicationSlotCollectorInActive(t *testing.T) { } defer db.Close() + inst := &instance{db: db} + columns := []string{"slot_name", "current_wal_lsn", "confirmed_flush_lsn", "active"} rows := sqlmock.NewRows(columns). AddRow("test_slot", 6, 12, false) @@ -78,7 +82,7 @@ func TestPgReplicationSlotCollectorInActive(t *testing.T) { defer close(ch) c := PGReplicationSlotCollector{} - if err := c.Update(context.Background(), db, ch); err != nil { + if err := c.Update(context.Background(), inst, ch); err != nil { t.Errorf("Error calling PGReplicationSlotCollector.Update: %s", err) } }() diff --git a/collector/pg_stat_bgwriter.go b/collector/pg_stat_bgwriter.go index 5daf606c9..2bdef8d40 100644 --- a/collector/pg_stat_bgwriter.go +++ b/collector/pg_stat_bgwriter.go @@ -15,7 +15,6 @@ package collector import ( "context" - "database/sql" "time" "github.com/prometheus/client_golang/prometheus" @@ -117,7 +116,8 @@ var ( FROM pg_stat_bgwriter;` ) -func (PGStatBGWriterCollector) Update(ctx context.Context, db *sql.DB, ch chan<- prometheus.Metric) error { +func (PGStatBGWriterCollector) Update(ctx context.Context, instance *instance, ch chan<- prometheus.Metric) error { + db := instance.getDB() row := db.QueryRowContext(ctx, statBGWriterQuery) diff --git a/collector/pg_stat_bgwriter_test.go b/collector/pg_stat_bgwriter_test.go index 54f625c9e..11f55f6be 100644 --- a/collector/pg_stat_bgwriter_test.go +++ b/collector/pg_stat_bgwriter_test.go @@ -30,6 +30,8 @@ func TestPGStatBGWriterCollector(t *testing.T) { } defer db.Close() + inst := &instance{db: db} + columns := []string{ "checkpoints_timed", "checkpoints_req", @@ -57,7 +59,7 @@ func TestPGStatBGWriterCollector(t *testing.T) { defer close(ch) c := PGStatBGWriterCollector{} - if err := c.Update(context.Background(), db, ch); err != nil { + if err := c.Update(context.Background(), inst, ch); err != nil { t.Errorf("Error calling PGStatBGWriterCollector.Update: %s", err) } }() diff --git a/collector/pg_stat_database.go b/collector/pg_stat_database.go index 346ed9ea9..bb39a84b1 100644 --- a/collector/pg_stat_database.go +++ b/collector/pg_stat_database.go @@ -204,7 +204,8 @@ var ( ) ) -func (PGStatDatabaseCollector) Update(ctx context.Context, db *sql.DB, ch chan<- prometheus.Metric) error { +func (PGStatDatabaseCollector) Update(ctx context.Context, instance *instance, ch chan<- prometheus.Metric) error { + db := instance.getDB() rows, err := db.QueryContext(ctx, `SELECT datid diff --git a/collector/pg_stat_statements.go b/collector/pg_stat_statements.go index 23e1f1567..eb629c381 100644 --- a/collector/pg_stat_statements.go +++ b/collector/pg_stat_statements.go @@ -15,7 +15,6 @@ package collector import ( "context" - "database/sql" "github.com/go-kit/log" "github.com/prometheus/client_golang/prometheus" @@ -92,7 +91,8 @@ var ( LIMIT 100;` ) -func (PGStatStatementsCollector) Update(ctx context.Context, db *sql.DB, ch chan<- prometheus.Metric) error { +func (PGStatStatementsCollector) Update(ctx context.Context, instance *instance, ch chan<- prometheus.Metric) error { + db := instance.getDB() rows, err := db.QueryContext(ctx, pgStatStatementsQuery) diff --git a/collector/pg_stat_statements_test.go b/collector/pg_stat_statements_test.go index a5c5cab57..241699ad4 100644 --- a/collector/pg_stat_statements_test.go +++ b/collector/pg_stat_statements_test.go @@ -29,6 +29,8 @@ func TestPGStateStatementsCollector(t *testing.T) { } defer db.Close() + inst := &instance{db: db} + columns := []string{"user", "datname", "queryid", "calls_total", "seconds_total", "rows_total", "block_read_seconds_total", "block_write_seconds_total"} rows := sqlmock.NewRows(columns). AddRow("postgres", "postgres", 1500, 5, 0.4, 100, 0.1, 0.2) @@ -39,7 +41,7 @@ func TestPGStateStatementsCollector(t *testing.T) { defer close(ch) c := PGStatStatementsCollector{} - if err := c.Update(context.Background(), db, ch); err != nil { + if err := c.Update(context.Background(), inst, ch); err != nil { t.Errorf("Error calling PGStatStatementsCollector.Update: %s", err) } }() diff --git a/collector/pg_stat_user_tables.go b/collector/pg_stat_user_tables.go index 05aced91f..48ae96eb8 100644 --- a/collector/pg_stat_user_tables.go +++ b/collector/pg_stat_user_tables.go @@ -15,7 +15,6 @@ package collector import ( "context" - "database/sql" "time" "github.com/go-kit/log" @@ -179,7 +178,8 @@ var ( pg_stat_user_tables` ) -func (c *PGStatUserTablesCollector) Update(ctx context.Context, db *sql.DB, ch chan<- prometheus.Metric) error { +func (c *PGStatUserTablesCollector) Update(ctx context.Context, instance *instance, ch chan<- prometheus.Metric) error { + db := instance.getDB() rows, err := db.QueryContext(ctx, statUserTablesQuery) diff --git a/collector/pg_stat_user_tables_test.go b/collector/pg_stat_user_tables_test.go index 29b5d15f1..8bb9bc31b 100644 --- a/collector/pg_stat_user_tables_test.go +++ b/collector/pg_stat_user_tables_test.go @@ -30,6 +30,8 @@ func TestPGStatUserTablesCollector(t *testing.T) { } defer db.Close() + inst := &instance{db: db} + lastVacuumTime, err := time.Parse("2006-01-02Z", "2023-06-02Z") if err != nil { t.Fatalf("Error parsing vacuum time: %s", err) @@ -99,7 +101,7 @@ func TestPGStatUserTablesCollector(t *testing.T) { defer close(ch) c := PGStatUserTablesCollector{} - if err := c.Update(context.Background(), db, ch); err != nil { + if err := c.Update(context.Background(), inst, ch); err != nil { t.Errorf("Error calling PGStatUserTablesCollector.Update: %s", err) } }() diff --git a/collector/pg_statio_user_tables.go b/collector/pg_statio_user_tables.go index 043433d86..03d541615 100644 --- a/collector/pg_statio_user_tables.go +++ b/collector/pg_statio_user_tables.go @@ -15,7 +15,6 @@ package collector import ( "context" - "database/sql" "github.com/go-kit/log" "github.com/prometheus/client_golang/prometheus" @@ -100,7 +99,8 @@ var ( FROM pg_statio_user_tables` ) -func (PGStatIOUserTablesCollector) Update(ctx context.Context, db *sql.DB, ch chan<- prometheus.Metric) error { +func (PGStatIOUserTablesCollector) Update(ctx context.Context, instance *instance, ch chan<- prometheus.Metric) error { + db := instance.getDB() rows, err := db.QueryContext(ctx, statioUserTablesQuery) diff --git a/collector/pg_statio_user_tables_test.go b/collector/pg_statio_user_tables_test.go index 0a7174d80..d57cab9f5 100644 --- a/collector/pg_statio_user_tables_test.go +++ b/collector/pg_statio_user_tables_test.go @@ -29,6 +29,8 @@ func TestPGStatIOUserTablesCollector(t *testing.T) { } defer db.Close() + inst := &instance{db: db} + columns := []string{ "datname", "schemaname", @@ -60,7 +62,7 @@ func TestPGStatIOUserTablesCollector(t *testing.T) { defer close(ch) c := PGStatIOUserTablesCollector{} - if err := c.Update(context.Background(), db, ch); err != nil { + if err := c.Update(context.Background(), inst, ch); err != nil { t.Errorf("Error calling PGStatIOUserTablesCollector.Update: %s", err) } }() diff --git a/collector/probe.go b/collector/probe.go index 9044c40f9..834c65177 100644 --- a/collector/probe.go +++ b/collector/probe.go @@ -15,7 +15,6 @@ package collector import ( "context" - "database/sql" "sync" "github.com/go-kit/log" @@ -27,7 +26,7 @@ type ProbeCollector struct { registry *prometheus.Registry collectors map[string]Collector logger log.Logger - db *sql.DB + instance *instance } func NewProbeCollector(logger log.Logger, excludeDatabases []string, registry *prometheus.Registry, dsn config.DSN) (*ProbeCollector, error) { @@ -58,18 +57,16 @@ func NewProbeCollector(logger log.Logger, excludeDatabases []string, registry *p } } - db, err := sql.Open("postgres", dsn.GetConnectionString()) + instance, err := newInstance(dsn.GetConnectionString()) if err != nil { return nil, err } - db.SetMaxOpenConns(1) - db.SetMaxIdleConns(1) return &ProbeCollector{ registry: registry, collectors: collectors, logger: logger, - db: db, + instance: instance, }, nil } @@ -81,7 +78,7 @@ func (pc *ProbeCollector) Collect(ch chan<- prometheus.Metric) { wg.Add(len(pc.collectors)) for name, c := range pc.collectors { go func(name string, c Collector) { - execute(context.TODO(), name, c, pc.db, ch, pc.logger) + execute(context.TODO(), name, c, pc.instance, ch, pc.logger) wg.Done() }(name, c) } @@ -89,5 +86,5 @@ func (pc *ProbeCollector) Collect(ch chan<- prometheus.Metric) { } func (pc *ProbeCollector) Close() error { - return pc.db.Close() + return pc.instance.Close() }