diff --git a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go index 39505c456f5..e781ef93a7e 100644 --- a/pkg/distributor/distributor.go +++ b/pkg/distributor/distributor.go @@ -1832,11 +1832,20 @@ func (d *Distributor) ActiveSeries(ctx context.Context, matchers []*labels.Match res := newActiveSeriesResponse(d.hashCollisionCount, maxResponseSize) ingesterQuery := func(ctx context.Context, client ingester_client.IngesterClient) (any, error) { + // This function is invoked purely for its side effects on the captured + // activeSeriesResponse, its return value is never used. + type ignored struct{} + log, ctx := spanlogger.NewWithLogger(ctx, d.log, "Distributor.ActiveSeries.queryIngester") defer log.Finish() stream, err := client.ActiveSeries(ctx, req) if err != nil { + if errors.Is(err, context.Canceled) { + return ignored{}, nil + } + level.Error(log).Log("msg", "error creating active series response stream", "err", err) + ext.Error.Set(log.Span, true) return nil, err } @@ -1849,9 +1858,13 @@ func (d *Distributor) ActiveSeries(ctx context.Context, matchers []*labels.Match for { msg, err := stream.Recv() - if errors.Is(err, io.EOF) { - break - } else if err != nil { + if err != nil { + if errors.Is(err, io.EOF) { + break + } + if errors.Is(err, context.Canceled) { + return ignored{}, nil + } level.Error(log).Log("msg", "error receiving active series response", "err", err) ext.Error.Set(log.Span, true) return nil, err @@ -1863,7 +1876,7 @@ func (d *Distributor) ActiveSeries(ctx context.Context, matchers []*labels.Match } } - return nil, nil + return ignored{}, nil } _, err = forReplicationSet(ctx, d, replicationSet, ingesterQuery)