From 4e4a7295a52e9c3e2b7b2dc9e34fba926c0f8581 Mon Sep 17 00:00:00 2001 From: MarcWort <113890636+MarcWort@users.noreply.github.com> Date: Tue, 7 May 2024 11:00:21 +0200 Subject: [PATCH] feat: Add wal_status to replication_slot Signed-off-by: MarcWort <113890636+MarcWort@users.noreply.github.com> --- collector/pg_replication_slot.go | 30 +++++++++++++++++++++++++-- collector/pg_replication_slot_test.go | 19 ++++++++++------- 2 files changed, 39 insertions(+), 10 deletions(-) diff --git a/collector/pg_replication_slot.go b/collector/pg_replication_slot.go index 9f250ef96..a3c5c4ac7 100644 --- a/collector/pg_replication_slot.go +++ b/collector/pg_replication_slot.go @@ -72,6 +72,15 @@ var ( "number of bytes that can be written to WAL such that this slot is not in danger of getting in state lost", []string{"slot_name", "slot_type"}, nil, ) + pgReplicationSlotWalStatus = prometheus.NewDesc( + prometheus.BuildFQName( + namespace, + replicationSlotSubsystem, + "wal_status", + ), + "availability of WAL files claimed by this slot", + []string{"slot_name", "slot_type"}, nil, + ) pgReplicationSlotQuery = `SELECT slot_name, @@ -83,7 +92,8 @@ var ( END AS current_wal_lsn, COALESCE(confirmed_flush_lsn, '0/0') - '0/0' AS confirmed_flush_lsn, active, - safe_wal_size + safe_wal_size, + wal_status FROM pg_replication_slots;` ) @@ -103,7 +113,8 @@ func (PGReplicationSlotCollector) Update(ctx context.Context, instance *instance var flushLSN sql.NullFloat64 var isActive sql.NullBool var safeWalSize sql.NullInt64 - if err := rows.Scan(&slotName, &slotType, &walLSN, &flushLSN, &isActive, &safeWalSize); err != nil { + var walStatus sql.NullString + if err := rows.Scan(&slotName, &slotType, &walLSN, &flushLSN, &isActive, &safeWalSize, &walStatus); err != nil { return err } @@ -149,6 +160,21 @@ func (PGReplicationSlotCollector) Update(ctx context.Context, instance *instance prometheus.GaugeValue, float64(safeWalSize.Int64), slotNameLabel, slotTypeLabel, ) } + + if walStatus.Valid { + // See https://www.postgresql.org/docs/14/view-pg-replication-slots.html + walStatusMap := map[string]int{ + "reserved": 0, // reserved means that the claimed files are within max_wal_size. + "extended": 1, // extended means that max_wal_size is exceeded but the files are still retained, either by the replication slot or by wal_keep_size. + "unreserved": 2, // unreserved means that the slot no longer retains the required WAL files and some of them are to be removed at the next checkpoint. This state can return to reserved or extended. + "lost": 3, // lost means that some required WAL files have been removed and this slot is no longer usable. + } + + ch <- prometheus.MustNewConstMetric( + pgReplicationSlotWalStatus, + prometheus.GaugeValue, float64(walStatusMap[walStatus.String]), slotNameLabel, slotTypeLabel, + ) + } } return rows.Err() } diff --git a/collector/pg_replication_slot_test.go b/collector/pg_replication_slot_test.go index ec0bee135..4d137bf96 100644 --- a/collector/pg_replication_slot_test.go +++ b/collector/pg_replication_slot_test.go @@ -31,9 +31,9 @@ func TestPgReplicationSlotCollectorActive(t *testing.T) { inst := &instance{db: db} - columns := []string{"slot_name", "slot_type", "current_wal_lsn", "confirmed_flush_lsn", "active", "safe_wal_size"} + columns := []string{"slot_name", "slot_type", "current_wal_lsn", "confirmed_flush_lsn", "active", "safe_wal_size", "wal_status"} rows := sqlmock.NewRows(columns). - AddRow("test_slot", "physical", 5, 3, true, 323906992) + AddRow("test_slot", "physical", 5, 3, true, 323906992, "reserved") mock.ExpectQuery(sanitizeQuery(pgReplicationSlotQuery)).WillReturnRows(rows) ch := make(chan prometheus.Metric) @@ -51,6 +51,7 @@ func TestPgReplicationSlotCollectorActive(t *testing.T) { {labels: labelMap{"slot_name": "test_slot", "slot_type": "physical"}, value: 3, metricType: dto.MetricType_GAUGE}, {labels: labelMap{"slot_name": "test_slot", "slot_type": "physical"}, value: 1, metricType: dto.MetricType_GAUGE}, {labels: labelMap{"slot_name": "test_slot", "slot_type": "physical"}, value: 323906992, metricType: dto.MetricType_GAUGE}, + {labels: labelMap{"slot_name": "test_slot", "slot_type": "physical"}, value: 0, metricType: dto.MetricType_GAUGE}, } convey.Convey("Metrics comparison", t, func() { @@ -73,9 +74,9 @@ func TestPgReplicationSlotCollectorInActive(t *testing.T) { inst := &instance{db: db} - columns := []string{"slot_name", "slot_type", "current_wal_lsn", "confirmed_flush_lsn", "active", "safe_wal_size"} + columns := []string{"slot_name", "slot_type", "current_wal_lsn", "confirmed_flush_lsn", "active", "safe_wal_size", "wal_status"} rows := sqlmock.NewRows(columns). - AddRow("test_slot", "physical", 6, 12, false, -4000) + AddRow("test_slot", "physical", 6, 12, false, -4000, "extended") mock.ExpectQuery(sanitizeQuery(pgReplicationSlotQuery)).WillReturnRows(rows) ch := make(chan prometheus.Metric) @@ -92,6 +93,7 @@ func TestPgReplicationSlotCollectorInActive(t *testing.T) { {labels: labelMap{"slot_name": "test_slot", "slot_type": "physical"}, value: 6, metricType: dto.MetricType_GAUGE}, {labels: labelMap{"slot_name": "test_slot", "slot_type": "physical"}, value: 0, metricType: dto.MetricType_GAUGE}, {labels: labelMap{"slot_name": "test_slot", "slot_type": "physical"}, value: -4000, metricType: dto.MetricType_GAUGE}, + {labels: labelMap{"slot_name": "test_slot", "slot_type": "physical"}, value: 1, metricType: dto.MetricType_GAUGE}, } convey.Convey("Metrics comparison", t, func() { @@ -115,9 +117,9 @@ func TestPgReplicationSlotCollectorActiveNil(t *testing.T) { inst := &instance{db: db} - columns := []string{"slot_name", "slot_type", "current_wal_lsn", "confirmed_flush_lsn", "active", "safe_wal_size"} + columns := []string{"slot_name", "slot_type", "current_wal_lsn", "confirmed_flush_lsn", "active", "safe_wal_size", "wal_status"} rows := sqlmock.NewRows(columns). - AddRow("test_slot", "physical", 6, 12, nil, nil) + AddRow("test_slot", "physical", 6, 12, nil, nil, "lost") mock.ExpectQuery(sanitizeQuery(pgReplicationSlotQuery)).WillReturnRows(rows) ch := make(chan prometheus.Metric) @@ -133,6 +135,7 @@ func TestPgReplicationSlotCollectorActiveNil(t *testing.T) { expected := []MetricResult{ {labels: labelMap{"slot_name": "test_slot", "slot_type": "physical"}, value: 6, metricType: dto.MetricType_GAUGE}, {labels: labelMap{"slot_name": "test_slot", "slot_type": "physical"}, value: 0, metricType: dto.MetricType_GAUGE}, + {labels: labelMap{"slot_name": "test_slot", "slot_type": "physical"}, value: 3, metricType: dto.MetricType_GAUGE}, } convey.Convey("Metrics comparison", t, func() { @@ -155,9 +158,9 @@ func TestPgReplicationSlotCollectorTestNilValues(t *testing.T) { inst := &instance{db: db} - columns := []string{"slot_name", "slot_type", "current_wal_lsn", "confirmed_flush_lsn", "active", "safe_wal_size"} + columns := []string{"slot_name", "slot_type", "current_wal_lsn", "confirmed_flush_lsn", "active", "safe_wal_size", "wal_status"} rows := sqlmock.NewRows(columns). - AddRow(nil, nil, nil, nil, true, nil) + AddRow(nil, nil, nil, nil, true, nil, nil) mock.ExpectQuery(sanitizeQuery(pgReplicationSlotQuery)).WillReturnRows(rows) ch := make(chan prometheus.Metric)