From 8bd175a87d73ec169a5b76fde274d0747170c4a7 Mon Sep 17 00:00:00 2001 From: Alex Simenduev Date: Thu, 16 Nov 2023 10:40:03 +0200 Subject: [PATCH] pg_replication_slot: add slot type label Signed-off-by: Alex Simenduev --- collector/pg_replication_slot.go | 28 +++++++++++++-------- collector/pg_replication_slot_test.go | 36 +++++++++++++-------------- 2 files changed, 35 insertions(+), 29 deletions(-) diff --git a/collector/pg_replication_slot.go b/collector/pg_replication_slot.go index c625fd4d5..7f1ba003e 100644 --- a/collector/pg_replication_slot.go +++ b/collector/pg_replication_slot.go @@ -43,7 +43,7 @@ var ( "slot_current_wal_lsn", ), "current wal lsn value", - []string{"slot_name"}, nil, + []string{"slot_name", "slot_type"}, nil, ) pgReplicationSlotCurrentFlushDesc = prometheus.NewDesc( prometheus.BuildFQName( @@ -52,7 +52,7 @@ var ( "slot_confirmed_flush_lsn", ), "last lsn confirmed flushed to the replication slot", - []string{"slot_name"}, nil, + []string{"slot_name", "slot_type"}, nil, ) pgReplicationSlotIsActiveDesc = prometheus.NewDesc( prometheus.BuildFQName( @@ -61,17 +61,18 @@ var ( "slot_is_active", ), "whether the replication slot is active or not", - []string{"slot_name"}, nil, + []string{"slot_name", "slot_type"}, nil, ) pgReplicationSlotQuery = `SELECT slot_name, - CASE WHEN pg_is_in_recovery() THEN + slot_type, + CASE WHEN pg_is_in_recovery() THEN pg_last_wal_receive_lsn() - '0/0' - ELSE - pg_current_wal_lsn() - '0/0' + ELSE + pg_current_wal_lsn() - '0/0' END AS current_wal_lsn, - COALESCE(confirmed_flush_lsn, '0/0') - '0/0', + COALESCE(confirmed_flush_lsn, '0/0') - '0/0' AS confirmed_flush_lsn, active FROM pg_replication_slots;` ) @@ -87,10 +88,11 @@ func (PGReplicationSlotCollector) Update(ctx context.Context, instance *instance for rows.Next() { var slotName sql.NullString + var slotType sql.NullString var walLSN sql.NullFloat64 var flushLSN sql.NullFloat64 var isActive sql.NullBool - if err := rows.Scan(&slotName, &walLSN, &flushLSN, &isActive); err != nil { + if err := rows.Scan(&slotName, &slotType, &walLSN, &flushLSN, &isActive); err != nil { return err } @@ -102,6 +104,10 @@ func (PGReplicationSlotCollector) Update(ctx context.Context, instance *instance if slotName.Valid { slotNameLabel = slotName.String } + slotTypeLabel := "unknown" + if slotType.Valid { + slotTypeLabel = slotType.String + } var walLSNMetric float64 if walLSN.Valid { @@ -109,7 +115,7 @@ func (PGReplicationSlotCollector) Update(ctx context.Context, instance *instance } ch <- prometheus.MustNewConstMetric( pgReplicationSlotCurrentWalDesc, - prometheus.GaugeValue, walLSNMetric, slotNameLabel, + prometheus.GaugeValue, walLSNMetric, slotNameLabel, slotTypeLabel, ) if isActive.Valid && isActive.Bool { var flushLSNMetric float64 @@ -118,12 +124,12 @@ func (PGReplicationSlotCollector) Update(ctx context.Context, instance *instance } ch <- prometheus.MustNewConstMetric( pgReplicationSlotCurrentFlushDesc, - prometheus.GaugeValue, flushLSNMetric, slotNameLabel, + prometheus.GaugeValue, flushLSNMetric, slotNameLabel, slotTypeLabel, ) } ch <- prometheus.MustNewConstMetric( pgReplicationSlotIsActiveDesc, - prometheus.GaugeValue, isActiveValue, slotNameLabel, + prometheus.GaugeValue, isActiveValue, slotNameLabel, slotTypeLabel, ) } return rows.Err() diff --git a/collector/pg_replication_slot_test.go b/collector/pg_replication_slot_test.go index 7e91ea261..212050c46 100644 --- a/collector/pg_replication_slot_test.go +++ b/collector/pg_replication_slot_test.go @@ -31,9 +31,9 @@ func TestPgReplicationSlotCollectorActive(t *testing.T) { inst := &instance{db: db} - columns := []string{"slot_name", "current_wal_lsn", "confirmed_flush_lsn", "active"} + columns := []string{"slot_name", "slot_type", "current_wal_lsn", "confirmed_flush_lsn", "active"} rows := sqlmock.NewRows(columns). - AddRow("test_slot", 5, 3, true) + AddRow("test_slot", "physical", 5, 3, true) mock.ExpectQuery(sanitizeQuery(pgReplicationSlotQuery)).WillReturnRows(rows) ch := make(chan prometheus.Metric) @@ -47,9 +47,9 @@ func TestPgReplicationSlotCollectorActive(t *testing.T) { }() expected := []MetricResult{ - {labels: labelMap{"slot_name": "test_slot"}, value: 5, metricType: dto.MetricType_GAUGE}, - {labels: labelMap{"slot_name": "test_slot"}, value: 3, metricType: dto.MetricType_GAUGE}, - {labels: labelMap{"slot_name": "test_slot"}, value: 1, metricType: dto.MetricType_GAUGE}, + {labels: labelMap{"slot_name": "test_slot", "slot_type": "physical"}, value: 5, metricType: dto.MetricType_GAUGE}, + {labels: labelMap{"slot_name": "test_slot", "slot_type": "physical"}, value: 3, metricType: dto.MetricType_GAUGE}, + {labels: labelMap{"slot_name": "test_slot", "slot_type": "physical"}, value: 1, metricType: dto.MetricType_GAUGE}, } convey.Convey("Metrics comparison", t, func() { @@ -72,9 +72,9 @@ func TestPgReplicationSlotCollectorInActive(t *testing.T) { inst := &instance{db: db} - columns := []string{"slot_name", "current_wal_lsn", "confirmed_flush_lsn", "active"} + columns := []string{"slot_name", "slot_type", "current_wal_lsn", "confirmed_flush_lsn", "active"} rows := sqlmock.NewRows(columns). - AddRow("test_slot", 6, 12, false) + AddRow("test_slot", "physical", 6, 12, false) mock.ExpectQuery(sanitizeQuery(pgReplicationSlotQuery)).WillReturnRows(rows) ch := make(chan prometheus.Metric) @@ -88,8 +88,8 @@ func TestPgReplicationSlotCollectorInActive(t *testing.T) { }() expected := []MetricResult{ - {labels: labelMap{"slot_name": "test_slot"}, value: 6, metricType: dto.MetricType_GAUGE}, - {labels: labelMap{"slot_name": "test_slot"}, value: 0, metricType: dto.MetricType_GAUGE}, + {labels: labelMap{"slot_name": "test_slot", "slot_type": "physical"}, value: 6, metricType: dto.MetricType_GAUGE}, + {labels: labelMap{"slot_name": "test_slot", "slot_type": "physical"}, value: 0, metricType: dto.MetricType_GAUGE}, } convey.Convey("Metrics comparison", t, func() { @@ -113,9 +113,9 @@ func TestPgReplicationSlotCollectorActiveNil(t *testing.T) { inst := &instance{db: db} - columns := []string{"slot_name", "current_wal_lsn", "confirmed_flush_lsn", "active"} + columns := []string{"slot_name", "slot_type", "current_wal_lsn", "confirmed_flush_lsn", "active"} rows := sqlmock.NewRows(columns). - AddRow("test_slot", 6, 12, nil) + AddRow("test_slot", "physical", 6, 12, nil) mock.ExpectQuery(sanitizeQuery(pgReplicationSlotQuery)).WillReturnRows(rows) ch := make(chan prometheus.Metric) @@ -129,8 +129,8 @@ func TestPgReplicationSlotCollectorActiveNil(t *testing.T) { }() expected := []MetricResult{ - {labels: labelMap{"slot_name": "test_slot"}, value: 6, metricType: dto.MetricType_GAUGE}, - {labels: labelMap{"slot_name": "test_slot"}, value: 0, metricType: dto.MetricType_GAUGE}, + {labels: labelMap{"slot_name": "test_slot", "slot_type": "physical"}, value: 6, metricType: dto.MetricType_GAUGE}, + {labels: labelMap{"slot_name": "test_slot", "slot_type": "physical"}, value: 0, metricType: dto.MetricType_GAUGE}, } convey.Convey("Metrics comparison", t, func() { @@ -153,9 +153,9 @@ func TestPgReplicationSlotCollectorTestNilValues(t *testing.T) { inst := &instance{db: db} - columns := []string{"slot_name", "current_wal_lsn", "confirmed_flush_lsn", "active"} + columns := []string{"slot_name", "slot_type", "current_wal_lsn", "confirmed_flush_lsn", "active"} rows := sqlmock.NewRows(columns). - AddRow(nil, nil, nil, true) + AddRow(nil, nil, nil, nil, true) mock.ExpectQuery(sanitizeQuery(pgReplicationSlotQuery)).WillReturnRows(rows) ch := make(chan prometheus.Metric) @@ -169,9 +169,9 @@ func TestPgReplicationSlotCollectorTestNilValues(t *testing.T) { }() expected := []MetricResult{ - {labels: labelMap{"slot_name": "unknown"}, value: 0, metricType: dto.MetricType_GAUGE}, - {labels: labelMap{"slot_name": "unknown"}, value: 0, metricType: dto.MetricType_GAUGE}, - {labels: labelMap{"slot_name": "unknown"}, value: 1, metricType: dto.MetricType_GAUGE}, + {labels: labelMap{"slot_name": "unknown", "slot_type": "unknown"}, value: 0, metricType: dto.MetricType_GAUGE}, + {labels: labelMap{"slot_name": "unknown", "slot_type": "unknown"}, value: 0, metricType: dto.MetricType_GAUGE}, + {labels: labelMap{"slot_name": "unknown", "slot_type": "unknown"}, value: 1, metricType: dto.MetricType_GAUGE}, } convey.Convey("Metrics comparison", t, func() {