From 134adf599a368ead8641aa94ab2e06d1d02de312 Mon Sep 17 00:00:00 2001 From: Alexey Lesovsky Date: Sat, 15 May 2021 20:57:55 +0500 Subject: [PATCH] Revert "Use updated common collector logic in postgres/conflicts collector." This reverts commit be53e038 --- internal/collector/postgres_conflicts.go | 155 +++++++++++++++--- internal/collector/postgres_conflicts_test.go | 45 ++++- 2 files changed, 177 insertions(+), 23 deletions(-) diff --git a/internal/collector/postgres_conflicts.go b/internal/collector/postgres_conflicts.go index 097b7d8a..0db29298 100644 --- a/internal/collector/postgres_conflicts.go +++ b/internal/collector/postgres_conflicts.go @@ -2,62 +2,87 @@ package collector import ( "github.com/prometheus/client_golang/prometheus" + "github.com/weaponry/pgscv/internal/log" "github.com/weaponry/pgscv/internal/model" + "github.com/weaponry/pgscv/internal/store" + "strconv" ) const ( postgresDatabaseConflictsQuery = "SELECT datname," + - "confl_tablespace AS tablespace," + - "confl_lock AS lock," + - "confl_snapshot AS snapshot," + - "confl_bufferpin AS bufferpin," + - "confl_deadlock AS deadlock " + + "nullif(confl_tablespace, 0) AS tablespace," + + "nullif(confl_lock, 0) AS lock," + + "nullif(confl_snapshot, 0) AS snapshot," + + "nullif(confl_bufferpin, 0) AS bufferpin," + + "nullif(confl_deadlock, 0) AS deadlock " + "FROM pg_stat_database_conflicts" ) type postgresConflictsCollector struct { - //labelNames []string - //conflicts typedDesc - builtin []typedDescSet - custom []typedDescSet + labelNames []string + conflicts typedDesc + custom []typedDescSet } // NewPostgresConflictsCollector returns a new Collector exposing postgres databases recovery conflicts stats. 
// For details see https://www.postgresql.org/docs/current/monitoring-stats.html#PG-STAT-DATABASE-CONFLICTS-VIEW func NewPostgresConflictsCollector(constLabels prometheus.Labels, settings model.CollectorSettings) (Collector, error) { + // This instance of builtinSubsystems is only used for detecting collisions with user-defined metrics. + // It must always be kept in sync with the metric descriptors in the returned value. builtinSubsystems := model.Subsystems{ "recovery": { - Query: postgresDatabaseConflictsQuery, Metrics: model.Metrics{ - { - ShortName: "conflicts_total", - Usage: "COUNTER", - LabeledValues: map[string][]string{"conflict": {"tablespace", "lock", "snapshot", "bufferpin", "deadlock"}}, - Labels: []string{"datname"}, - Description: "Total number of recovery conflicts occurred by each conflict type.", - }, + {ShortName: "datname"}, + {ShortName: "tablespace"}, + {ShortName: "lock"}, + {ShortName: "snapshot"}, + {ShortName: "bufferpin"}, + {ShortName: "deadlock"}, }, }, } removeCollisions(builtinSubsystems, settings.Subsystems) + labelNames := []string{"datname", "conflict"} + return &postgresConflictsCollector{ - builtin: newDeskSetsFromSubsystems("postgres", builtinSubsystems, constLabels), - custom: newDeskSetsFromSubsystems("postgres", settings.Subsystems, constLabels), + labelNames: labelNames, + conflicts: typedDesc{ + desc: prometheus.NewDesc( + prometheus.BuildFQName("postgres", "recovery", "conflicts_total"), + "Total number of recovery conflicts occurred by each conflict type.", + labelNames, constLabels, + ), valueType: prometheus.CounterValue, + }, + custom: newDeskSetsFromSubsystems("postgres", settings.Subsystems, constLabels), }, nil } // Update method collects statistics, parse it and produces metrics that are sent to Prometheus. func (c *postgresConflictsCollector) Update(config Config, ch chan<- prometheus.Metric) error { - // TODO: update only when Postgres is in recovery. 
+ conn, err := store.New(config.ConnString) + if err != nil { + return err + } - // Update builtin metrics. - err := updateAllDescSets(config, c.builtin, ch) + res, err := conn.Query(postgresDatabaseConflictsQuery) if err != nil { return err } + conn.Close() + + stats := parsePostgresConflictStats(res, c.labelNames) + + for _, stat := range stats { + ch <- c.conflicts.mustNewConstMetric(stat.tablespace, stat.datname, "tablespace") + ch <- c.conflicts.mustNewConstMetric(stat.lock, stat.datname, "lock") + ch <- c.conflicts.mustNewConstMetric(stat.snapshot, stat.datname, "snapshot") + ch <- c.conflicts.mustNewConstMetric(stat.bufferpin, stat.datname, "bufferpin") + ch <- c.conflicts.mustNewConstMetric(stat.deadlock, stat.datname, "deadlock") + } + // Update user-defined metrics. err = updateAllDescSets(config, c.custom, ch) if err != nil { @@ -66,3 +91,89 @@ func (c *postgresConflictsCollector) Update(config Config, ch chan<- prometheus. return nil } + +// postgresConflictStat represents per-database recovery conflicts stats based on pg_stat_database_conflicts. +type postgresConflictStat struct { + datname string + tablespace float64 + lock float64 + snapshot float64 + bufferpin float64 + deadlock float64 +} + +// parsePostgresConflictStats parses PGResult, extracts data and returns a map of per-database stats keyed by database name. +func parsePostgresConflictStats(r *model.PGResult, labelNames []string) map[string]postgresConflictStat { + log.Debug("parse postgres database conflicts stats") + + var stats = make(map[string]postgresConflictStat) + + // process row by row + for _, row := range r.Rows { + stat := postgresConflictStat{} + + // collect label values + for i, colname := range r.Colnames { + switch string(colname.Name) { + case "datname": + stat.datname = row[i].String + } + } + + // Define a map key as a database name. + databaseFQName := stat.datname + + // Put stats with labels (but with no data values yet) into stats store. 
+ stats[databaseFQName] = stat + + // fetch data values from columns + for i, colname := range r.Colnames { + // skip columns if its value used as a label + if stringsContains(labelNames, string(colname.Name)) { + log.Debugf("skip label mapped column '%s'", string(colname.Name)) + continue + } + + // Skip empty (NULL) values. + if !row[i].Valid { + continue + } + + // Get data value and convert it to float64 used by Prometheus. + v, err := strconv.ParseFloat(row[i].String, 64) + if err != nil { + log.Errorf("invalid input, parse '%s' failed: %s; skip", row[i].String, err) + continue + } + + // Run column-specific logic + switch string(colname.Name) { + case "confl_tablespace": + s := stats[databaseFQName] + s.tablespace = v + stats[databaseFQName] = s + case "confl_lock": + s := stats[databaseFQName] + s.lock = v + stats[databaseFQName] = s + case "confl_snapshot": + s := stats[databaseFQName] + s.snapshot = v + stats[databaseFQName] = s + case "confl_bufferpin": + s := stats[databaseFQName] + s.bufferpin = v + stats[databaseFQName] = s + case "confl_deadlock": + s := stats[databaseFQName] + s.deadlock = v + stats[databaseFQName] = s + default: + log.Debugf("unsupported pg_stat_database_conflicts stat column: %s, skip", string(colname.Name)) + continue + } + } + } + + return stats +} diff --git a/internal/collector/postgres_conflicts_test.go b/internal/collector/postgres_conflicts_test.go index d91290bd..5cf0c54f 100644 --- a/internal/collector/postgres_conflicts_test.go +++ b/internal/collector/postgres_conflicts_test.go @@ -1,13 +1,16 @@ package collector import ( + "database/sql" + "github.com/jackc/pgproto3/v2" + "github.com/stretchr/testify/assert" "github.com/weaponry/pgscv/internal/model" "testing" ) func TestPostgresConflictsCollector_Update(t *testing.T) { var input = pipelineInput{ - required: []string{ + optional: []string{ "postgres_recovery_conflicts_total", }, collector: NewPostgresConflictsCollector, @@ -16,3 +19,43 @@ func 
TestPostgresConflictsCollector_Update(t *testing.T) { pipeline(t, input) } + +func Test_parsePostgresConflictsStats(t *testing.T) { + var testCases = []struct { + name string + res *model.PGResult + want map[string]postgresConflictStat + }{ + { + name: "normal output", + res: &model.PGResult{ + Nrows: 2, + Ncols: 6, + Colnames: []pgproto3.FieldDescription{ + {Name: []byte("datname")}, {Name: []byte("confl_tablespace")}, {Name: []byte("confl_lock")}, + {Name: []byte("confl_snapshot")}, {Name: []byte("confl_bufferpin")}, {Name: []byte("confl_deadlock")}, + }, + Rows: [][]sql.NullString{ + { + {String: "testdb1", Valid: true}, {String: "123", Valid: true}, {String: "548", Valid: true}, + {String: "784", Valid: true}, {String: "896", Valid: true}, {String: "896", Valid: true}, + }, + { + {String: "testdb2", Valid: true}, {}, {}, {}, {}, {}, + }, + }, + }, + want: map[string]postgresConflictStat{ + "testdb1": {datname: "testdb1", tablespace: 123, lock: 548, snapshot: 784, bufferpin: 896, deadlock: 896}, + "testdb2": {datname: "testdb2"}, + }, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + got := parsePostgresConflictStats(tc.res, []string{"datname", "reason"}) + assert.EqualValues(t, tc.want, got) + }) + } +}