From 05fee94661058453c101351112a33f3fa3f52361 Mon Sep 17 00:00:00 2001 From: Steve Simpson Date: Mon, 27 May 2024 09:04:21 +0200 Subject: [PATCH 1/2] Rework receivers status gathering to not need upstream changes. The Grafana version of the Alertmanager /api/v1/receivers API is extended to return information about the last notification attempt for each integration. This has been implemented as a change to the Prometheus Alertmanager fork. We would like to avoid divergence from upstream where possible. This change reworks the status gathering for receivers such that we do not need upstream changes. It instead works by wrapping the notify.Notifier interface for each integration, and capturing the necessary details. To make this as clean as possible, there are two drop-in replacements for Receiver and Integration, which wrap the upstream types plus the additional functionality. This means the code that uses these types can stay largely untouched. --- notify/grafana_alertmanager.go | 17 +++-- notify/grafana_alertmanager_test.go | 19 ++--- notify/nfstatus/integration.go | 112 ++++++++++++++++++++++++++++ notify/nfstatus/integration_test.go | 78 +++++++++++++++++++ notify/nfstatus/receiver.go | 36 +++++++++ 5 files changed, 245 insertions(+), 17 deletions(-) create mode 100644 notify/nfstatus/integration.go create mode 100644 notify/nfstatus/integration_test.go create mode 100644 notify/nfstatus/receiver.go diff --git a/notify/grafana_alertmanager.go b/notify/grafana_alertmanager.go index de2151a1..0b22bc1d 100644 --- a/notify/grafana_alertmanager.go +++ b/notify/grafana_alertmanager.go @@ -13,6 +13,7 @@ import ( "github.com/go-kit/log/level" "github.com/grafana/alerting/cluster" + "github.com/grafana/alerting/notify/nfstatus" amv2 "github.com/prometheus/alertmanager/api/v2/models" "github.com/prometheus/alertmanager/config" "github.com/prometheus/alertmanager/dispatch" @@ -95,7 +96,7 @@ type GrafanaAlertmanager struct { reloadConfigMtx sync.RWMutex configHash [16]byte config []byte - receivers []*notify.Receiver + receivers []*nfstatus.Receiver // buildReceiverIntegrationsFunc builds the integrations for a receiver based on its APIReceiver configuration and the current parsed template. buildReceiverIntegrationsFunc func(next *APIReceiver, tmpl *templates.Template) ([]*Integration, error) @@ -124,18 +125,18 @@ type MaintenanceOptions interface { MaintenanceFunc(state State) (int64, error) } -var NewIntegration = notify.NewIntegration +var NewIntegration = nfstatus.NewIntegration type InhibitRule = config.InhibitRule type MuteTimeInterval = config.MuteTimeInterval type TimeInterval = config.TimeInterval type Route = config.Route -type Integration = notify.Integration +type Integration = nfstatus.Integration type DispatcherLimits = dispatch.Limits type Notifier = notify.Notifier //nolint:revive -type NotifyReceiver = notify.Receiver +type NotifyReceiver = nfstatus.Receiver // Configuration is an interface for accessing Alertmanager configuration. type Configuration interface { @@ -402,14 +403,14 @@ func (am *GrafanaAlertmanager) ApplyConfig(cfg Configuration) (err error) { am.dispatcher = dispatch.NewDispatcher(am.alerts, am.route, routingStage, am.marker, am.timeoutFunc, cfg.DispatcherLimits(), am.logger, am.dispatcherMetrics) // TODO: This has not been upstreamed yet. Should be aligned when https://github.com/prometheus/alertmanager/pull/3016 is merged. - var receivers []*notify.Receiver + var receivers []*nfstatus.Receiver activeReceivers := GetActiveReceiversMap(am.route) for name := range integrationsMap { - stage := am.createReceiverStage(name, integrationsMap[name], am.waitFunc, am.notificationLog) + stage := am.createReceiverStage(name, nfstatus.GetIntegrations(integrationsMap[name]), am.waitFunc, am.notificationLog) routingStage[name] = notify.MultiStage{meshStage, silencingStage, timeMuteStage, inhibitionStage, stage} _, isActive := activeReceivers[name] - receivers = append(receivers, notify.NewReceiver(name, isActive, integrationsMap[name])) + receivers = append(receivers, nfstatus.NewReceiver(name, isActive, integrationsMap[name])) } am.setReceiverMetrics(receivers, len(activeReceivers)) @@ -440,7 +441,7 @@ func (am *GrafanaAlertmanager) setInhibitionRulesMetrics(r []InhibitRule) { am.Metrics.configuredInhibitionRules.WithLabelValues(am.tenantString()).Set(float64(len(r))) } -func (am *GrafanaAlertmanager) setReceiverMetrics(receivers []*notify.Receiver, countActiveReceivers int) { +func (am *GrafanaAlertmanager) setReceiverMetrics(receivers []*nfstatus.Receiver, countActiveReceivers int) { am.Metrics.configuredReceivers.WithLabelValues(am.tenantString(), ActiveStateLabelValue).Set(float64(countActiveReceivers)) am.Metrics.configuredReceivers.WithLabelValues(am.tenantString(), InactiveStateLabelValue).Set(float64(len(receivers) - countActiveReceivers)) diff --git a/notify/grafana_alertmanager_test.go b/notify/grafana_alertmanager_test.go index 73e47e72..92c27ef0 100644 --- a/notify/grafana_alertmanager_test.go +++ b/notify/grafana_alertmanager_test.go @@ -12,7 +12,6 @@ import ( "github.com/go-openapi/strfmt" amv2 "github.com/prometheus/alertmanager/api/v2/models" "github.com/prometheus/alertmanager/config" - "github.com/prometheus/alertmanager/notify" "github.com/prometheus/alertmanager/pkg/labels" "github.com/prometheus/alertmanager/provider/mem" "github.com/prometheus/alertmanager/types" @@ -20,6 +19,8 @@ import ( "github.com/prometheus/client_golang/prometheus/testutil" "github.com/prometheus/common/model" "github.com/stretchr/testify/require" + + "github.com/grafana/alerting/notify/nfstatus" ) func setupAMTest(t *testing.T) (*GrafanaAlertmanager, *prometheus.Registry) { @@ -575,18 +576,18 @@ func TestGrafanaAlertmanager_setInhibitionRulesMetrics(t *testing.T) { func TestGrafanaAlertmanager_setReceiverMetrics(t *testing.T) { fn := &fakeNotifier{} - integrations := []*notify.Integration{ - notify.NewIntegration(fn, fn, "grafana-oncall", 0, "test-grafana-oncall"), - notify.NewIntegration(fn, fn, "sns", 1, "test-sns"), + integrations := []*nfstatus.Integration{ + nfstatus.NewIntegration(fn, fn, "grafana-oncall", 0, "test-grafana-oncall"), + nfstatus.NewIntegration(fn, fn, "sns", 1, "test-sns"), } am, reg := setupAMTest(t) - receivers := []*notify.Receiver{ - notify.NewReceiver("ActiveNoIntegrations", true, nil), - notify.NewReceiver("InactiveNoIntegrations", false, nil), - notify.NewReceiver("ActiveMultipleIntegrations", true, integrations), - notify.NewReceiver("InactiveMultipleIntegrations", false, integrations), + receivers := []*nfstatus.Receiver{ + nfstatus.NewReceiver("ActiveNoIntegrations", true, nil), + nfstatus.NewReceiver("InactiveNoIntegrations", false, nil), + nfstatus.NewReceiver("ActiveMultipleIntegrations", true, integrations), + nfstatus.NewReceiver("InactiveMultipleIntegrations", false, integrations), } am.setReceiverMetrics(receivers, 2) diff --git a/notify/nfstatus/integration.go b/notify/nfstatus/integration.go new file mode 100644 index 00000000..5015c7a2 --- /dev/null +++ b/notify/nfstatus/integration.go @@ -0,0 +1,112 @@ +package nfstatus + +import ( + "context" + "sync" + "time" + + "github.com/prometheus/alertmanager/notify" + "github.com/prometheus/alertmanager/types" + "github.com/prometheus/common/model" +) + +// Integration wraps an upstream notify.Integration, adding the ability to +// capture notification status. +type Integration struct { + status *statusCaptureNotifier + integration *notify.Integration +} + +// NewIntegration returns a new integration. +func NewIntegration(notifier notify.Notifier, rs notify.ResolvedSender, name string, idx int, receiverName string) *Integration { + // Wrap the provided Notifier with our own, which will capture notification attempt errors. + status := &statusCaptureNotifier{upstream: notifier} + + integration := notify.NewIntegration(status, rs, name, idx, receiverName) + + return &Integration{ + status: status, + integration: integration, + } +} + +// Integration returns the wrapped notify.Integration +func (i *Integration) Integration() *notify.Integration { + return i.integration +} + +// Notify implements the Notifier interface. +// Note that this is only included to make out Integration a drop-in replacement +// for parts of Grafana Alertmanager, we cannot actually pass our Integration to Prometheus. +func (i *Integration) Notify(ctx context.Context, alerts ...*types.Alert) (bool, error) { + return i.integration.Notify(ctx, alerts...) +} + +// SendResolved implements the ResolvedSender interface. +func (i *Integration) SendResolved() bool { + return i.integration.SendResolved() +} + +// Name returns the name of the integration. +func (i *Integration) Name() string { + return i.integration.Name() +} + +// Index returns the index of the integration. +func (i *Integration) Index() int { + return i.integration.Index() +} + +// String implements the Stringer interface. +func (i *Integration) String() string { + return i.integration.String() +} + +// GetReport returns information about the last notification attempt. +func (i *Integration) GetReport() (time.Time, model.Duration, error) { + return i.status.GetReport() +} + +// GetIntegrations is a convenience function to unwrap all the notify.GetIntegrations +// from a slice of nfstatus.Integration. +func GetIntegrations(integrations []*Integration) []*notify.Integration { + result := make([]*notify.Integration, len(integrations)) + for i := range integrations { + result[i] = integrations[i].Integration() + } + return result +} + +// statusCaptureNotifier is used to wrap a notify.Notifer and capture information about attempts. +type statusCaptureNotifier struct { + upstream notify.Notifier + + mtx sync.RWMutex + lastNotifyAttempt time.Time + lastNotifyAttemptDuration model.Duration + lastNotifyAttemptError error +} + +// Notify implements the Notifier interface. +func (n *statusCaptureNotifier) Notify(ctx context.Context, alerts ...*types.Alert) (bool, error) { + start := time.Now() + retry, err := n.upstream.Notify(ctx, alerts...) + duration := time.Since(start) + + n.mtx.Lock() + defer n.mtx.Unlock() + + n.lastNotifyAttempt = start + n.lastNotifyAttemptDuration = model.Duration(duration) + n.lastNotifyAttemptError = err + + return retry, err +} + +// GetReport returns information about the last notification attempt. +func (n *statusCaptureNotifier) GetReport() (time.Time, model.Duration, error) { + n.mtx.RLock() + defer n.mtx.RUnlock() + + return n.lastNotifyAttempt, n.lastNotifyAttemptDuration, n.lastNotifyAttemptError +} diff --git a/notify/nfstatus/integration_test.go b/notify/nfstatus/integration_test.go new file mode 100644 index 00000000..3e8fd86f --- /dev/null +++ b/notify/nfstatus/integration_test.go @@ -0,0 +1,78 @@ +package nfstatus + +import ( + "context" + "errors" + "testing" + "time" + + "github.com/prometheus/alertmanager/types" + "github.com/prometheus/common/model" + "github.com/stretchr/testify/assert" +) + +type fakeNotifier struct { + retry bool + err error +} + +func (f *fakeNotifier) Notify(_ context.Context, _ ...*types.Alert) (bool, error) { + return f.retry, f.err +} + +type fakeResolvedSender struct { + sendResolved bool +} + +func (f *fakeResolvedSender) SendResolved() bool { + return f.sendResolved +} + +func TestIntegration(t *testing.T) { + notifier := &fakeNotifier{} + rs := &fakeResolvedSender{} + integration := NewIntegration(notifier, rs, "foo", 42, "bar") + + // Check wrapped functions work as expected. + assert.Equal(t, "foo", integration.Name()) + assert.Equal(t, 42, integration.Index()) + rs.sendResolved = false + assert.Equal(t, false, integration.SendResolved()) + rs.sendResolved = true + assert.Equal(t, true, integration.SendResolved()) + + // Check that status is empty if no notifications have happened. + lastAttempt, lastDuration, lastError := integration.GetReport() + assert.Equal(t, time.Time{}, lastAttempt) + assert.Equal(t, model.Duration(0), lastDuration) + assert.Equal(t, nil, lastError) + + // Check that status is collected on successful notification. + notifier.retry = false + notifier.err = nil + retry, err := integration.Notify(context.Background()) + assert.Equal(t, notifier.retry, retry) + assert.NoError(t, notifier.err, err) + lastAttempt, lastDuration, lastError = integration.GetReport() + assert.NotEqual(t, time.Time{}, lastAttempt) + assert.NotEqual(t, model.Duration(0), lastDuration) + assert.Equal(t, nil, lastError) + + // Check retry is propagated correctly. + notifier.retry = true + notifier.err = nil + retry, err = integration.Notify(context.Background()) + assert.Equal(t, notifier.retry, retry) + assert.Equal(t, notifier.err, err) + + // Check errors are propagated, and returned in the status. + notifier.retry = false + notifier.err = errors.New("An error") + retry, err = integration.Notify(context.Background()) + assert.Equal(t, notifier.retry, retry) + assert.Equal(t, notifier.err, err) + lastAttempt, lastDuration, lastError = integration.GetReport() + assert.NotEqual(t, time.Time{}, lastAttempt) + assert.NotEqual(t, model.Duration(0), lastDuration) + assert.Equal(t, "An error", lastError.Error()) +} diff --git a/notify/nfstatus/receiver.go b/notify/nfstatus/receiver.go new file mode 100644 index 00000000..84af2156 --- /dev/null +++ b/notify/nfstatus/receiver.go @@ -0,0 +1,36 @@ +package nfstatus + +import ( + "github.com/prometheus/alertmanager/notify" +) + +// Receiver wraps a notify.Receiver, but additionally holds onto nfstatus.Integration. +type Receiver struct { + receiver *notify.Receiver + integrations []*Integration +} + +func (r *Receiver) Receiver() *notify.Receiver { + return r.receiver +} + +func (r *Receiver) Name() string { + return r.receiver.Name() +} + +func (r *Receiver) Active() bool { + return r.receiver.Active() +} + +func (r *Receiver) Integrations() []*Integration { + return r.integrations +} + +func NewReceiver(name string, active bool, integrations []*Integration) *Receiver { + receiver := notify.NewReceiver(name, active, GetIntegrations(integrations)) + + return &Receiver{ + receiver: receiver, + integrations: integrations, + } +} From b8d55d26920bb1705ac4218a8e14f214d94838b7 Mon Sep 17 00:00:00 2001 From: Steve Simpson Date: Tue, 28 May 2024 00:07:54 +0200 Subject: [PATCH 2/2] Don't use notify.Receiver, it doesn't exist upstream. --- notify/nfstatus/receiver.go | 22 +++++++--------------- 1 file changed, 7 insertions(+), 15 deletions(-) diff --git a/notify/nfstatus/receiver.go b/notify/nfstatus/receiver.go index 84af2156..8953781c 100644 --- a/notify/nfstatus/receiver.go +++ b/notify/nfstatus/receiver.go @@ -1,25 +1,18 @@ package nfstatus -import ( - "github.com/prometheus/alertmanager/notify" -) - -// Receiver wraps a notify.Receiver, but additionally holds onto nfstatus.Integration. +// Receiver holds onto a slice of nfstatus.Integration and some metadata. type Receiver struct { - receiver *notify.Receiver + name string integrations []*Integration -} - -func (r *Receiver) Receiver() *notify.Receiver { - return r.receiver + active bool } func (r *Receiver) Name() string { - return r.receiver.Name() + return r.name } func (r *Receiver) Active() bool { - return r.receiver.Active() + return r.active } func (r *Receiver) Integrations() []*Integration { @@ -27,10 +20,9 @@ func (r *Receiver) Integrations() []*Integration { } func NewReceiver(name string, active bool, integrations []*Integration) *Receiver { - receiver := notify.NewReceiver(name, active, GetIntegrations(integrations)) - return &Receiver{ - receiver: receiver, + name: name, + active: active, integrations: integrations, } }