Fix repeated resolved alerts #3006

Closed
22 changes: 12 additions & 10 deletions dispatch/dispatch.go
@@ -475,8 +475,12 @@ func (ag *aggrGroup) stop() {

 // insert inserts the alert into the aggregation group.
 func (ag *aggrGroup) insert(alert *types.Alert) {
-	if err := ag.alerts.Set(alert); err != nil {
-		level.Error(ag.logger).Log("msg", "error on set alert", "err", err)
+	if err := ag.alerts.SetOrReplaceResolved(alert); err != nil {
+		if errors.Is(err, store.ErrNotFound) {
+			level.Warn(ag.logger).Log("msg", "ignore resolved alert since there is no corresponding record in the store")
+		} else {
+			level.Error(ag.logger).Log("msg", "error on set alert", "err", err)
+		}
 	}

 	// Immediately trigger a flush if the wait duration for this
@@ -517,17 +521,15 @@ func (ag *aggrGroup) flush(notify func(...*types.Alert) bool) {

 	if notify(alertsSlice...) {
 		for _, a := range alertsSlice {
-			// Only delete if the fingerprint has not been inserted
+			// Only delete the resolved alert if the fingerprint has not been active
 			// again since we notified about it.
-			fp := a.Fingerprint()
-			got, err := ag.alerts.Get(fp)
-			if err != nil {
-				// This should never happen.
-				level.Error(ag.logger).Log("msg", "failed to get alert", "err", err, "alert", a.String())
+			if !a.Resolved() {
 				continue
 			}
-			if a.Resolved() && got.UpdatedAt == a.UpdatedAt {
-				if err := ag.alerts.Delete(fp); err != nil {
+			if err := ag.alerts.DeleteIfResolved(a.Fingerprint()); err != nil {

Contributor Author (damnever):
We need an atomic operation to ensure no race between Get and Delete.

+				if errors.Is(err, store.ErrNotResolved) {
+					level.Debug(ag.logger).Log("msg", "resolved alert has been active again", "alert", a.String())
+				} else {
 					level.Error(ag.logger).Log("msg", "error on delete alert", "err", err, "alert", a.String())
+				}
 			}
95 changes: 93 additions & 2 deletions dispatch/dispatch_test.go
@@ -601,6 +601,23 @@ func (r *recordStage) Exec(ctx context.Context, l log.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error) {
 	return ctx, nil, nil
 }

+type notifyStage struct {
+	C chan struct{}
+}
+
+func newNotifyStage() *notifyStage {
+	return &notifyStage{C: make(chan struct{}, 1)}
+}
+
+func (s *notifyStage) Exec(ctx context.Context, l log.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error) {
+	select {
+	case <-ctx.Done():
+		return ctx, nil, ctx.Err()
+	case s.C <- struct{}{}:
+		return ctx, alerts, nil
+	}
+}
+
 var (
 	// Set the start time in the past to trigger a flush immediately.
 	t0 = time.Now().Add(-time.Minute)
@@ -651,7 +668,7 @@ func TestDispatcherRaceOnFirstAlertNotDeliveredWhenGroupWaitIsZero(t *testing.T)
 	route := &Route{
 		RouteOpts: RouteOpts{
 			Receiver:       "default",
-			GroupBy:        map[model.LabelName]struct{}{"alertname": {}},
+			GroupBy:        map[model.LabelName]struct{}{model.AlertNameLabel: {}},
 			GroupWait:      0,
 			GroupInterval:  1 * time.Hour, // Should never hit in this test.
 			RepeatInterval: 1 * time.Hour, // Should never hit in this test.
@@ -666,7 +683,7 @@ func TestDispatcherRaceOnFirstAlertNotDeliveredWhenGroupWaitIsZero(t *testing.T)

 	// Push all alerts.
 	for i := 0; i < numAlerts; i++ {
-		alert := newAlert(model.LabelSet{"alertname": model.LabelValue(fmt.Sprintf("Alert_%d", i))})
+		alert := newAlert(model.LabelSet{model.AlertNameLabel: model.LabelValue(fmt.Sprintf("Alert_%d", i))})
 		require.NoError(t, alerts.Put(alert))
 	}

@@ -684,6 +701,80 @@ func TestDispatcherRaceOnFirstAlertNotDeliveredWhenGroupWaitIsZero(t *testing.T)
 	require.Len(t, recorder.Alerts(), numAlerts)
 }
+
+func TestDispatcherReceiveAndNotifyRepeatedResolvedAlerts(t *testing.T) {
+	// More information here: https://github.com/prometheus/alertmanager/pull/3006
+
+	logger := log.NewNopLogger()
+	// logger := log.NewLogfmtLogger(os.Stdout)
+	marker := types.NewMarker(prometheus.NewRegistry())
+	alerts, err := mem.NewAlerts(context.Background(), marker, time.Hour, nil, logger, nil)
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer alerts.Close()
+
+	route := &Route{
+		RouteOpts: RouteOpts{
+			Receiver:       "default",
+			GroupBy:        map[model.LabelName]struct{}{model.AlertNameLabel: {}},
+			GroupWait:      0,
+			GroupInterval:  6 * time.Second,
+			RepeatInterval: 6 * time.Second,
+		},
+	}
+
+	recorder := &recordStage{alerts: make(map[string]map[model.Fingerprint]*types.Alert)}
+	notifier := newNotifyStage()
+	dispatcher := NewDispatcher(alerts, route, notify.FanoutStage{recorder, notifier}, marker, nil, nil, logger, NewDispatcherMetrics(false, prometheus.NewRegistry()))
+	go dispatcher.Run()
+	defer dispatcher.Stop()
+
+	// Here we simulate the case when the alertmanager receives resolved alerts
+	// right after the aggrGroup is deleted.
+	t.Run("repeated alerts after aggrGroup deleted", func(t *testing.T) {

Contributor:
Hi! 👋 Josh has asked if I can help take a look at this issue. I'm looking at this test here but I'm not sure I understand what is being tested and how this is showing one of the reported race conditions (I understand there are at least two?).

Here you are adding an expired alert to the store and waiting for a flush. Your test expects no flush to occur, so when it does occur the test fails. What I don't understand is how this relates to the race condition. It is expected that when Alertmanager receives a resolved alert it still creates an aggregation group and flushes it.

Contributor Author (damnever, May 1, 2024):
If I recall correctly, this kind of test is hard to write. This is just an attempt to demonstrate that the main branch will fail.

This PR includes fixes for certain race conditions, but its primary focus is addressing repeated resolved alerts; the race condition is only a minor part of the fix. I don't know if you agree with this.

The race condition is clearly not tested here; the test doesn't mention it. However, I will do my best to describe it in plain text: the race condition could result in normal alerts being deleted if the same alert becomes active again and is received at the same time as the deletion of the alert.

OK, the condition in this test may produce repeated resolved (orphan) alerts if the previous firing alert fails to reach the Alertmanager due to network conditions or something similar, as I have described before. We reject storing resolved alerts if the corresponding firing alert is not found; it may have been lost due to network conditions.

Contributor:
> the race condition could result in normal alerts being deleted if the same alert becomes active again and is received at the same time as the deletion of the alert.

OK, I think I understand the issue!

Goroutine #1: You have a Resolved alert that is about to be deleted because the if statement is true:

if a.Resolved() && got.UpdatedAt == a.UpdatedAt {
	...
}

Goroutine #2: Between execution of the condition and execution of the code in the if statement, a firing alert is put into the store via func (ag *aggrGroup) insert(alert *types.Alert), changing the alert back from resolved to firing.

Goroutine #1: The code in the if statement resumes execution and deletes the alert from the store. However, the alert has changed since the condition was executed, and got no longer refers to the alert in the store. The firing alert is deleted.

if err := ag.alerts.Delete(fp); err != nil {
	...
}
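
To illustrate the pattern outside of Alertmanager: below is a minimal, self-contained Go sketch (all names invented for illustration) of the same check-then-act race on a mutex-guarded map, next to an atomic delete-if-resolved that performs the check and the delete in one critical section, which is the shape of the DeleteIfResolved change in this PR.

package main

import (
	"fmt"
	"sync"
)

type fakeStore struct {
	mu sync.Mutex
	m  map[string]bool // fingerprint -> resolved?
}

// Racy: the check and the delete acquire the lock separately, so another
// goroutine can flip the entry back to firing in between, and the firing
// entry gets deleted anyway.
func (s *fakeStore) deleteIfResolvedRacy(fp string) {
	s.mu.Lock()
	resolved := s.m[fp]
	s.mu.Unlock()

	if resolved {
		s.mu.Lock()
		delete(s.m, fp) // may delete an entry that is already firing again
		s.mu.Unlock()
	}
}

// Atomic: check and delete happen under a single critical section.
func (s *fakeStore) deleteIfResolvedAtomic(fp string) {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.m[fp] {
		delete(s.m, fp)
	}
}

func main() {
	s := &fakeStore{m: map[string]bool{"a": true}}
	s.deleteIfResolvedAtomic("a")
	fmt.Println(s.m) // map[]
}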

> We reject storing resolved alerts if the corresponding firing alert is not found; it may have been lost due to network conditions.

I don't think this fix works though. It has a bug where the Alertmanager does not accept resolved alerts following a crash or restart. Alertmanager does not write its state to disk, so there is no firing alert when the resolved alert is received.

Contributor Author (damnever, May 1, 2024):
Then, I would prefer to avoid receiving an orphan resolved alert, and I think that might be why we have an option called resolve_timeout.

Contributor:
What do you mean by orphaned alert?

Contributor Author (damnever):
In this case, Alertmanager could deliver a resolved alert without a preceding firing alert, so I refer to it as an 'orphan resolved alert'.

Contributor Author (damnever):
I think the resolved alert should not arrive too late, so we could handle the restart scenario efficiently by comparing the process start time with the creation time of the aggregation group.
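
As a rough illustration of that proposal only: a minimal sketch, assuming a hypothetical processStartTime variable and a createdAt timestamp on the aggregation group (neither exists in the current dispatcher code), might look like this.

package main

import (
	"fmt"
	"time"
)

// Hypothetical: recorded once at process startup.
var processStartTime = time.Now()

// Hypothetical: the aggregation group would remember when it was created.
type aggrGroup struct {
	createdAt time.Time
}

// acceptOrphanResolved reports whether an orphan resolved alert should still be
// accepted: if the group was created shortly after startup, the matching firing
// alert was probably lost in a restart rather than dropped by the sender.
func (ag *aggrGroup) acceptOrphanResolved(grace time.Duration) bool {
	return ag.createdAt.Sub(processStartTime) < grace
}

func main() {
	ag := &aggrGroup{createdAt: time.Now()}
	fmt.Println(ag.acceptOrphanResolved(5 * time.Minute)) // true right after a restart
}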

Contributor:
I think that will be difficult to avoid for a number of reasons. Let's still fix the race condition though.

Contributor:
OK! I am able to reproduce the issue with a unit test.

The test creates and starts a dispatcher, inserts a firing alert into the store, and waits for the alert to be flushed. It then inserts a resolved alert and waits for it to be flushed too.

Lastly, it inserts the same firing alert that it inserted at the start of the test and waits for that to be flushed. However, the flush never happens because the firing alert is deleted.

=== RUN   TestDispatcherRaceFiringAlertDeleted
level=debug component=dispatcher msg="Received alert" alert=[3fff2c2][active]
level=debug component=dispatcher aggrGroup={}:{} msg=flushing alerts=[[3fff2c2][active]]
level=debug component=dispatcher msg="Received alert" alert=[3fff2c2][resolved]
level=debug component=dispatcher aggrGroup={}:{} msg=flushing alerts=[[3fff2c2][resolved]]
level=debug component=dispatcher msg="Received alert" alert=[3fff2c2][active]
level=debug component=dispatcher aggrGroup={}:{} msg="Deleted alert" alert=[3fff2c2][active]
    dispatch_test.go:693:
--- FAIL: TestDispatcherRaceFiringAlertDeleted (3.00s)
FAIL
FAIL	github.com/prometheus/alertmanager/dispatch	3.260s
FAIL
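
The reviewer's reproduction test itself is not included in the comment; a condensed sketch of the described sequence, reusing helpers from this PR's diff (newAlert, newNotifyStage, recordStage) and assuming the 1ms sleep patch shown below to make the race easy to hit, might look roughly like this.

func TestDispatcherRaceFiringAlertDeleted(t *testing.T) {
	logger := log.NewNopLogger()
	marker := types.NewMarker(prometheus.NewRegistry())
	alerts, err := mem.NewAlerts(context.Background(), marker, time.Hour, nil, logger, nil)
	require.NoError(t, err)
	defer alerts.Close()

	route := &Route{
		RouteOpts: RouteOpts{
			Receiver:       "default",
			GroupBy:        map[model.LabelName]struct{}{},
			GroupWait:      0,
			GroupInterval:  time.Second,
			RepeatInterval: time.Second,
		},
	}
	recorder := &recordStage{alerts: make(map[string]map[model.Fingerprint]*types.Alert)}
	notifier := newNotifyStage()
	dispatcher := NewDispatcher(alerts, route, notify.FanoutStage{recorder, notifier}, marker, nil, nil, logger, NewDispatcherMetrics(false, prometheus.NewRegistry()))
	go dispatcher.Run()
	defer dispatcher.Stop()

	// 1. Insert a firing alert and wait for it to be flushed.
	alert := newAlert(model.LabelSet{model.AlertNameLabel: "race-firing-alert-deleted"})
	require.NoError(t, alerts.Put(alert))
	<-notifier.C

	// 2. Resolve it and wait for the resolved flush.
	alert.Alert.EndsAt = alert.StartsAt.Add(time.Second)
	alert.UpdatedAt = alert.Alert.EndsAt
	require.True(t, alert.Resolved())
	require.NoError(t, alerts.Put(alert))
	<-notifier.C

	// 3. Immediately re-fire the same alert: with the pre-PR Get/Delete code the
	// insert can land between Get and Delete, so the firing alert is deleted
	// from the aggregation group's store and is never flushed again.
	alert.Alert.EndsAt = time.Now().Add(time.Hour)
	alert.UpdatedAt = time.Now()
	require.False(t, alert.Resolved())
	require.NoError(t, alerts.Put(alert))

	select {
	case <-notifier.C:
	case <-time.After(3 * time.Second):
		t.Errorf("firing alert was never flushed; it was likely deleted by the race")
	}
}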

To make the race condition easier to observe I added a 1ms sleep to the dispatcher between the call to Get(fp) and Delete(fp):

diff --git a/dispatch/dispatch.go b/dispatch/dispatch.go
index 104471e3..c2669496 100644
--- a/dispatch/dispatch.go
+++ b/dispatch/dispatch.go
@@ -526,10 +526,16 @@ func (ag *aggrGroup) flush(notify func(...*types.Alert) bool) {
                                level.Error(ag.logger).Log("msg", "failed to get alert", "err", err, "alert", a.String())
                                continue
                        }
+                       // Insert a 1ms sleep. This allows the scheduler to resume the test
+                       // between ag.alerts.Get(fp) and ag.alerts.Delete(fp) during which
+                       // it should replace the resolved alert with a firing alert.
+                       <-time.After(time.Millisecond)
                        if a.Resolved() && got.UpdatedAt == a.UpdatedAt {
-                               if err := ag.alerts.Delete(fp); err != nil {
+                               deleted, err := ag.alerts.Delete(fp)
+                               if err != nil {
                                        level.Error(ag.logger).Log("msg", "error on delete alert", "err", err, "alert", a.String())
                                }
+                               level.Info(ag.logger).Log("msg", "Deleted alert", "alert", deleted)
                        }
                }
        }

I also updated the store to return the deleted alert:

diff --git a/store/store.go b/store/store.go
index 9b30542f..2d5e4666 100644
--- a/store/store.go
+++ b/store/store.go
@@ -115,12 +115,12 @@ func (a *Alerts) Set(alert *types.Alert) error {
 }

 // Delete removes the Alert with the matching fingerprint from the store.
-func (a *Alerts) Delete(fp model.Fingerprint) error {
+func (a *Alerts) Delete(fp model.Fingerprint) (*types.Alert, error) {
        a.Lock()
        defer a.Unlock()
-
+       tmp := a.c[fp]
        delete(a.c, fp)
-       return nil
+       return tmp, nil
 }

We can see that the firing alert is deleted:

level=debug component=dispatcher aggrGroup={}:{} msg=flushing alerts=[[3fff2c2][resolved]]
level=debug component=dispatcher msg="Received alert" alert=[3fff2c2][active]
level=debug component=dispatcher aggrGroup={}:{} msg="Deleted alert" alert=[3fff2c2][active]

However, what should happen is that no alert is deleted, because the alert in the store was updated during the flush.

+		alert := newAlert(model.LabelSet{model.AlertNameLabel: "test-repeated-resolved-alerts-1"})
+		alert.Alert.EndsAt = alert.StartsAt.Add(time.Second)
+		alert.UpdatedAt = alert.Alert.EndsAt
+		require.True(t, alert.Resolved())
+		require.NoError(t, alerts.Put(alert))
+		select {
+		case <-time.After(20 * time.Second):
+		case <-notifier.C:
+			t.Errorf("unexpected repeated resolved alerts")
+		}
+	})

+	// Alertmanager receives repeated resolved alerts after aggrGroup.flush.
+	t.Run("repeated alerts after aggrGroup flush", func(t *testing.T) {
+		alert := newAlert(model.LabelSet{model.AlertNameLabel: "test-repeated-resolved-alerts-2"})
+		require.NoError(t, alerts.Put(alert))
+		select {
+		case <-time.After(20 * time.Second):
+			t.Errorf("wait active alert timed out")
+		case <-notifier.C:
+		}
+
+		alert.Alert.EndsAt = alert.StartsAt.Add(time.Second)
+		alert.UpdatedAt = alert.Alert.EndsAt
+		require.True(t, alert.Resolved())
+		require.NoError(t, alerts.Put(alert))
+		select {
+		case <-time.After(20 * time.Second):
+			t.Errorf("wait resolved alert timed out")
+		case <-notifier.C:
+		}
+
+		alert.UpdatedAt = alert.Alert.EndsAt.Add(time.Second)

Contributor:
This is the other case I don't understand. You are changing the UpdatedAt time, putting the alert, and expecting it to be deleted. However, if the UpdatedAt time is incremented during a flush, the alert will not be deleted, and it will be flushed again. You won't get a second resolved notification, though, because the notification log will show that a resolved notification was already sent. Your test skips that check. Have I misunderstood?

Contributor Author (damnever, May 1, 2024):
Alright, once again, the test case definitely requires improvement. I believe I should have copied the resolved alert and modified the start time to suit this case:

Prometheus/XXRuler failed to send some active alerts, but succeeded in sending all resolved alerts:
alert-a: T1(active:notify-ok), T2(resolved:notify-ok), T3(active:notify-failed), T4(resolved:notify-ok).

We reject storing resolved alerts if the alert has already been resolved; the corresponding firing alert may have been lost due to network conditions.

Contributor Author (damnever):
If the notification log prevents the sending of repeated resolved alerts, then this is not the case.

Contributor Author (damnever, May 1, 2024):
OK, I believe the notification log has its own garbage collection function that operates independently. This case will occur when the timing is right, and this is the case we should handle.

Contributor:
> If the notification log prevents the sending of repeated resolved alerts, then this is not the case.

That is correct! The notification log will contain an entry for the resolved alert as it was the last successful notification sent. Entries in the notification log are deleted after the retention period (5 days). I'm still not sure I understand the bug, though.

+		require.True(t, alert.Resolved())
+		require.NoError(t, alerts.Put(alert))
+		select {
+		case <-time.After(20 * time.Second):
+		case <-notifier.C:
+			t.Errorf("unexpected repeated resolved alerts")
+		}
+	})
+}
+
 type limits struct {
 	groups int
 }
40 changes: 37 additions & 3 deletions store/store.go
@@ -24,8 +24,12 @@ import (
 	"github.com/prometheus/alertmanager/types"
 )

-// ErrNotFound is returned if a Store cannot find the Alert.
-var ErrNotFound = errors.New("alert not found")
+var (
+	// ErrNotFound is returned if a Store cannot find the Alert.
+	ErrNotFound = errors.New("alert not found")
+	// ErrNotResolved is returned if the alert is not resolved.
+	ErrNotResolved = errors.New("alert is not resolved")
+)

 // Alerts provides lock-coordinated to an in-memory map of alerts, keyed by
 // their fingerprint. Resolved alerts are removed from the map based on
@@ -98,10 +102,26 @@ func (a *Alerts) Get(fp model.Fingerprint) (*types.Alert, error) {

 // Set unconditionally sets the alert in memory.
 func (a *Alerts) Set(alert *types.Alert) error {
+	fp := alert.Fingerprint()
+
 	a.Lock()
 	defer a.Unlock()

-	a.c[alert.Fingerprint()] = alert
+	a.c[fp] = alert
 	return nil
 }
+
+// SetOrReplaceResolved returns ErrNotFound if the alert is resolved and
+// there is no corresponding record in the store.
+func (a *Alerts) SetOrReplaceResolved(alert *types.Alert) error {
+	fp := alert.Fingerprint()
+
+	a.Lock()
+	defer a.Unlock()
+	if _, ok := a.c[fp]; !ok && alert.Resolved() {
+		return ErrNotFound
+	}
+	a.c[fp] = alert
+	return nil
+}

@@ -114,6 +134,20 @@ func (a *Alerts) Delete(fp model.Fingerprint) error {
 	return nil
 }

+// DeleteIfResolved removes the Alert if it is resolved.
+func (a *Alerts) DeleteIfResolved(fp model.Fingerprint) error {
+	a.Lock()
+	defer a.Unlock()
+
+	if exist, ok := a.c[fp]; !ok {
+		return ErrNotFound
+	} else if !exist.Resolved() {
+		return ErrNotResolved
+	}
+	delete(a.c, fp)
+	return nil
+}
+
 // List returns a slice of Alerts currently held in memory.
 func (a *Alerts) List() []*types.Alert {
 	a.Lock()
40 changes: 33 additions & 7 deletions store/store_test.go
@@ -54,17 +54,33 @@ func TestDelete(t *testing.T) {
 	require.Equal(t, ErrNotFound, err)
 }

+func TestResolved(t *testing.T) {
+	a := NewAlerts()
+
+	now := time.Now()
+	require.NoError(t, a.SetOrReplaceResolved(makeAlert("a", now, -2, 10)))
+	resolved := makeAlert("a", now, -2, -1)
+	require.NoError(t, a.SetOrReplaceResolved(resolved))
+	require.NoError(t, a.SetOrReplaceResolved(resolved))
+	a.gc()
+	require.ErrorIs(t, a.SetOrReplaceResolved(resolved), ErrNotFound)
+
+	require.ErrorIs(t, a.DeleteIfResolved(resolved.Fingerprint()), ErrNotFound)
+	require.NoError(t, a.SetOrReplaceResolved(makeAlert("a", now, -2, 10)))
+	require.ErrorIs(t, a.DeleteIfResolved(resolved.Fingerprint()), ErrNotResolved)
+	require.NoError(t, a.SetOrReplaceResolved(resolved))
+	require.NoError(t, a.DeleteIfResolved(resolved.Fingerprint()))
+	_, err := a.Get(resolved.Fingerprint())
+	require.ErrorIs(t, err, ErrNotFound)
+}
+
 func TestGC(t *testing.T) {
 	now := time.Now()

 	newAlert := func(key string, start, end time.Duration) *types.Alert {
-		return &types.Alert{
-			Alert: model.Alert{
-				Labels:   model.LabelSet{model.LabelName(key): "b"},
-				StartsAt: now.Add(start * time.Minute),
-				EndsAt:   now.Add(end * time.Minute),
-			},
-		}
+		return makeAlert(key, now, start, end)
 	}

 	active := []*types.Alert{
 		newAlert("b", 10, 20),
 		newAlert("c", -10, 10),
@@ -111,3 +127,13 @@ func TestGC(t *testing.T) {
 	}
 	require.Len(t, resolved, n)
 }
+
+func makeAlert(key string, now time.Time, start, end time.Duration) *types.Alert {
+	return &types.Alert{
+		Alert: model.Alert{
+			Labels:   model.LabelSet{model.LabelName(key): "b"},
+			StartsAt: now.Add(start * time.Minute),
+			EndsAt:   now.Add(end * time.Minute),
+		},
+	}
+}