From 5f476a39e130a22dc5b641af84a0c902f83a4ad3 Mon Sep 17 00:00:00 2001 From: jackyin Date: Thu, 9 Jan 2025 18:25:03 +0800 Subject: [PATCH] fix: Fix goroutine leak in queryrange downstreamer (#15665) --- pkg/querier/queryrange/downstreamer.go | 20 +++++++++++++------- 1 file changed, 13 insertions(+), 7 deletions(-) diff --git a/pkg/querier/queryrange/downstreamer.go b/pkg/querier/queryrange/downstreamer.go index c6fba0fbf49a1..b2aa50c292154 100644 --- a/pkg/querier/queryrange/downstreamer.go +++ b/pkg/querier/queryrange/downstreamer.go @@ -170,10 +170,12 @@ func (in instance) For( go func() { err := concurrency.ForEachJob(ctx, len(queries), in.parallelism, func(ctx context.Context, i int) error { res, err := fn(queries[i]) + if err != nil { + return err + } response := logql.Resp{ I: i, Res: res, - Err: err, } // Feed the result into the channel unless the work has completed. @@ -181,7 +183,7 @@ func (in instance) For( case <-ctx.Done(): case ch <- response: } - return err + return nil }) if err != nil { ch <- logql.Resp{ @@ -192,15 +194,19 @@ func (in instance) For( close(ch) }() + var err error for resp := range ch { - if resp.Err != nil { - return nil, resp.Err + if err != nil { + continue } - if err := acc.Accumulate(ctx, resp.Res, resp.I); err != nil { - return nil, err + if resp.Err != nil { + err = resp.Err + continue } + err = acc.Accumulate(ctx, resp.Res, resp.I) } - return acc.Result(), nil + + return acc.Result(), err } // convert to matrix