Skip to content
This repository has been archived by the owner on Aug 22, 2024. It is now read-only.

Commit

Permalink
Revert "Use updated common collector logic in postgres/conflicts coll…
Browse files Browse the repository at this point in the history
…ector."

This reverts commit be53e03
  • Loading branch information
lesovsky committed May 15, 2021
1 parent 2d4a82e commit 134adf5
Show file tree
Hide file tree
Showing 2 changed files with 177 additions and 23 deletions.
155 changes: 133 additions & 22 deletions internal/collector/postgres_conflicts.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,62 +2,87 @@ package collector

import (
"github.com/prometheus/client_golang/prometheus"
"github.com/weaponry/pgscv/internal/log"
"github.com/weaponry/pgscv/internal/model"
"github.com/weaponry/pgscv/internal/store"
"strconv"
)

const (
postgresDatabaseConflictsQuery = "SELECT datname," +
"confl_tablespace AS tablespace," +
"confl_lock AS lock," +
"confl_snapshot AS snapshot," +
"confl_bufferpin AS bufferpin," +
"confl_deadlock AS deadlock " +
"nullif(confl_tablespace, 0) AS tablespace," +
"nullif(confl_lock, 0) AS lock," +
"nullif(confl_snapshot, 0) AS snapshot," +
"nullif(confl_bufferpin, 0) AS bufferpin," +
"nullif(confl_deadlock, 0) AS deadlock " +
"FROM pg_stat_database_conflicts"
)

type postgresConflictsCollector struct {
//labelNames []string
//conflicts typedDesc
builtin []typedDescSet
custom []typedDescSet
labelNames []string
conflicts typedDesc
custom []typedDescSet
}

// NewPostgresConflictsCollector returns a new Collector exposing postgres databases recovery conflicts stats.
// For details see https://www.postgresql.org/docs/current/monitoring-stats.html#PG-STAT-DATABASE-CONFLICTS-VIEW
func NewPostgresConflictsCollector(constLabels prometheus.Labels, settings model.CollectorSettings) (Collector, error) {
// This instance of builtinSubsystems just used for detecting collisions with user-defined metrics.
// This must be always synchronized with metric descriptors in returned value.
builtinSubsystems := model.Subsystems{
"recovery": {
Query: postgresDatabaseConflictsQuery,
Metrics: model.Metrics{
{
ShortName: "conflicts_total",
Usage: "COUNTER",
LabeledValues: map[string][]string{"conflict": {"tablespace", "lock", "snapshot", "bufferpin", "deadlock"}},
Labels: []string{"datname"},
Description: "Total number of recovery conflicts occurred by each conflict type.",
},
{ShortName: "datname"},
{ShortName: "tablespace"},
{ShortName: "lock"},
{ShortName: "snapshot"},
{ShortName: "bufferpin"},
{ShortName: "deadlock"},
},
},
}

removeCollisions(builtinSubsystems, settings.Subsystems)

labelNames := []string{"datname", "conflict"}

return &postgresConflictsCollector{
builtin: newDeskSetsFromSubsystems("postgres", builtinSubsystems, constLabels),
custom: newDeskSetsFromSubsystems("postgres", settings.Subsystems, constLabels),
labelNames: labelNames,
conflicts: typedDesc{
desc: prometheus.NewDesc(
prometheus.BuildFQName("postgres", "recovery", "conflicts_total"),
"Total number of recovery conflicts occurred by each conflict type.",
labelNames, constLabels,
), valueType: prometheus.CounterValue,
},
custom: newDeskSetsFromSubsystems("postgres", settings.Subsystems, constLabels),
}, nil
}

