diff --git a/pkg/distributor/query.go b/pkg/distributor/query.go
index d00815e6252..73610f6e089 100644
--- a/pkg/distributor/query.go
+++ b/pkg/distributor/query.go
@@ -206,20 +206,6 @@ type ingesterQueryResult struct {
 	chunkseriesBatches [][]ingester_client.TimeSeriesChunk
 	timeseriesBatches  [][]mimirpb.TimeSeries
 	streamingSeries    seriesChunksStream
-
-	// Retain responses owning referenced gRPC buffers, until they are freed.
-	responses []*ingester_client.QueryStreamResponse
-}
-
-func (r *ingesterQueryResult) addResponse(resp *ingester_client.QueryStreamResponse) {
-	r.responses = append(r.responses, resp)
-}
-
-func (r *ingesterQueryResult) freeBuffers() {
-	for _, resp := range r.responses {
-		resp.FreeBuffer()
-	}
-	r.responses = nil
 }
 
 // queryIngesterStream queries the ingesters using the gRPC streaming API.
@@ -229,7 +215,7 @@ func (d *Distributor) queryIngesterStream(ctx context.Context, replicationSets [
 
 	// queryIngester MUST call cancelContext once processing is completed in order to release resources. It's required
 	// by ring.DoMultiUntilQuorumWithoutSuccessfulContextCancellation() to properly release resources.
-	queryIngester := func(ctx context.Context, ing *ring.InstanceDesc, cancelContext context.CancelCauseFunc) (result ingesterQueryResult, err error) {
+	queryIngester := func(ctx context.Context, ing *ring.InstanceDesc, cancelContext context.CancelCauseFunc) (ingesterQueryResult, error) {
 		log, ctx := spanlogger.NewWithLogger(ctx, d.log, "Distributor.queryIngesterStream")
 		cleanup := func() {
 			log.Span.Finish()
@@ -248,10 +234,6 @@ func (d *Distributor) queryIngesterStream(ctx context.Context, replicationSets [
 
 				cleanup()
 			}
-
-			if err != nil {
-				result.freeBuffers()
-			}
 		}()
 
 		log.Span.SetTag("ingester_address", ing.Addr)
@@ -267,6 +249,8 @@ func (d *Distributor) queryIngesterStream(ctx context.Context, replicationSets [
 			return ingesterQueryResult{}, err
 		}
 
+		result := ingesterQueryResult{}
+
 		// Why retain the batches rather than iteratively build a single slice?
 		// If we iteratively build a single slice, we'll spend a lot of time copying elements as the slice grows beyond its capacity.
 		// So instead, we build the slice in one go once we know how many series we have.
@@ -274,8 +258,6 @@ func (d *Distributor) queryIngesterStream(ctx context.Context, replicationSets [
 		streamingSeriesCount := 0
 
 		for {
-			// XXX: Note that while we free responses' gRPC buffers on error, we don't do the same in case of success,
-			// as the combined response retains references to gRPC buffers.
 			resp, err := stream.Recv()
 			if errors.Is(err, io.EOF) {
 				// We will never get an EOF here from an ingester that is streaming chunks, so we don't need to do anything to set up streaming here.
@@ -284,8 +266,6 @@ func (d *Distributor) queryIngesterStream(ctx context.Context, replicationSets [
 				return ingesterQueryResult{}, err
 			}
 
-			result.addResponse(resp)
-
 			if len(resp.Timeseries) > 0 {
 				for _, series := range resp.Timeseries {
 					if limitErr := queryLimiter.AddSeries(series.Labels); limitErr != nil {
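
For context, the "Why retain the batches" comment kept by this diff describes a two-phase accumulation: batches are retained as they arrive from the stream, and the combined slice is allocated exactly once, when the total element count is known. A minimal standalone sketch of that pattern follows; flattenBatches is a hypothetical helper for illustration, not the actual Mimir implementation.

// flatten.go: sketch of the "retain batches, flatten once" pattern.
package main

import "fmt"

// flattenBatches merges all batches into one slice using a single allocation,
// avoiding the repeated copying that growing a slice incrementally would cause.
func flattenBatches[T any](batches [][]T) []T {
	total := 0
	for _, batch := range batches {
		total += len(batch)
	}

	merged := make([]T, 0, total) // exact capacity is known up front
	for _, batch := range batches {
		merged = append(merged, batch...)
	}
	return merged
}

func main() {
	batches := [][]int{{1, 2}, {3}, {4, 5, 6}}
	fmt.Println(flattenBatches(batches)) // [1 2 3 4 5 6]
}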