diff --git a/cmd/alertmanager/main.go b/cmd/alertmanager/main.go
index 93780b8edb..754b9b5c42 100644
--- a/cmd/alertmanager/main.go
+++ b/cmd/alertmanager/main.go
@@ -439,6 +439,7 @@ func run() int {
 		inhibitor,
 		silencer,
 		intervener,
+		marker,
 		notificationLog,
 		pipelinePeer,
 	)
diff --git a/dispatch/dispatch.go b/dispatch/dispatch.go
index 822438e084..2468ee8f0e 100644
--- a/dispatch/dispatch.go
+++ b/dispatch/dispatch.go
@@ -78,6 +78,7 @@ type Dispatcher struct {
 	route   *Route
 	alerts  provider.Alerts
 	stage   notify.Stage
+	marker  types.GroupMarker
 	metrics *DispatcherMetrics
 	limits  Limits
 
@@ -107,7 +108,7 @@ func NewDispatcher(
 	ap provider.Alerts,
 	r *Route,
 	s notify.Stage,
-	mk types.AlertMarker,
+	mk types.GroupMarker,
 	to func(time.Duration) time.Duration,
 	lim Limits,
 	l log.Logger,
@@ -121,6 +122,7 @@ func NewDispatcher(
 		alerts:  ap,
 		stage:   s,
 		route:   r,
+		marker:  mk,
 		timeout: to,
 		logger:  log.With(l, "component", "dispatcher"),
 		metrics: m,
@@ -145,8 +147,8 @@ func (d *Dispatcher) Run() {
 }
 
 func (d *Dispatcher) run(it provider.AlertIterator) {
-	cleanup := time.NewTicker(30 * time.Second)
-	defer cleanup.Stop()
+	maintenance := time.NewTicker(30 * time.Second)
+	defer maintenance.Stop()
 
 	defer it.Close()
 
@@ -175,28 +177,30 @@ func (d *Dispatcher) run(it provider.AlertIterator) {
 			}
 			d.metrics.processingDuration.Observe(time.Since(now).Seconds())
 
-		case <-cleanup.C:
-			d.mtx.Lock()
-
-			for _, groups := range d.aggrGroupsPerRoute {
-				for _, ag := range groups {
-					if ag.empty() {
-						ag.stop()
-						delete(groups, ag.fingerprint())
-						d.aggrGroupsNum--
-						d.metrics.aggrGroups.Dec()
-					}
-				}
-			}
-
-			d.mtx.Unlock()
-
+		case <-maintenance.C:
+			d.doMaintenance()
 		case <-d.ctx.Done():
 			return
 		}
 	}
 }
 
+func (d *Dispatcher) doMaintenance() {
+	d.mtx.Lock()
+	defer d.mtx.Unlock()
+	for _, groups := range d.aggrGroupsPerRoute {
+		for _, ag := range groups {
+			if ag.empty() {
+				ag.stop()
+				d.marker.DeleteByGroupKey(ag.routeID, ag.GroupKey())
+				delete(groups, ag.fingerprint())
+				d.aggrGroupsNum--
+				d.metrics.aggrGroups.Dec()
+			}
+		}
+	}
+}
+
 // AlertGroup represents how alerts exist within an aggrGroup.
 type AlertGroup struct {
 	Alerts   types.AlertSlice
@@ -374,6 +378,7 @@ type aggrGroup struct {
 	labels   model.LabelSet
 	opts     *RouteOpts
 	logger   log.Logger
+	routeID  string
 	routeKey string
 
 	alerts  *store.Alerts
@@ -394,6 +399,7 @@ func newAggrGroup(ctx context.Context, labels model.LabelSet, r *Route, to func(
 	}
 	ag := &aggrGroup{
 		labels:   labels,
+		routeID:  r.ID(),
 		routeKey: r.Key(),
 		opts:     &r.RouteOpts,
 		timeout:  to,
@@ -447,6 +453,7 @@ func (ag *aggrGroup) run(nf notifyFunc) {
 		ctx = notify.WithRepeatInterval(ctx, ag.opts.RepeatInterval)
 		ctx = notify.WithMuteTimeIntervals(ctx, ag.opts.MuteTimeIntervals)
 		ctx = notify.WithActiveTimeIntervals(ctx, ag.opts.ActiveTimeIntervals)
+		ctx = notify.WithRouteID(ctx, ag.routeID)
 
 			// Wait the configured interval before calling flush again.
 			ag.mtx.Lock()
diff --git a/dispatch/dispatch_test.go b/dispatch/dispatch_test.go
index 0c8cbf7855..ad7d35b577 100644
--- a/dispatch/dispatch_test.go
+++ b/dispatch/dispatch_test.go
@@ -715,3 +715,48 @@ type limits struct {
 func (l limits) MaxNumberOfAggregationGroups() int {
 	return l.groups
 }
+
+func TestDispatcher_DoMaintenance(t *testing.T) {
+	r := prometheus.NewRegistry()
+	marker := types.NewMarker(r)
+
+	alerts, err := mem.NewAlerts(context.Background(), marker, time.Minute, nil, log.NewNopLogger(), nil)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	route := &Route{
+		RouteOpts: RouteOpts{
+			GroupBy:       map[model.LabelName]struct{}{"alertname": {}},
+			GroupWait:     0,
+			GroupInterval: 5 * time.Minute, // Should never hit in this test.
+		},
+	}
+	timeout := func(d time.Duration) time.Duration { return d }
+	recorder := &recordStage{alerts: make(map[string]map[model.Fingerprint]*types.Alert)}
+
+	ctx := context.Background()
+	dispatcher := NewDispatcher(alerts, route, recorder, marker, timeout, nil, log.NewNopLogger(), NewDispatcherMetrics(false, r))
+	aggrGroups := make(map[*Route]map[model.Fingerprint]*aggrGroup)
+	aggrGroups[route] = make(map[model.Fingerprint]*aggrGroup)
+
+	// Insert an aggregation group with no alerts.
+	labels := model.LabelSet{"alertname": "1"}
+	aggrGroup1 := newAggrGroup(ctx, labels, route, timeout, log.NewNopLogger())
+	aggrGroups[route][aggrGroup1.fingerprint()] = aggrGroup1
+	dispatcher.aggrGroupsPerRoute = aggrGroups
+	// Must run, otherwise doMaintenance blocks on aggrGroup1.stop().
+	go aggrGroup1.run(func(context.Context, ...*types.Alert) bool { return true })
+
+	// Insert a marker for the aggregation group's group key.
+	marker.SetMuted(route.ID(), aggrGroup1.GroupKey(), []string{"weekends"})
+	mutedBy, isMuted := marker.Muted(route.ID(), aggrGroup1.GroupKey())
+	require.True(t, isMuted)
+	require.Equal(t, []string{"weekends"}, mutedBy)
+
+	// Run the maintenance and check that the marker has been removed.
+	dispatcher.doMaintenance()
+	mutedBy, isMuted = marker.Muted(route.ID(), aggrGroup1.GroupKey())
+	require.False(t, isMuted)
+	require.Empty(t, mutedBy)
+}
diff --git a/notify/notify.go b/notify/notify.go
index 5eac12f40e..d1065ab793 100644
--- a/notify/notify.go
+++ b/notify/notify.go
@@ -119,6 +119,7 @@ const (
 	keyNow
 	keyMuteTimeIntervals
 	keyActiveTimeIntervals
+	keyRouteID
 )
 
 // WithReceiverName populates a context with a receiver name.
@@ -165,6 +166,10 @@ func WithActiveTimeIntervals(ctx context.Context, at []string) context.Context {
 	return context.WithValue(ctx, keyActiveTimeIntervals, at)
 }
 
+func WithRouteID(ctx context.Context, routeID string) context.Context {
+	return context.WithValue(ctx, keyRouteID, routeID)
+}
+
 // RepeatInterval extracts a repeat interval from the context. Iff none exists, the
 // second argument is false.
 func RepeatInterval(ctx context.Context) (time.Duration, bool) {
@@ -228,6 +233,13 @@ func ActiveTimeIntervalNames(ctx context.Context) ([]string, bool) {
 	return v, ok
 }
 
+// RouteID extracts a RouteID from the context. Iff none exists, the
+// second argument is false.
+func RouteID(ctx context.Context) (string, bool) {
+	v, ok := ctx.Value(keyRouteID).(string)
+	return v, ok
+}
+
 // A Stage processes alerts under the constraints of the given context.
 type Stage interface {
 	Exec(ctx context.Context, l log.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error)
@@ -384,6 +396,7 @@ func (pb *PipelineBuilder) New(
 	inhibitor *inhibit.Inhibitor,
 	silencer *silence.Silencer,
 	intervener *timeinterval.Intervener,
+	marker types.GroupMarker,
 	notificationLog NotificationLog,
 	peer Peer,
 ) RoutingStage {
@@ -391,8 +404,8 @@ func (pb *PipelineBuilder) New(
 
 	ms := NewGossipSettleStage(peer)
 	is := NewMuteStage(inhibitor, pb.metrics)
-	tas := NewTimeActiveStage(intervener, pb.metrics)
-	tms := NewTimeMuteStage(intervener, pb.metrics)
+	tas := NewTimeActiveStage(intervener, marker, pb.metrics)
+	tms := NewTimeMuteStage(intervener, marker, pb.metrics)
 	ss := NewMuteStage(silencer, pb.metrics)
 
 	for name := range receivers {
@@ -923,18 +936,29 @@ func (n SetNotifiesStage) Exec(ctx context.Context, l log.Logger, alerts ...*typ
 
 type timeStage struct {
 	muter   types.TimeMuter
+	marker  types.GroupMarker
 	metrics *Metrics
 }
 
 type TimeMuteStage timeStage
 
-func NewTimeMuteStage(m types.TimeMuter, metrics *Metrics) *TimeMuteStage {
-	return &TimeMuteStage{m, metrics}
+func NewTimeMuteStage(muter types.TimeMuter, marker types.GroupMarker, metrics *Metrics) *TimeMuteStage {
+	return &TimeMuteStage{muter, marker, metrics}
 }
 
 // Exec implements the stage interface for TimeMuteStage.
 // TimeMuteStage is responsible for muting alerts whose route is not in an active time.
 func (tms TimeMuteStage) Exec(ctx context.Context, l log.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error) {
+	routeID, ok := RouteID(ctx)
+	if !ok {
+		return ctx, nil, errors.New("route ID missing")
+	}
+
+	gkey, ok := GroupKey(ctx)
+	if !ok {
+		return ctx, nil, errors.New("group key missing")
+	}
+
 	muteTimeIntervalNames, ok := MuteTimeIntervalNames(ctx)
 	if !ok {
 		return ctx, alerts, nil
@@ -949,10 +973,12 @@ func (tms TimeMuteStage) Exec(ctx context.Context, l log.Logger, alerts ...*type
 		return ctx, alerts, nil
 	}
 
-	muted, _, err := tms.muter.Mutes(muteTimeIntervalNames, now)
+	muted, mutedBy, err := tms.muter.Mutes(muteTimeIntervalNames, now)
 	if err != nil {
 		return ctx, alerts, err
 	}
+	// If muted is false then mutedBy is nil and the muted marker is removed.
+	tms.marker.SetMuted(routeID, gkey, mutedBy)
 
 	// If the current time is inside a mute time, all alerts are removed from the pipeline.
 	if muted {
@@ -960,18 +986,29 @@ func (tms TimeMuteStage) Exec(ctx context.Context, l log.Logger, alerts ...*type
 		level.Debug(l).Log("msg", "Notifications not sent, route is within mute time", "alerts", len(alerts))
 		return ctx, nil, nil
 	}
+
 	return ctx, alerts, nil
 }
 
 type TimeActiveStage timeStage
 
-func NewTimeActiveStage(m types.TimeMuter, metrics *Metrics) *TimeActiveStage {
-	return &TimeActiveStage{m, metrics}
+func NewTimeActiveStage(muter types.TimeMuter, marker types.GroupMarker, metrics *Metrics) *TimeActiveStage {
+	return &TimeActiveStage{muter, marker, metrics}
 }
 
 // Exec implements the stage interface for TimeActiveStage.
 // TimeActiveStage is responsible for muting alerts whose route is not in an active time.
 func (tas TimeActiveStage) Exec(ctx context.Context, l log.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error) {
+	routeID, ok := RouteID(ctx)
+	if !ok {
+		return ctx, nil, errors.New("route ID missing")
+	}
+
+	gkey, ok := GroupKey(ctx)
+	if !ok {
+		return ctx, nil, errors.New("group key missing")
+	}
+
 	activeTimeIntervalNames, ok := ActiveTimeIntervalNames(ctx)
 	if !ok {
 		return ctx, alerts, nil
@@ -987,13 +1024,22 @@ func (tas TimeActiveStage) Exec(ctx context.Context, l log.Logger, alerts ...*ty
 		return ctx, alerts, errors.New("missing now timestamp")
 	}
 
-	muted, _, err := tas.muter.Mutes(activeTimeIntervalNames, now)
+	active, _, err := tas.muter.Mutes(activeTimeIntervalNames, now)
 	if err != nil {
 		return ctx, alerts, err
 	}
 
+	var mutedBy []string
+	if !active {
+		// If the group is muted, then it must be muted by all active time intervals.
+		// Otherwise, the group must be in at least one active time interval for it
+		// to be active.
+		mutedBy = activeTimeIntervalNames
+	}
+	tas.marker.SetMuted(routeID, gkey, mutedBy)
+
 	// If the current time is not inside an active time, all alerts are removed from the pipeline
-	if !muted {
+	if !active {
 		tas.metrics.numNotificationSuppressedTotal.WithLabelValues(SuppressedReasonActiveTimeInterval).Add(float64(len(alerts)))
 		level.Debug(l).Log("msg", "Notifications not sent, route is not within active time", "alerts", len(alerts))
 		return ctx, nil, nil
diff --git a/notify/notify_test.go b/notify/notify_test.go
index 5077c6d666..5c2297e993 100644
--- a/notify/notify_test.go
+++ b/notify/notify_test.go
@@ -19,6 +19,7 @@ import (
 	"fmt"
 	"io"
 	"reflect"
+	"sort"
 	"strings"
 	"testing"
 	"time"
@@ -827,12 +828,6 @@ func TestTimeMuteStage(t *testing.T) {
 	}
 	eveningsAndWeekends := map[string][]timeinterval.TimeInterval{
 		"evenings": {{
-			Weekdays: []timeinterval.WeekdayRange{{
-				InclusiveRange: timeinterval.InclusiveRange{
-					Begin: 1, // Monday
-					End:   5, // Friday
-				},
-			}},
 			Times: []timeinterval.TimeRange{{
 				StartMinute: 0,   // 00:00
 				EndMinute:   540, // 09:00
@@ -877,30 +872,41 @@ func TestTimeMuteStage(t *testing.T) {
 		alerts:    []*types.Alert{{Alert: model.Alert{Labels: model.LabelSet{"foo": "bar"}}}},
 		mutedBy:   []string{"weekends"},
 	}, {
-		name:      "Should be muted at 12pm UTC",
+		name:      "Should be muted at 12pm UTC on a weekday",
 		intervals: eveningsAndWeekends,
-		now:       time.Date(2024, 1, 6, 10, 0, 0, 0, time.UTC),
+		now:       time.Date(2024, 1, 1, 10, 0, 0, 0, time.UTC),
 		alerts:    []*types.Alert{{Alert: model.Alert{Labels: model.LabelSet{"foo": "bar"}}}},
 		mutedBy:   []string{"evenings"},
+	}, {
+		name:      "Should be muted at 12pm UTC on a weekend",
+		intervals: eveningsAndWeekends,
+		now:       time.Date(2024, 1, 6, 10, 0, 0, 0, time.UTC),
+		alerts:    []*types.Alert{{Alert: model.Alert{Labels: model.LabelSet{"foo": "bar"}}}},
+		mutedBy:   []string{"evenings", "weekends"},
 	}}
 
 	for _, test := range tests {
 		t.Run(test.name, func(t *testing.T) {
 			r := prometheus.NewRegistry()
+			marker := types.NewMarker(r)
 			metrics := NewMetrics(r, featurecontrol.NoopFlags{})
 			intervener := timeinterval.NewIntervener(test.intervals)
-			st := NewTimeMuteStage(intervener, metrics)
+			st := NewTimeMuteStage(intervener, marker, metrics)
 
 			// Get the names of all time intervals for the context.
 			muteTimeIntervalNames := make([]string, 0, len(test.intervals))
 			for name := range test.intervals {
 				muteTimeIntervalNames = append(muteTimeIntervalNames, name)
 			}
+			// Sort the names so we can compare mutedBy with test.mutedBy.
+			sort.Strings(muteTimeIntervalNames)
 
 			ctx := context.Background()
 			ctx = WithNow(ctx, test.now)
+			ctx = WithGroupKey(ctx, "group1")
 			ctx = WithActiveTimeIntervals(ctx, nil)
 			ctx = WithMuteTimeIntervals(ctx, muteTimeIntervalNames)
+			ctx = WithRouteID(ctx, "route1")
 
 			_, active, err := st.Exec(ctx, log.NewNopLogger(), test.alerts...)
 			require.NoError(t, err)
@@ -908,14 +914,33 @@ func TestTimeMuteStage(t *testing.T) {
 			if len(test.mutedBy) == 0 {
 				// All alerts should be active.
 				require.Equal(t, len(test.alerts), len(active))
+				// The group should not be marked.
+				mutedBy, isMuted := marker.Muted("route1", "group1")
+				require.False(t, isMuted)
+				require.Empty(t, mutedBy)
 				// The metric for total suppressed notifications should not
 				// have been incremented, which means it will not be collected.
-				require.NoError(t, prom_testutil.GatherAndCompare(r, strings.NewReader("")))
+				require.NoError(t, prom_testutil.GatherAndCompare(r, strings.NewReader(`
+# HELP alertmanager_marked_alerts How many alerts by state are currently marked in the Alertmanager regardless of their expiry.
+# TYPE alertmanager_marked_alerts gauge
+alertmanager_marked_alerts{state="active"} 0
+alertmanager_marked_alerts{state="suppressed"} 0
+alertmanager_marked_alerts{state="unprocessed"} 0
+`)))
 			} else {
 				// All alerts should be muted.
 				require.Empty(t, active)
+				// The group should be marked as muted.
+				mutedBy, isMuted := marker.Muted("route1", "group1")
+				require.True(t, isMuted)
+				require.Equal(t, test.mutedBy, mutedBy)
 				// Gets the metric for total suppressed notifications.
 				require.NoError(t, prom_testutil.GatherAndCompare(r, strings.NewReader(fmt.Sprintf(`
+# HELP alertmanager_marked_alerts How many alerts by state are currently marked in the Alertmanager regardless of their expiry.
+# TYPE alertmanager_marked_alerts gauge
+alertmanager_marked_alerts{state="active"} 0
+alertmanager_marked_alerts{state="suppressed"} 0
+alertmanager_marked_alerts{state="unprocessed"} 0
 # HELP alertmanager_notifications_suppressed_total The total number of notifications suppressed for being silenced, inhibited, outside of active time intervals or within muted time intervals.
 # TYPE alertmanager_notifications_suppressed_total counter
 alertmanager_notifications_suppressed_total{reason="mute_time_interval"} %d
@@ -981,20 +1006,25 @@ func TestTimeActiveStage(t *testing.T) {
 	for _, test := range tests {
 		t.Run(test.name, func(t *testing.T) {
 			r := prometheus.NewRegistry()
+			marker := types.NewMarker(r)
 			metrics := NewMetrics(r, featurecontrol.NoopFlags{})
 			intervener := timeinterval.NewIntervener(test.intervals)
-			st := NewTimeActiveStage(intervener, metrics)
+			st := NewTimeActiveStage(intervener, marker, metrics)
 
 			// Get the names of all time intervals for the context.
 			activeTimeIntervalNames := make([]string, 0, len(test.intervals))
 			for name := range test.intervals {
 				activeTimeIntervalNames = append(activeTimeIntervalNames, name)
 			}
+			// Sort the names so we can compare mutedBy with test.mutedBy.
+			sort.Strings(activeTimeIntervalNames)
 
 			ctx := context.Background()
 			ctx = WithNow(ctx, test.now)
+			ctx = WithGroupKey(ctx, "group1")
 			ctx = WithActiveTimeIntervals(ctx, activeTimeIntervalNames)
 			ctx = WithMuteTimeIntervals(ctx, nil)
+			ctx = WithRouteID(ctx, "route1")
 
 			_, active, err := st.Exec(ctx, log.NewNopLogger(), test.alerts...)
 			require.NoError(t, err)
@@ -1002,14 +1032,33 @@ func TestTimeActiveStage(t *testing.T) {
 			if len(test.mutedBy) == 0 {
 				// All alerts should be active.
 				require.Equal(t, len(test.alerts), len(active))
+				// The group should not be marked.
+				mutedBy, isMuted := marker.Muted("route1", "group1")
+				require.False(t, isMuted)
+				require.Empty(t, mutedBy)
 				// The metric for total suppressed notifications should not
 				// have been incremented, which means it will not be collected.
-				require.NoError(t, prom_testutil.GatherAndCompare(r, strings.NewReader("")))
+				require.NoError(t, prom_testutil.GatherAndCompare(r, strings.NewReader(`
+# HELP alertmanager_marked_alerts How many alerts by state are currently marked in the Alertmanager regardless of their expiry.
+# TYPE alertmanager_marked_alerts gauge
+alertmanager_marked_alerts{state="active"} 0
+alertmanager_marked_alerts{state="suppressed"} 0
+alertmanager_marked_alerts{state="unprocessed"} 0
+`)))
 			} else {
 				// All alerts should be muted.
 				require.Empty(t, active)
+				// The group should be marked as muted.
+				mutedBy, isMuted := marker.Muted("route1", "group1")
+				require.True(t, isMuted)
+				require.Equal(t, test.mutedBy, mutedBy)
 				// Gets the metric for total suppressed notifications.
 				require.NoError(t, prom_testutil.GatherAndCompare(r, strings.NewReader(fmt.Sprintf(`
+# HELP alertmanager_marked_alerts How many alerts by state are currently marked in the Alertmanager regardless of their expiry.
+# TYPE alertmanager_marked_alerts gauge
+alertmanager_marked_alerts{state="active"} 0
+alertmanager_marked_alerts{state="suppressed"} 0
+alertmanager_marked_alerts{state="unprocessed"} 0
 # HELP alertmanager_notifications_suppressed_total The total number of notifications suppressed for being silenced, inhibited, outside of active time intervals or within muted time intervals.
 # TYPE alertmanager_notifications_suppressed_total counter
 alertmanager_notifications_suppressed_total{reason="active_time_interval"} %d
diff --git a/types/types.go b/types/types.go
index 648f0a7e01..85391ce91f 100644
--- a/types/types.go
+++ b/types/types.go
@@ -116,6 +116,9 @@ type GroupMarker interface {
 	// intervals that mute it. If the list of names is nil or the empty slice
 	// then the muted marker is removed.
 	SetMuted(routeID, groupKey string, timeIntervalNames []string)
+
+	// DeleteByGroupKey removes all markers for the GroupKey.
+	DeleteByGroupKey(routeID, groupKey string)
 }
 
 // NewMarker returns an instance of a AlertMarker implementation.
@@ -158,6 +161,12 @@ func (m *MemMarker) SetMuted(routeID, groupKey string, timeIntervalNames []strin
 	status.mutedBy = timeIntervalNames
 }
 
+func (m *MemMarker) DeleteByGroupKey(routeID, groupKey string) {
+	m.mtx.Lock()
+	defer m.mtx.Unlock()
+	delete(m.groups, routeID+groupKey)
+}
+
 func (m *MemMarker) registerMetrics(r prometheus.Registerer) {
 	newMarkedAlertMetricByState := func(st AlertState) prometheus.GaugeFunc {
 		return prometheus.NewGaugeFunc(
diff --git a/types/types_test.go b/types/types_test.go
index 45bc59e2f0..198804862b 100644
--- a/types/types_test.go
+++ b/types/types_test.go
@@ -61,6 +61,31 @@ func TestMemMarker_Muted(t *testing.T) {
 	require.Empty(t, timeIntervalNames)
 }
 
+func TestMemMarker_DeleteByGroupKey(t *testing.T) {
+	r := prometheus.NewRegistry()
+	marker := NewMarker(r)
+
+	// Mark the group and check that it is muted.
+	marker.SetMuted("route1", "group1", []string{"weekends"})
+	timeIntervalNames, isMuted := marker.Muted("route1", "group1")
+	require.True(t, isMuted)
+	require.Equal(t, []string{"weekends"}, timeIntervalNames)
+
+	// Delete the markers for a different group key. The group should
+	// still be muted.
+	marker.DeleteByGroupKey("route1", "group2")
+	timeIntervalNames, isMuted = marker.Muted("route1", "group1")
+	require.True(t, isMuted)
+	require.Equal(t, []string{"weekends"}, timeIntervalNames)
+
+	// Delete the markers for the correct group key. The group should
+	// no longer be muted.
+	marker.DeleteByGroupKey("route1", "group1")
+	timeIntervalNames, isMuted = marker.Muted("route1", "group1")
+	require.False(t, isMuted)
+	require.Empty(t, timeIntervalNames)
+}
+
 func TestMemMarker_Count(t *testing.T) {
 	r := prometheus.NewRegistry()
 	marker := NewMarker(r)
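
For reference, a minimal sketch (not part of the patch) of how the marker calls above fit together: a time-based stage records why a group is muted with SetMuted, Muted reads that state back, and DeleteByGroupKey clears it, which is what the dispatcher's maintenance loop now does for empty aggregation groups. The "route1"/"group1" identifiers are placeholders; in the pipeline they come from the new WithRouteID and the existing WithGroupKey context values.

package main

import (
	"fmt"

	"github.com/prometheus/alertmanager/types"
	"github.com/prometheus/client_golang/prometheus"
)

func main() {
	marker := types.NewMarker(prometheus.NewRegistry())

	// A pipeline stage marks the group as muted by a time interval.
	marker.SetMuted("route1", "group1", []string{"weekends"})
	mutedBy, isMuted := marker.Muted("route1", "group1")
	fmt.Println(isMuted, mutedBy) // true [weekends]

	// The maintenance loop deletes the marker once the aggregation group is
	// empty, so muted markers cannot outlive the group they describe.
	marker.DeleteByGroupKey("route1", "group1")
	_, isMuted = marker.Muted("route1", "group1")
	fmt.Println(isMuted) // false
}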