From 266be74db299574ce1afad6096ada3ab4d7b2d03 Mon Sep 17 00:00:00 2001 From: Nedyalko Andreev Date: Fri, 13 Dec 2024 16:33:27 +0200 Subject: [PATCH 1/2] fix: data race in pattern aggregation push --- pkg/pattern/aggregation/push.go | 14 +++++++++----- pkg/pattern/aggregation/push_test.go | 1 + 2 files changed, 10 insertions(+), 5 deletions(-) diff --git a/pkg/pattern/aggregation/push.go b/pkg/pattern/aggregation/push.go index dd35815861d1c..8f1975223cd42 100644 --- a/pkg/pattern/aggregation/push.go +++ b/pkg/pattern/aggregation/push.go @@ -59,8 +59,9 @@ type Push struct { contentType string logger log.Logger - // shutdown channels - quit chan struct{} + running sync.WaitGroup + quit chan struct{} + quitOnce sync.Once // auth username, password string @@ -149,6 +150,7 @@ func NewPush( metrics: metrics, } + p.running.Add(1) go p.run(pushPeriod) return p, nil } @@ -160,10 +162,10 @@ func (p *Push) WriteEntry(ts time.Time, e string, lbls labels.Labels) { // Stop will cancel any ongoing requests and stop the goroutine listening for requests func (p *Push) Stop() { - if p.quit != nil { + p.quitOnce.Do(func() { close(p.quit) - p.quit = nil - } + }) + p.running.Wait() } // buildPayload creates the snappy compressed protobuf to send to Loki @@ -244,6 +246,8 @@ func (p *Push) buildPayload(ctx context.Context) ([]byte, error) { // run pulls lines out of the channel and sends them to Loki func (p *Push) run(pushPeriod time.Duration) { + p.running.Done() + ctx, cancel := context.WithCancel(context.Background()) pushTicker := time.NewTimer(pushPeriod) defer pushTicker.Stop() diff --git a/pkg/pattern/aggregation/push_test.go b/pkg/pattern/aggregation/push_test.go index bc7903fe29a99..ecab42c08ba55 100644 --- a/pkg/pattern/aggregation/push_test.go +++ b/pkg/pattern/aggregation/push_test.go @@ -169,6 +169,7 @@ func Test_Push(t *testing.T) { lbls2, ) + p.running.Add(1) go p.run(time.Nanosecond) select { From 42dcaba4af3bf4f28d9b5b3ead33f05c4e1f82e3 Mon Sep 17 00:00:00 2001 From: Nedyalko Andreev Date: Fri, 13 Dec 2024 17:34:38 +0200 Subject: [PATCH 2/2] fixup: add a missing defer --- pkg/pattern/aggregation/push.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/pattern/aggregation/push.go b/pkg/pattern/aggregation/push.go index 8f1975223cd42..7b7a7777ab28b 100644 --- a/pkg/pattern/aggregation/push.go +++ b/pkg/pattern/aggregation/push.go @@ -246,7 +246,7 @@ func (p *Push) buildPayload(ctx context.Context) ([]byte, error) { // run pulls lines out of the channel and sends them to Loki func (p *Push) run(pushPeriod time.Duration) { - p.running.Done() + defer p.running.Done() ctx, cancel := context.WithCancel(context.Background()) pushTicker := time.NewTimer(pushPeriod)