diff --git a/dispatch/dispatch_test.go b/dispatch/dispatch_test.go
index 17ffe85a9a..18decadc11 100644
--- a/dispatch/dispatch_test.go
+++ b/dispatch/dispatch_test.go
@@ -601,6 +601,23 @@ func (r *recordStage) Exec(ctx context.Context, l log.Logger, alerts ...*types.A
 	return ctx, nil, nil
 }
 
+type notifyStage struct {
+	C chan struct{}
+}
+
+func newNotifyStage() *notifyStage {
+	return &notifyStage{C: make(chan struct{}, 1)}
+}
+
+func (s *notifyStage) Exec(ctx context.Context, l log.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error) {
+	select {
+	case <-ctx.Done():
+		return ctx, nil, ctx.Err()
+	case s.C <- struct{}{}:
+		return ctx, alerts, nil
+	}
+}
+
 var (
 	// Set the start time in the past to trigger a flush immediately.
 	t0 = time.Now().Add(-time.Minute)
@@ -651,7 +668,7 @@ func TestDispatcherRaceOnFirstAlertNotDeliveredWhenGroupWaitIsZero(t *testing.T)
 	route := &Route{
 		RouteOpts: RouteOpts{
 			Receiver:       "default",
-			GroupBy:        map[model.LabelName]struct{}{"alertname": {}},
+			GroupBy:        map[model.LabelName]struct{}{model.AlertNameLabel: {}},
 			GroupWait:      0,
 			GroupInterval:  1 * time.Hour, // Should never hit in this test.
 			RepeatInterval: 1 * time.Hour, // Should never hit in this test.
@@ -666,7 +683,7 @@ func TestDispatcherRaceOnFirstAlertNotDeliveredWhenGroupWaitIsZero(t *testing.T)
 
 	// Push all alerts.
 	for i := 0; i < numAlerts; i++ {
-		alert := newAlert(model.LabelSet{"alertname": model.LabelValue(fmt.Sprintf("Alert_%d", i))})
+		alert := newAlert(model.LabelSet{model.AlertNameLabel: model.LabelValue(fmt.Sprintf("Alert_%d", i))})
 		require.NoError(t, alerts.Put(alert))
 	}
 
@@ -684,6 +701,133 @@ func TestDispatcherRaceOnFirstAlertNotDeliveredWhenGroupWaitIsZero(t *testing.T)
 	require.Equal(t, numAlerts, len(recorder.Alerts()))
 }
 
+func TestDispatcherReceiveAndNotifyRepeatedResolvedAlerts(t *testing.T) {
+	// More background here: https://github.com/prometheus/alertmanager/pull/3006
+
+	logger := log.NewNopLogger()
+	// logger := log.NewLogfmtLogger(os.Stdout)
+	marker := types.NewMarker(prometheus.NewRegistry())
+	alerts, err := mem.NewAlerts(context.Background(), marker, time.Hour, nil, logger, nil)
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer alerts.Close()
+
+	route := &Route{
+		RouteOpts: RouteOpts{
+			Receiver:       "default",
+			GroupBy:        map[model.LabelName]struct{}{model.AlertNameLabel: {}},
+			GroupWait:      0,
+			GroupInterval:  6 * time.Second,
+			RepeatInterval: 6 * time.Second,
+		},
+	}
+
+	recorder := &recordStage{alerts: make(map[string]map[model.Fingerprint]*types.Alert)}
+	notifier := newNotifyStage()
+	dispatcher := NewDispatcher(alerts, route, notify.FanoutStage{recorder, notifier}, marker, nil, nil, logger, NewDispatcherMetrics(false, prometheus.NewRegistry()))
+	go dispatcher.Run()
+	defer dispatcher.Stop()
+
+	// Here we simulate the case when the alertmanager receives resolved alerts
+	// right after the aggrGroup is deleted.
+	t.Run("repeated alerts after aggrGroup deleted", func(t *testing.T) {
+		alert := newAlert(model.LabelSet{model.AlertNameLabel: "test-repeated-resolved-alerts-1"})
+		alert.Alert.EndsAt = alert.StartsAt.Add(time.Second)
+		alert.UpdatedAt = alert.Alert.EndsAt
+		require.True(t, alert.Resolved())
+		require.NoError(t, alerts.Put(alert))
+		select {
+		case <-time.After(20 * time.Second):
+		case <-notifier.C:
+			t.Errorf("unexpected repeated resolved alerts")
+		}
+	})
+
+	// Alertmanager receives repeated resolved alerts after aggrGroup.flush.
+	t.Run("repeated alerts after aggrGroup flush", func(t *testing.T) {
+		alert := newAlert(model.LabelSet{model.AlertNameLabel: "test-repeated-resolved-alerts-2"})
+		require.NoError(t, alerts.Put(alert))
+		select {
+		case <-time.After(20 * time.Second):
+			t.Errorf("wait active alert timed out")
+		case <-notifier.C:
+		}
+
+		alert.Alert.EndsAt = alert.StartsAt.Add(time.Second)
+		alert.UpdatedAt = alert.Alert.EndsAt
+		require.True(t, alert.Resolved())
+		require.NoError(t, alerts.Put(alert))
+		select {
+		case <-time.After(20 * time.Second):
+			t.Errorf("wait resolved alert timed out")
+		case <-notifier.C:
+		}
+
+		alert.UpdatedAt = alert.Alert.EndsAt.Add(time.Second)
+		require.True(t, alert.Resolved())
+		require.NoError(t, alerts.Put(alert))
+		select {
+		case <-time.After(20 * time.Second):
+		case <-notifier.C:
+			t.Errorf("unexpected repeated resolved alerts")
+		}
+	})
+}
+
+func TestDispatcherRepeatedResolvedAlertsAfterAggrGroupGetsDeleted(t *testing.T) {
+	logger := log.NewNopLogger()
+	// logger := log.NewLogfmtLogger(os.Stdout)
+	marker := types.NewMarker(prometheus.NewRegistry())
+	alerts, err := mem.NewAlerts(context.Background(), marker, time.Hour, nil, logger, nil)
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer alerts.Close()
+
+	route := &Route{
+		RouteOpts: RouteOpts{
+			Receiver:       "default",
+			GroupBy:        map[model.LabelName]struct{}{model.AlertNameLabel: {}},
+			GroupWait:      0,
+			GroupInterval:  6 * time.Second,
+			RepeatInterval: 6 * time.Second,
+		},
+	}
+
+	recorder := &recordStage{alerts: make(map[string]map[model.Fingerprint]*types.Alert)}
+	notifier := newNotifyStage()
+	dispatcher := NewDispatcher(alerts, route, notify.FanoutStage{recorder, notifier}, marker, nil, nil, logger, NewDispatcherMetrics(false, prometheus.NewRegistry()))
+	go dispatcher.Run()
+	defer dispatcher.Stop()
+
+	alert := newAlert(model.LabelSet{model.AlertNameLabel: "test-repeated-resolved-alerts"})
+	require.NoError(t, alerts.Put(alert))
+	select {
+	case <-time.After(20 * time.Second):
+		t.Errorf("wait active alert timed out")
+	case <-notifier.C:
+	}
+
+	alert.Alert.EndsAt = alert.StartsAt.Add(time.Second)
+	require.True(t, alert.Alert.EndsAt.Before(time.Now()))
+	alert.UpdatedAt = alert.Alert.EndsAt
+	require.NoError(t, alerts.Put(alert))
+	select {
+	case <-time.After(20 * time.Second):
+		t.Errorf("wait resolved alert timed out")
+	case <-notifier.C:
+	}
+
+	alert.UpdatedAt = alert.Alert.EndsAt.Add(time.Second)
+	require.NoError(t, alerts.Put(alert))
+	select {
+	case <-time.After(20 * time.Second):
+	case <-notifier.C:
+		t.Errorf("unexpected repeated resolved alerts")
+	}
+}
+
 type limits struct {
 	groups int
 }