// Update method collects statistics, parse it and produces metrics that are sent to Prometheus.
func (c *postgresConflictsCollector) Update(config Config, ch chan<- prometheus.Metric) error {
// TODO: update only when Postgres is in recovery.
conn, err := store.New(config.ConnString)
if err != nil {
return err
}

// Update builtin metrics.
err := updateAllDescSets(config, c.builtin, ch)
res, err := conn.Query(postgresDatabaseConflictsQuery)
if err != nil {
return err
}

conn.Close()

stats := parsePostgresConflictStats(res, c.labelNames)

for _, stat := range stats {
ch <- c.conflicts.mustNewConstMetric(stat.tablespace, stat.datname, "tablespace")
ch <- c.conflicts.mustNewConstMetric(stat.lock, stat.datname, "lock")
ch <- c.conflicts.mustNewConstMetric(stat.snapshot, stat.datname, "snapshot")
ch <- c.conflicts.mustNewConstMetric(stat.bufferpin, stat.datname, "bufferpin")
ch <- c.conflicts.mustNewConstMetric(stat.deadlock, stat.datname, "deadlock")
}

// Update user-defined metrics.
err = updateAllDescSets(config, c.custom, ch)
if err != nil {
Expand All @@ -66,3 +91,89 @@ func (c *postgresConflictsCollector) Update(config Config, ch chan<- prometheus.

return nil
}

// postgresConflictStat represents per-database recovery conflicts stats based on pg_stat_database_conflicts.
type postgresConflictStat struct {
datname string
tablespace float64
lock float64
snapshot float64
bufferpin float64
deadlock float64
}

// parsePostgresDatabasesStats parses PGResult, extract data and return struct with stats values.
func parsePostgresConflictStats(r *model.PGResult, labelNames []string) map[string]postgresConflictStat {
log.Debug("parse postgres database conflicts stats")

var stats = make(map[string]postgresConflictStat)

// process row by row
for _, row := range r.Rows {
stat := postgresConflictStat{}

// collect label values
for i, colname := range r.Colnames {
switch string(colname.Name) {
case "datname":
stat.datname = row[i].String
}
}

// Define a map key as a database name.
databaseFQName := stat.datname

// Put stats with labels (but with no data values yet) into stats store.
stats[databaseFQName] = stat

// fetch data values from columns
for i, colname := range r.Colnames {
// skip columns if its value used as a label
if stringsContains(labelNames, string(colname.Name)) {
log.Debugf("skip label mapped column '%s'", string(colname.Name))
continue
}

// Skip empty (NULL) values.
if !row[i].Valid {
continue
}

// Get data value and convert it to float64 used by Prometheus.
v, err := strconv.ParseFloat(row[i].String, 64)
if err != nil {
log.Errorf("invalid input, parse '%s' failed: %s; skip", row[i].String, err)
continue
}

// Run column-specific logic
switch string(colname.Name) {
case "confl_tablespace":
s := stats[databaseFQName]
s.tablespace = v
stats[databaseFQName] = s
case "confl_lock":
s := stats[databaseFQName]
s.lock = v
stats[databaseFQName] = s
case "confl_snapshot":
s := stats[databaseFQName]
s.snapshot = v
stats[databaseFQName] = s
case "confl_bufferpin":
s := stats[databaseFQName]
s.bufferpin = v
stats[databaseFQName] = s
case "confl_deadlock":
s := stats[databaseFQName]
s.deadlock = v
stats[databaseFQName] = s
default:
log.Debugf("unsupported pg_stat_database_conflicts stat column: %s, skip", string(colname.Name))
continue
}
}
}

return stats
}
45 changes: 44 additions & 1 deletion internal/collector/postgres_conflicts_test.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
package collector

import (
"database/sql"
"github.com/jackc/pgproto3/v2"
"github.com/stretchr/testify/assert"
"github.com/weaponry/pgscv/internal/model"
"testing"
)

func TestPostgresConflictsCollector_Update(t *testing.T) {
var input = pipelineInput{
required: []string{
optional: []string{
"postgres_recovery_conflicts_total",
},
collector: NewPostgresConflictsCollector,
Expand All @@ -16,3 +19,43 @@ func TestPostgresConflictsCollector_Update(t *testing.T) {

pipeline(t, input)
}

func Test_parsePostgresConflictsStats(t *testing.T) {
var testCases = []struct {
name string
res *model.PGResult
want map[string]postgresConflictStat
}{
{
name: "normal output",
res: &model.PGResult{
Nrows: 2,
Ncols: 6,
Colnames: []pgproto3.FieldDescription{
{Name: []byte("datname")}, {Name: []byte("confl_tablespace")}, {Name: []byte("confl_lock")},
{Name: []byte("confl_snapshot")}, {Name: []byte("confl_bufferpin")}, {Name: []byte("confl_deadlock")},
},
Rows: [][]sql.NullString{
{
{String: "testdb1", Valid: true}, {String: "123", Valid: true}, {String: "548", Valid: true},
{String: "784", Valid: true}, {String: "896", Valid: true}, {String: "896", Valid: true},
},
{
{String: "testdb2", Valid: true}, {}, {}, {}, {}, {},
},
},
},
want: map[string]postgresConflictStat{
"testdb1": {datname: "testdb1", tablespace: 123, lock: 548, snapshot: 784, bufferpin: 896, deadlock: 896},
"testdb2": {datname: "testdb2"},
},
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
got := parsePostgresConflictStats(tc.res, []string{"datname", "reason"})
assert.EqualValues(t, tc.want, got)
})
}
}

0 comments on commit 134adf5

Please sign in to comment.