
Fix race conditions in the memory alerts store
Signed-off-by: Xiaochao Dong (@damnever) <[email protected]>
damnever committed Dec 28, 2023
1 parent c920b60 commit 2c6748d
Showing 2 changed files with 114 additions and 24 deletions.
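
In short: the change moves garbage collection out of the store's GC callback and into a provider-owned gcLoop/gc pair driven by a ticker, and it takes Alerts.mtx around every access to the underlying store (GetPending, Get, Put, count) as well as around GC itself, so reads, writes, listener notification, and GC are serialized on one mutex. A new TestAlertsConcurrently test hammers the provider with 100 concurrent writers for two seconds and then checks that both the store and the callback's accounting drain to zero.
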
provider/mem/mem.go: 78 changes (54 additions, 24 deletions)
@@ -34,10 +34,11 @@ const alertChannelLength = 200
 type Alerts struct {
     cancel context.CancelFunc
 
+    mtx sync.Mutex
+
     alerts *store.Alerts
     marker types.Marker
 
-    mtx sync.Mutex
     listeners map[int]listeningAlerts
     next int
 
@@ -100,35 +101,54 @@ func NewAlerts(ctx context.Context, m types.Marker, intervalGC time.Duration, al
         logger:   log.With(l, "component", "provider"),
         callback: alertCallback,
     }
-    a.alerts.SetGCCallback(func(alerts []*types.Alert) {
-        for _, alert := range alerts {
+
+    if r != nil {
+        a.registerMetrics(r)
+    }
+
+    go a.gcLoop(ctx, intervalGC)
+
+    return a, nil
+}
+
+func (a *Alerts) gcLoop(ctx context.Context, interval time.Duration) {
+    t := time.NewTicker(interval)
+    defer t.Stop()
+    for {
+        select {
+        case <-ctx.Done():
+            return
+        case <-t.C:
+            a.gc()
+        }
+    }
+}
+
+func (a *Alerts) gc() {
+    a.mtx.Lock()
+    defer a.mtx.Unlock()
+
+    for _, alert := range a.alerts.List() {
+        if alert.Resolved() {
+            fp := alert.Fingerprint()
+            a.alerts.Delete(fp)
+
             // As we don't persist alerts, we no longer consider them after
             // they are resolved. Alerts waiting for resolved notifications are
             // held in memory in aggregation groups redundantly.
-            m.Delete(alert.Fingerprint())
+            a.marker.Delete(fp)
             a.callback.PostDelete(alert)
         }
+    }
 
-        a.mtx.Lock()
-        for i, l := range a.listeners {
-            select {
-            case <-l.done:
-                delete(a.listeners, i)
-                close(l.alerts)
-            default:
-                // listener is not closed yet, hence proceed.
-            }
-        }
-        a.mtx.Unlock()
-    })
-
-    if r != nil {
-        a.registerMetrics(r)
-    }
-
-    go a.alerts.Run(ctx, intervalGC)
-
-    return a, nil
+    for i, l := range a.listeners {
+        select {
+        case <-l.done:
+            delete(a.listeners, i)
+            close(l.alerts)
+        default:
+            // listener is not closed yet, hence proceed.
+        }
+    }
 }
 
 // Close the alert provider.
@@ -178,8 +198,11 @@ func (a *Alerts) GetPending() provider.AlertIterator {
 
     go func() {
         defer close(ch)
+        a.mtx.Lock()
+        alerts := a.alerts.List()
+        a.mtx.Unlock()
 
-        for _, a := range a.alerts.List() {
+        for _, a := range alerts {
             select {
             case ch <- a:
             case <-done:
@@ -193,6 +216,8 @@ func (a *Alerts) GetPending() provider.AlertIterator {
 
 // Get returns the alert for a given fingerprint.
 func (a *Alerts) Get(fp model.Fingerprint) (*types.Alert, error) {
+    a.mtx.Lock()
+    defer a.mtx.Unlock()
     return a.alerts.Get(fp)
 }
 
@@ -201,6 +226,7 @@ func (a *Alerts) Put(alerts ...*types.Alert) error {
     for _, alert := range alerts {
         fp := alert.Fingerprint()
 
+        a.mtx.Lock()
         existing := false
 
         // Check that there's an alert existing within the store before
@@ -216,18 +242,19 @@ }
         }
 
         if err := a.callback.PreStore(alert, existing); err != nil {
+            a.mtx.Unlock()
             level.Error(a.logger).Log("msg", "pre-store callback returned error on set alert", "err", err)
             continue
         }
 
         if err := a.alerts.Set(alert); err != nil {
+            a.mtx.Unlock()
             level.Error(a.logger).Log("msg", "error on set alert", "err", err)
             continue
         }
 
         a.callback.PostStore(alert, existing)
 
-        a.mtx.Lock()
         for _, l := range a.listeners {
             select {
             case l.alerts <- alert:
@@ -242,6 +269,9 @@ func (a *Alerts) Put(alerts ...*types.Alert) error {
 
 // count returns the number of non-resolved alerts we currently have stored filtered by the provided state.
 func (a *Alerts) count(state types.AlertState) int {
+    a.mtx.Lock()
+    defer a.mtx.Unlock()
+
     var count int
     for _, alert := range a.alerts.List() {
         if alert.Resolved() {
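
The hunks above all apply one discipline: every access to a.alerts happens under a.mtx, and GC runs in a goroutine owned by the provider (gcLoop ticking into gc) that takes the same mutex, rather than inside the store's GC callback. The sketch below is a minimal, self-contained illustration of that pattern, not the Alertmanager code itself; memStore and its fields are simplified, hypothetical stand-ins.

package main

import (
    "context"
    "fmt"
    "sync"
    "time"
)

// memStore is a simplified stand-in for the provider: one mutex guards the
// map, and a background loop garbage-collects expired entries under that
// same mutex, so readers, writers, and GC never race.
type memStore struct {
    mtx    sync.Mutex
    alerts map[string]time.Time // fingerprint -> expiry
}

func newMemStore(ctx context.Context, gcInterval time.Duration) *memStore {
    s := &memStore{alerts: map[string]time.Time{}}
    go s.gcLoop(ctx, gcInterval)
    return s
}

func (s *memStore) put(fp string, expiry time.Time) {
    s.mtx.Lock()
    defer s.mtx.Unlock()
    s.alerts[fp] = expiry
}

func (s *memStore) count() int {
    s.mtx.Lock()
    defer s.mtx.Unlock()
    return len(s.alerts)
}

// gcLoop mirrors the ticker-driven loop the commit adds: GC is serialized
// with put/count through the shared mutex instead of running in a callback.
func (s *memStore) gcLoop(ctx context.Context, interval time.Duration) {
    t := time.NewTicker(interval)
    defer t.Stop()
    for {
        select {
        case <-ctx.Done():
            return
        case <-t.C:
            s.gc()
        }
    }
}

func (s *memStore) gc() {
    s.mtx.Lock()
    defer s.mtx.Unlock()
    now := time.Now()
    for fp, expiry := range s.alerts {
        if now.After(expiry) {
            delete(s.alerts, fp)
        }
    }
}

func main() {
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    s := newMemStore(ctx, 10*time.Millisecond)

    // Hammer the store from several goroutines while GC runs concurrently;
    // running this with "go run -race" should stay quiet because every
    // access shares the one mutex.
    var wg sync.WaitGroup
    for i := 0; i < 8; i++ {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()
            for j := 0; j < 1000; j++ {
                s.put(fmt.Sprintf("alert-%d-%d", id, j), time.Now().Add(5*time.Millisecond))
            }
        }(i)
    }
    wg.Wait()

    time.Sleep(50 * time.Millisecond) // let GC expire everything
    fmt.Println("remaining:", s.count())
}
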
provider/mem/mem_test.go: 60 changes (60 additions, 0 deletions)
@@ -15,6 +15,7 @@ package mem
 
 import (
     "context"
+    "errors"
     "fmt"
     "reflect"
     "strconv"
@@ -504,3 +505,62 @@ func (l *limitCountCallback) PostStore(_ *types.Alert, existing bool) {
 func (l *limitCountCallback) PostDelete(_ *types.Alert) {
     l.alerts.Dec()
 }
+
+func TestAlertsConcurrently(t *testing.T) {
+    callback := &limitCountCallback{limit: 100}
+    a, err := NewAlerts(context.Background(), types.NewMarker(prometheus.NewRegistry()), time.Millisecond, callback, log.NewNopLogger(), nil)
+    require.NoError(t, err)
+
+    stopc := make(chan struct{})
+    failc := make(chan struct{})
+    go func() {
+        time.Sleep(2 * time.Second)
+        close(stopc)
+    }()
+    expire := 10 * time.Millisecond
+    wg := sync.WaitGroup{}
+    for i := 0; i < 100; i++ {
+        wg.Add(1)
+        go func() {
+            defer wg.Done()
+
+            j := 0
+            for {
+                select {
+                case <-failc:
+                    return
+                case <-stopc:
+                    return
+                default:
+                }
+                now := time.Now()
+                err := a.Put(&types.Alert{
+                    Alert: model.Alert{
+                        Labels:   model.LabelSet{"bar": model.LabelValue(strconv.Itoa(j))},
+                        StartsAt: now,
+                        EndsAt:   now.Add(expire),
+                    },
+                    UpdatedAt: now,
+                })
+                if err != nil && !errors.Is(err, errTooManyAlerts) {
+                    close(failc)
+                    return
+                }
+                j++
+            }
+        }()
+    }
+    wg.Wait()
+    select {
+    case <-failc:
+        t.Fatalf("unexpected error happened")
+    default:
+    }
+
+    time.Sleep(expire)
+    require.Eventually(t, func() bool {
+        // When the alert will eventually expire and is considered resolved - it won't count.
+        return a.count(types.AlertStateActive) == 0
+    }, 2*expire, expire)
+    require.Equal(t, int32(0), callback.alerts.Load())
+}
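
A note on exercising it: go test -race -run TestAlertsConcurrently ./provider/mem/ runs the 100 writers against the GC loop for roughly two seconds with the race detector enabled; without the locking added in mem.go above, store mutations, listener notification, and the callback's limit accounting are not serialized, which is presumably the kind of interleaving this test is meant to surface.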
