Skip to content

Commit

Permalink
fix: handle pg_replication_slots on pg<13 (#1098)
Browse files Browse the repository at this point in the history
* fix: handle pg_replication_slots on pg<13

Signed-off-by: Michael Todorovic <[email protected]>

* fix: tests

Signed-off-by: Michael Todorovic <[email protected]>

---------

Signed-off-by: Michael Todorovic <[email protected]>
  • Loading branch information
michael-todorovic authored Feb 15, 2025
1 parent 072864d commit 9e42fc0
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 11 deletions.
38 changes: 35 additions & 3 deletions collector/pg_replication_slot.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"database/sql"
"log/slog"

"github.com/blang/semver/v4"
"github.com/prometheus/client_golang/prometheus"
)

Expand Down Expand Up @@ -81,8 +82,18 @@ var (
"availability of WAL files claimed by this slot",
[]string{"slot_name", "slot_type", "wal_status"}, nil,
)

pgReplicationSlotQuery = `SELECT
slot_name,
slot_type,
CASE WHEN pg_is_in_recovery() THEN
pg_last_wal_receive_lsn() - '0/0'
ELSE
pg_current_wal_lsn() - '0/0'
END AS current_wal_lsn,
COALESCE(confirmed_flush_lsn, '0/0') - '0/0' AS confirmed_flush_lsn,
active
FROM pg_replication_slots;`
pgReplicationSlotNewQuery = `SELECT
slot_name,
slot_type,
CASE WHEN pg_is_in_recovery() THEN
Expand All @@ -98,9 +109,15 @@ var (
)

func (PGReplicationSlotCollector) Update(ctx context.Context, instance *instance, ch chan<- prometheus.Metric) error {
query := pgReplicationSlotQuery
abovePG13 := instance.version.GTE(semver.MustParse("13.0.0"))
if abovePG13 {
query = pgReplicationSlotNewQuery
}

db := instance.getDB()
rows, err := db.QueryContext(ctx,
pgReplicationSlotQuery)
query)
if err != nil {
return err
}
Expand All @@ -114,7 +131,22 @@ func (PGReplicationSlotCollector) Update(ctx context.Context, instance *instance
var isActive sql.NullBool
var safeWalSize sql.NullInt64
var walStatus sql.NullString
if err := rows.Scan(&slotName, &slotType, &walLSN, &flushLSN, &isActive, &safeWalSize, &walStatus); err != nil {

r := []any{
&slotName,
&slotType,
&walLSN,
&flushLSN,
&isActive,
}

if abovePG13 {
r = append(r, &safeWalSize)
r = append(r, &walStatus)
}

err := rows.Scan(r...)
if err != nil {
return err
}

Expand Down
17 changes: 9 additions & 8 deletions collector/pg_replication_slot_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"testing"

"github.com/DATA-DOG/go-sqlmock"
"github.com/blang/semver/v4"
"github.com/prometheus/client_golang/prometheus"
dto "github.com/prometheus/client_model/go"
"github.com/smartystreets/goconvey/convey"
Expand All @@ -29,12 +30,12 @@ func TestPgReplicationSlotCollectorActive(t *testing.T) {
}
defer db.Close()

inst := &instance{db: db}
inst := &instance{db: db, version: semver.MustParse("13.3.7")}

columns := []string{"slot_name", "slot_type", "current_wal_lsn", "confirmed_flush_lsn", "active", "safe_wal_size", "wal_status"}
rows := sqlmock.NewRows(columns).
AddRow("test_slot", "physical", 5, 3, true, 323906992, "reserved")
mock.ExpectQuery(sanitizeQuery(pgReplicationSlotQuery)).WillReturnRows(rows)
mock.ExpectQuery(sanitizeQuery(pgReplicationSlotNewQuery)).WillReturnRows(rows)

ch := make(chan prometheus.Metric)
go func() {
Expand Down Expand Up @@ -72,12 +73,12 @@ func TestPgReplicationSlotCollectorInActive(t *testing.T) {
}
defer db.Close()

inst := &instance{db: db}
inst := &instance{db: db, version: semver.MustParse("13.3.7")}

columns := []string{"slot_name", "slot_type", "current_wal_lsn", "confirmed_flush_lsn", "active", "safe_wal_size", "wal_status"}
rows := sqlmock.NewRows(columns).
AddRow("test_slot", "physical", 6, 12, false, -4000, "extended")
mock.ExpectQuery(sanitizeQuery(pgReplicationSlotQuery)).WillReturnRows(rows)
mock.ExpectQuery(sanitizeQuery(pgReplicationSlotNewQuery)).WillReturnRows(rows)

ch := make(chan prometheus.Metric)
go func() {
Expand Down Expand Up @@ -115,12 +116,12 @@ func TestPgReplicationSlotCollectorActiveNil(t *testing.T) {
}
defer db.Close()

inst := &instance{db: db}
inst := &instance{db: db, version: semver.MustParse("13.3.7")}

columns := []string{"slot_name", "slot_type", "current_wal_lsn", "confirmed_flush_lsn", "active", "safe_wal_size", "wal_status"}
rows := sqlmock.NewRows(columns).
AddRow("test_slot", "physical", 6, 12, nil, nil, "lost")
mock.ExpectQuery(sanitizeQuery(pgReplicationSlotQuery)).WillReturnRows(rows)
mock.ExpectQuery(sanitizeQuery(pgReplicationSlotNewQuery)).WillReturnRows(rows)

ch := make(chan prometheus.Metric)
go func() {
Expand Down Expand Up @@ -156,12 +157,12 @@ func TestPgReplicationSlotCollectorTestNilValues(t *testing.T) {
}
defer db.Close()

inst := &instance{db: db}
inst := &instance{db: db, version: semver.MustParse("13.3.7")}

columns := []string{"slot_name", "slot_type", "current_wal_lsn", "confirmed_flush_lsn", "active", "safe_wal_size", "wal_status"}
rows := sqlmock.NewRows(columns).
AddRow(nil, nil, nil, nil, true, nil, nil)
mock.ExpectQuery(sanitizeQuery(pgReplicationSlotQuery)).WillReturnRows(rows)
mock.ExpectQuery(sanitizeQuery(pgReplicationSlotNewQuery)).WillReturnRows(rows)

ch := make(chan prometheus.Metric)
go func() {
Expand Down

0 comments on commit 9e42fc0

Please sign in to comment.