diff --git a/provider/mem/mem.go b/provider/mem/mem.go
index 0e39ba1d5f..251568589a 100644
--- a/provider/mem/mem.go
+++ b/provider/mem/mem.go
@@ -34,10 +34,11 @@ const alertChannelLength = 200
 type Alerts struct {
 	cancel context.CancelFunc
 
+	mtx sync.Mutex
+
 	alerts *store.Alerts
 	marker types.Marker
 
-	mtx       sync.Mutex
 	listeners map[int]listeningAlerts
 	next      int
 
@@ -100,35 +101,54 @@ func NewAlerts(ctx context.Context, m types.Marker, intervalGC time.Duration, al
 		logger:   log.With(l, "component", "provider"),
 		callback: alertCallback,
 	}
-	a.alerts.SetGCCallback(func(alerts []*types.Alert) {
-		for _, alert := range alerts {
+	if r != nil {
+		a.registerMetrics(r)
+	}
+
+	go a.gcLoop(ctx, intervalGC)
+
+	return a, nil
+}
+
+func (a *Alerts) gcLoop(ctx context.Context, interval time.Duration) {
+	t := time.NewTicker(interval)
+	defer t.Stop()
+	for {
+		select {
+		case <-ctx.Done():
+			return
+		case <-t.C:
+			a.gc()
+		}
+	}
+}
+
+func (a *Alerts) gc() {
+	a.mtx.Lock()
+	defer a.mtx.Unlock()
+
+	for _, alert := range a.alerts.List() {
+		if alert.Resolved() {
+			fp := alert.Fingerprint()
+			a.alerts.Delete(fp)
+
 			// As we don't persist alerts, we no longer consider them after
 			// they are resolved. Alerts waiting for resolved notifications are
 			// held in memory in aggregation groups redundantly.
-			m.Delete(alert.Fingerprint())
+			a.marker.Delete(fp)
 			a.callback.PostDelete(alert)
 		}
+	}
 
-		a.mtx.Lock()
-		for i, l := range a.listeners {
-			select {
-			case <-l.done:
-				delete(a.listeners, i)
-				close(l.alerts)
-			default:
-				// listener is not closed yet, hence proceed.
-			}
+	for i, l := range a.listeners {
+		select {
+		case <-l.done:
+			delete(a.listeners, i)
+			close(l.alerts)
+		default:
+			// listener is not closed yet, hence proceed.
 		}
-		a.mtx.Unlock()
-	})
-
-	if r != nil {
-		a.registerMetrics(r)
 	}
-
-	go a.alerts.Run(ctx, intervalGC)
-
-	return a, nil
 }
 
 // Close the alert provider.
@@ -178,8 +198,11 @@ func (a *Alerts) GetPending() provider.AlertIterator {
 
 	go func() {
 		defer close(ch)
+		a.mtx.Lock()
+		alerts := a.alerts.List()
+		a.mtx.Unlock()
 
-		for _, a := range a.alerts.List() {
+		for _, a := range alerts {
 			select {
 			case ch <- a:
 			case <-done:
@@ -193,6 +216,8 @@ func (a *Alerts) GetPending() provider.AlertIterator {
 
 // Get returns the alert for a given fingerprint.
 func (a *Alerts) Get(fp model.Fingerprint) (*types.Alert, error) {
+	a.mtx.Lock()
+	defer a.mtx.Unlock()
 	return a.alerts.Get(fp)
 }
 
@@ -201,6 +226,7 @@ func (a *Alerts) Put(alerts ...*types.Alert) error {
 	for _, alert := range alerts {
 		fp := alert.Fingerprint()
 
+		a.mtx.Lock()
 		existing := false
 
 		// Check that there's an alert existing within the store before
@@ -216,18 +242,19 @@ func (a *Alerts) Put(alerts ...*types.Alert) error {
 		}
 
 		if err := a.callback.PreStore(alert, existing); err != nil {
+			a.mtx.Unlock()
 			level.Error(a.logger).Log("msg", "pre-store callback returned error on set alert", "err", err)
 			continue
 		}
 
 		if err := a.alerts.Set(alert); err != nil {
+			a.mtx.Unlock()
 			level.Error(a.logger).Log("msg", "error on set alert", "err", err)
 			continue
 		}
 
 		a.callback.PostStore(alert, existing)
 
-		a.mtx.Lock()
 		for _, l := range a.listeners {
 			select {
 			case l.alerts <- alert:
@@ -242,6 +269,9 @@ func (a *Alerts) Put(alerts ...*types.Alert) error {
 
 // count returns the number of non-resolved alerts we currently have stored filtered by the provided state.
 func (a *Alerts) count(state types.AlertState) int {
+	a.mtx.Lock()
+	defer a.mtx.Unlock()
+
 	var count int
 	for _, alert := range a.alerts.List() {
 		if alert.Resolved() {
diff --git a/provider/mem/mem_test.go b/provider/mem/mem_test.go
index a45321844c..7a1d71ec72 100644
--- a/provider/mem/mem_test.go
+++ b/provider/mem/mem_test.go
@@ -15,6 +15,7 @@ package mem
 
 import (
 	"context"
+	"errors"
 	"fmt"
 	"reflect"
 	"strconv"
@@ -504,3 +505,62 @@ func (l *limitCountCallback) PostStore(_ *types.Alert, existing bool) {
 func (l *limitCountCallback) PostDelete(_ *types.Alert) {
 	l.alerts.Dec()
 }
+
+func TestAlertsConcurrently(t *testing.T) {
+	callback := &limitCountCallback{limit: 100}
+	a, err := NewAlerts(context.Background(), types.NewMarker(prometheus.NewRegistry()), time.Millisecond, callback, log.NewNopLogger(), nil)
+	require.NoError(t, err)
+
+	stopc := make(chan struct{})
+	failc := make(chan struct{})
+	go func() {
+		time.Sleep(2 * time.Second)
+		close(stopc)
+	}()
+	expire := 10 * time.Millisecond
+	wg := sync.WaitGroup{}
+	for i := 0; i < 100; i++ {
+		wg.Add(1)
+		go func() {
+			defer wg.Done()
+
+			j := 0
+			for {
+				select {
+				case <-failc:
+					return
+				case <-stopc:
+					return
+				default:
+				}
+				now := time.Now()
+				err := a.Put(&types.Alert{
+					Alert: model.Alert{
+						Labels:   model.LabelSet{"bar": model.LabelValue(strconv.Itoa(j))},
+						StartsAt: now,
+						EndsAt:   now.Add(expire),
+					},
+					UpdatedAt: now,
+				})
+				if err != nil && !errors.Is(err, errTooManyAlerts) {
+					close(failc)
+					return
+				}
+				j++
+			}
+		}()
+	}
+	wg.Wait()
+	select {
+	case <-failc:
+		t.Fatalf("unexpected error happened")
+	default:
+	}
+
+	time.Sleep(expire)
+	require.Eventually(t, func() bool {
+		// When the alert will eventually expire and is considered resolved - it won't count.
+		return a.count(types.AlertStateActive) == 0
+	}, 2*expire, expire)
+	require.Equal(t, int32(0), callback.alerts.Load())
+}