Skip to content
This repository has been archived by the owner on Aug 22, 2024. It is now read-only.

Commit

Permalink
Move WAL metrics to separate collector.
Browse files Browse the repository at this point in the history
  • Loading branch information
lesovsky committed Sep 1, 2021
1 parent 7c0bc47 commit cae5ba2
Show file tree
Hide file tree
Showing 5 changed files with 117 additions and 62 deletions.
1 change: 1 addition & 0 deletions internal/collector/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ func (f Factories) RegisterPostgresCollectors(disabled []string) {
"postgres/settings": NewPostgresSettingsCollector,
"postgres/storage": NewPostgresStorageCollector,
"postgres/tables": NewPostgresTablesCollector,
"postgres/wal": NewPostgresWalCollector,
"postgres/custom": NewPostgresCustomCollector,
}

Expand Down
42 changes: 0 additions & 42 deletions internal/collector/postgres_replication.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package collector

import (
"context"
"github.com/prometheus/client_golang/prometheus"
"github.com/weaponry/pgscv/internal/log"
"github.com/weaponry/pgscv/internal/model"
Expand All @@ -10,12 +9,6 @@ import (
)

const (
postgresWalQuery96 = "SELECT pg_is_in_recovery()::int AS recovery, " +
"(case pg_is_in_recovery() when 't' then pg_last_xlog_receive_location() else pg_current_xlog_location() end) - '0/00000000' AS wal_bytes"

postgresWalQuertLatest = "SELECT pg_is_in_recovery()::int AS recovery, " +
"(case pg_is_in_recovery() when 't' then pg_last_wal_receive_lsn() else pg_current_wal_lsn() end) - '0/00000000' AS wal_bytes"

// Query for Postgres version 9.6 and older.
postgresReplicationQuery96 = "SELECT pid, coalesce(client_addr, '127.0.0.1') AS client_addr, usename AS user, application_name, state, " +
"pg_current_xlog_location() - sent_location AS pending_lag_bytes, " +
Expand All @@ -42,8 +35,6 @@ const (

type postgresReplicationCollector struct {
labelNames []string
recovery typedDesc
wal typedDesc
lagbytes typedDesc
lagseconds typedDesc
lagtotalbytes typedDesc
Expand All @@ -57,18 +48,6 @@ func NewPostgresReplicationCollector(constLabels labels, settings model.Collecto

return &postgresReplicationCollector{
labelNames: labelNames,
recovery: newBuiltinTypedDesc(
descOpts{"postgres", "recovery", "info", "Current recovery state, 0 - not in recovery; 1 - in recovery.", 0},
prometheus.GaugeValue,
nil, constLabels,
settings.Filters,
),
wal: newBuiltinTypedDesc(
descOpts{"postgres", "wal", "written_bytes_total", "Total amount of WAL written (or received in case of standby), in bytes.", 0},
prometheus.CounterValue,
nil, constLabels,
settings.Filters,
),
lagbytes: newBuiltinTypedDesc(
descOpts{"postgres", "replication", "lag_bytes", "Number of bytes standby is behind than primary in each WAL processing phase.", 0},
prometheus.GaugeValue,
Expand Down Expand Up @@ -104,17 +83,6 @@ func (c *postgresReplicationCollector) Update(config Config, ch chan<- prometheu
}
defer conn.Close()

// Get recovery state.
var recovery int
var walBytes int64
err = conn.Conn().QueryRow(context.TODO(), selectWalQuery(config.serverVersionNum)).Scan(&recovery, &walBytes)
if err != nil {
log.Warnf("get recovery state failed: %s; skip", err)
} else {
ch <- c.recovery.newConstMetric(float64(recovery))
ch <- c.wal.newConstMetric(float64(walBytes))
}

// Get replication stats.
res, err := conn.Query(selectReplicationQuery(config.serverVersionNum))
if err != nil {
Expand Down Expand Up @@ -259,13 +227,3 @@ func selectReplicationQuery(version int) string {
return postgresReplicationQueryLatest
}
}

// selectWalQuery returns suitable wal state query depending on passed version.
func selectWalQuery(version int) string {
switch {
case version < PostgresV10:
return postgresWalQuery96
default:
return postgresWalQuertLatest
}
}
20 changes: 0 additions & 20 deletions internal/collector/postgres_replication_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,6 @@ import (
func TestPostgresReplicationCollector_Update(t *testing.T) {
var input = pipelineInput{
required: []string{
"postgres_recovery_info",
"postgres_wal_written_bytes_total",
"postgres_replication_lag_bytes",
"postgres_replication_lag_all_bytes",
"postgres_replication_lag_seconds",
Expand Down Expand Up @@ -103,21 +101,3 @@ func Test_selectReplicationQuery(t *testing.T) {
})
}
}

func Test_selectWalQuery(t *testing.T) {
var testcases = []struct {
version int
want string
}{
{version: 90600, want: postgresWalQuery96},
{version: 90605, want: postgresWalQuery96},
{version: 100000, want: postgresWalQuertLatest},
{version: 100005, want: postgresWalQuertLatest},
}

for _, tc := range testcases {
t.Run("", func(t *testing.T) {
assert.Equal(t, tc.want, selectWalQuery(tc.version))
})
}
}
77 changes: 77 additions & 0 deletions internal/collector/postgres_wal.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
package collector

import (
"context"
"github.com/prometheus/client_golang/prometheus"
"github.com/weaponry/pgscv/internal/log"
"github.com/weaponry/pgscv/internal/model"
"github.com/weaponry/pgscv/internal/store"
)

const (
postgresWalQuery96 = "SELECT pg_is_in_recovery()::int AS recovery, " +
"(case pg_is_in_recovery() when 't' then pg_last_xlog_receive_location() else pg_current_xlog_location() end) - '0/00000000' AS wal_bytes"

postgresWalQuertLatest = "SELECT pg_is_in_recovery()::int AS recovery, " +
"(case pg_is_in_recovery() when 't' then pg_last_wal_receive_lsn() else pg_current_wal_lsn() end) - '0/00000000' AS wal_bytes"
)

type postgresWalCollector struct {
labelNames []string
recovery typedDesc
wal typedDesc
}

// NewPostgresWalCollector returns a new Collector exposing postgres WAL stats.
// For details see https://www.postgresql.org/docs/current/monitoring-stats.html#PG-STAT-WAL-VIEW
func NewPostgresWalCollector(constLabels labels, settings model.CollectorSettings) (Collector, error) {
var labelNames = []string{"client_addr", "user", "application_name", "state", "lag"}

return &postgresWalCollector{
labelNames: labelNames,
recovery: newBuiltinTypedDesc(
descOpts{"postgres", "recovery", "info", "Current recovery state, 0 - not in recovery; 1 - in recovery.", 0},
prometheus.GaugeValue,
nil, constLabels,
settings.Filters,
),
wal: newBuiltinTypedDesc(
descOpts{"postgres", "wal", "written_bytes_total", "Total amount of WAL written (or received in case of standby), in bytes.", 0},
prometheus.CounterValue,
nil, constLabels,
settings.Filters,
),
}, nil
}

// Update method collects statistics, parse it and produces metrics that are sent to Prometheus.
func (c *postgresWalCollector) Update(config Config, ch chan<- prometheus.Metric) error {
conn, err := store.New(config.ConnString)
if err != nil {
return err
}
defer conn.Close()

// Get recovery state.
var recovery int
var walBytes int64
err = conn.Conn().QueryRow(context.TODO(), selectWalQuery(config.serverVersionNum)).Scan(&recovery, &walBytes)
if err != nil {
log.Warnf("get recovery state failed: %s; skip", err)
} else {
ch <- c.recovery.newConstMetric(float64(recovery))
ch <- c.wal.newConstMetric(float64(walBytes))
}

return nil
}

// selectWalQuery returns suitable wal state query depending on passed version.
func selectWalQuery(version int) string {
switch {
case version < PostgresV10:
return postgresWalQuery96
default:
return postgresWalQuertLatest
}
}
39 changes: 39 additions & 0 deletions internal/collector/postgres_wal_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package collector

import (
"github.com/stretchr/testify/assert"
"github.com/weaponry/pgscv/internal/model"
"testing"
)

func TestPostgresWalCollector_Update(t *testing.T) {
var input = pipelineInput{
required: []string{
"postgres_recovery_info",
"postgres_wal_written_bytes_total",
},
optional: []string{},
collector: NewPostgresWalCollector,
service: model.ServiceTypePostgresql,
}

pipeline(t, input)
}

func Test_selectWalQuery(t *testing.T) {
var testcases = []struct {
version int
want string
}{
{version: 90600, want: postgresWalQuery96},
{version: 90605, want: postgresWalQuery96},
{version: 100000, want: postgresWalQuertLatest},
{version: 100005, want: postgresWalQuertLatest},
}

for _, tc := range testcases {
t.Run("", func(t *testing.T) {
assert.Equal(t, tc.want, selectWalQuery(tc.version))
})
}
}

0 comments on commit cae5ba2

Please sign in to comment.