Skip to content

Commit

Permalink
Improve observability of query-scheduler to querier communication (#9519)
Browse files Browse the repository at this point in the history

* Include querier ID in errors returned when sending a query to a querier fails

* Add trace span for sending the query to the querier

* Add changelog entry

* Fix lint
  • Loading branch information
charleskorn authored Oct 6, 2024
1 parent 29160c8 commit da1051f
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 5 deletions.
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@
* [ENHANCEMENT] Ruler: Support `exclude_alerts` parameter in `<prometheus-http-prefix>/api/v1/rules` endpoint. #9300
* [ENHANCEMENT] Distributor: add a metric to track tenants who are sending newlines in their label values called `cortex_distributor_label_values_with_newlines_total`. #9400
* [ENHANCEMENT] Ingester: improve performance of reading the WAL. #9508
* [ENHANCEMENT] Compactor: uploaded blocks cannot be bigger than max configured compactor time range, and cannot cross the boundary for given time range.
* [ENHANCEMENT] Query-scheduler: improve the errors and traces emitted by query-schedulers when communicating with queriers. #9519
* [ENHANCEMENT] Compactor: uploaded blocks cannot be bigger than max configured compactor time range, and cannot cross the boundary for given time range. #9524
* [BUGFIX] Fix issue where functions such as `rate()` over native histograms could return incorrect values if a float stale marker was present in the selected range. #9508
* [BUGFIX] Fix issue where negation of native histograms (eg. `-some_native_histogram_series`) did nothing. #9508
* [BUGFIX] Fix issue where `metric might not be a counter, name does not end in _total/_sum/_count/_bucket` annotation would be emitted even if `rate` or `increase` did not have enough samples to compute a result. #9508
Expand Down
25 changes: 21 additions & 4 deletions pkg/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ package scheduler
import (
"context"
"flag"
"fmt"
"io"
"net/http"
"strings"
Expand All @@ -26,6 +27,8 @@ import (
"github.com/grafana/dskit/user"
otgrpc "github.com/opentracing-contrib/go-grpc"
"github.com/opentracing/opentracing-go"
"github.com/opentracing/opentracing-go/ext"
otlog "github.com/opentracing/opentracing-go/log"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
Expand Down Expand Up @@ -471,7 +474,7 @@ func (s *Scheduler) QuerierLoop(querier schedulerpb.SchedulerForQuerier_QuerierL
continue
}

if err := s.forwardRequestToQuerier(querier, schedulerReq, queueTime); err != nil {
if err := s.forwardRequestToQuerier(querier, querierID, schedulerReq, queueTime); err != nil {
return err
}
}
Expand All @@ -487,7 +490,7 @@ func (s *Scheduler) NotifyQuerierShutdown(ctx context.Context, req *schedulerpb.
return &schedulerpb.NotifyQuerierShutdownResponse{}, nil
}

func (s *Scheduler) forwardRequestToQuerier(querier schedulerpb.SchedulerForQuerier_QuerierLoopServer, req *queue.SchedulerRequest, queueTime time.Duration) error {
func (s *Scheduler) forwardRequestToQuerier(querier schedulerpb.SchedulerForQuerier_QuerierLoopServer, querierID string, req *queue.SchedulerRequest, queueTime time.Duration) error {
s.requestQueue.QueryComponentUtilization.MarkRequestSent(req)
defer s.requestQueue.QueryComponentUtilization.MarkRequestCompleted(req)
defer s.cancelRequestAndRemoveFromPending(req.Key(), "request complete")
Expand All @@ -496,6 +499,9 @@ func (s *Scheduler) forwardRequestToQuerier(querier schedulerpb.SchedulerForQuer
// monitor the contexts in a select and cancel things appropriately.
errCh := make(chan error, 1)
go func() {
span, _ := opentracing.StartSpanFromContext(req.Ctx, "forwardRequestToQuerier")
span.SetTag("querier_id", querierID)

err := querier.Send(&schedulerpb.SchedulerToQuerier{
UserID: req.UserID,
QueryID: req.QueryID,
Expand All @@ -504,13 +510,24 @@ func (s *Scheduler) forwardRequestToQuerier(querier schedulerpb.SchedulerForQuer
StatsEnabled: req.StatsEnabled,
QueueTimeNanos: queueTime.Nanoseconds(),
})

if err != nil {
errCh <- err
errCh <- fmt.Errorf("failed to send query to querier '%v': %w", querierID, err)
span.LogFields(otlog.Message("sending query to querier failed"), otlog.Error(err))
ext.Error.Set(span, true)
span.Finish()
return
}

span.Finish()

_, err = querier.Recv()
errCh <- err
if err != nil {
errCh <- fmt.Errorf("failed to receive response from querier '%v': %w", querierID, err)
return
}

errCh <- nil
}()

select {
Expand Down

0 comments on commit da1051f

Please sign in to comment.