diff --git a/CHANGELOG.md b/CHANGELOG.md
index e715d2824ed..2111c23efc7 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -2,6 +2,8 @@
 * [ENHANCEMENT] TraceQL: Attribute iterators collect matched array values [#3867](https://github.com/grafana/tempo/pull/3867) (@electron0zero, @stoewer)
 * [ENHANCEMENT] Add bytes and spans received to usage stats [#3983](https://github.com/grafana/tempo/pull/3983) (@joe-elliott)
+* [ENHANCEMENT] Prevent massive allocations in the frontend if there is not sufficient pressure from the query pipeline. [#3996](https://github.com/grafana/tempo/pull/3996) (@joe-elliott)
+  **BREAKING CHANGE** Removed `querier_forget_delay` setting from the frontend. This configuration option did nothing.
 
 # v2.6.0-rc.0
diff --git a/cmd/tempo/app/modules.go b/cmd/tempo/app/modules.go
index 726ef4a94ff..d81d3ec69a6 100644
--- a/cmd/tempo/app/modules.go
+++ b/cmd/tempo/app/modules.go
@@ -364,7 +364,7 @@ func (t *App) initQuerier() (services.Service, error) {
 func (t *App) initQueryFrontend() (services.Service, error) {
 	// cortexTripper is a bridge between http and httpgrpc.
 	// It does the job of passing data to the cortex frontend code.
-	cortexTripper, v1, err := frontend.InitFrontend(t.cfg.Frontend.Config, frontend.CortexNoQuerierLimits{}, log.Logger, prometheus.DefaultRegisterer)
+	cortexTripper, v1, err := frontend.InitFrontend(t.cfg.Frontend.Config, log.Logger, prometheus.DefaultRegisterer)
 	if err != nil {
 		return nil, err
 	}
diff --git a/docs/sources/tempo/configuration/manifest.md b/docs/sources/tempo/configuration/manifest.md
index 85db46fad52..07f78ee1cab 100644
--- a/docs/sources/tempo/configuration/manifest.md
+++ b/docs/sources/tempo/configuration/manifest.md
@@ -294,7 +294,6 @@ querier:
     query_relevant_ingesters: false
 query_frontend:
     max_outstanding_per_tenant: 2000
-    querier_forget_delay: 0s
     max_batch_size: 5
     log_query_request_headers: ""
     max_retries: 2
diff --git a/modules/frontend/config.go b/modules/frontend/config.go
index 84e96a33b0d..8650440d632 100644
--- a/modules/frontend/config.go
+++ b/modules/frontend/config.go
@@ -103,19 +103,15 @@ func (cfg *Config) RegisterFlagsAndApplyDefaults(string, *flag.FlagSet) {
 
 type CortexNoQuerierLimits struct{}
 
-var _ v1.Limits = (*CortexNoQuerierLimits)(nil)
-
-func (CortexNoQuerierLimits) MaxQueriersPerUser(string) int { return 0 }
-
 // InitFrontend initializes V1 frontend
 //
 // Returned RoundTripper can be wrapped in more round-tripper middlewares, and then eventually registered
 // into HTTP server using the Handler from this package. Returned RoundTripper is always non-nil
 // (if there are no errors), and it uses the returned frontend (if any).
-func InitFrontend(cfg v1.Config, limits v1.Limits, log log.Logger, reg prometheus.Registerer) (http.RoundTripper, *v1.Frontend, error) {
+func InitFrontend(cfg v1.Config, log log.Logger, reg prometheus.Registerer) (http.RoundTripper, *v1.Frontend, error) {
 	statVersion.Set("v1")
 	// No scheduler = use original frontend.
-	fr, err := v1.New(cfg, limits, log, reg)
+	fr, err := v1.New(cfg, log, reg)
 	if err != nil {
 		return nil, nil, err
 	}
diff --git a/modules/frontend/queue/queue.go b/modules/frontend/queue/queue.go
index 4b3add53ede..75ef6db0bfc 100644
--- a/modules/frontend/queue/queue.go
+++ b/modules/frontend/queue/queue.go
@@ -8,12 +8,10 @@ import (
 
 	"github.com/grafana/dskit/services"
 	"github.com/prometheus/client_golang/prometheus"
-	"go.uber.org/atomic"
 )
 
 const (
-	// How frequently to check for disconnected queriers that should be forgotten.
-	forgetCheckPeriod = 5 * time.Second
+	queueCleanupPeriod = 30 * time.Second // every 30 seconds b/c the stopping code requires there to be no queues. I would like to make this 5-10 minutes, but then shutdowns would be blocked
 )
 
 var (
@@ -43,14 +41,10 @@ func FirstUser() UserIndex {
 // Request stored into the queue.
 type Request interface{}
 
-// RequestQueue holds incoming requests in per-user queues. It also assigns each user specified number of queriers,
-// and when querier asks for next request to handle (using GetNextRequestForQuerier), it returns requests
-// in a fair fashion.
+// RequestQueue holds incoming requests in per-user queues.
 type RequestQueue struct {
 	services.Service
 
-	connectedQuerierWorkers *atomic.Int32
-
 	mtx    sync.RWMutex
 	cond   contextCond // Notified when request is enqueued or dequeued, or querier is disconnected.
 	queues *queues
@@ -60,26 +54,23 @@ type RequestQueue struct {
 	discardedRequests *prometheus.CounterVec // Per user.
 }
 
-func NewRequestQueue(maxOutstandingPerTenant int, forgetDelay time.Duration, queueLength *prometheus.GaugeVec, discardedRequests *prometheus.CounterVec) *RequestQueue {
+func NewRequestQueue(maxOutstandingPerTenant int, queueLength *prometheus.GaugeVec, discardedRequests *prometheus.CounterVec) *RequestQueue {
 	q := &RequestQueue{
-		queues:                  newUserQueues(maxOutstandingPerTenant, forgetDelay),
-		connectedQuerierWorkers: atomic.NewInt32(0),
-		queueLength:             queueLength,
-		discardedRequests:       discardedRequests,
+		queues:            newUserQueues(maxOutstandingPerTenant),
+		queueLength:       queueLength,
+		discardedRequests: discardedRequests,
 	}
 	q.cond = contextCond{Cond: sync.NewCond(&q.mtx)}
-	q.Service = services.NewTimerService(forgetCheckPeriod, nil, q.forgetDisconnectedQueriers, q.stopping).WithName("request queue")
+	q.Service = services.NewTimerService(queueCleanupPeriod, nil, q.cleanupQueues, q.stopping).WithName("request queue")
 
 	return q
 }
 
-// EnqueueRequest puts the request into the queue. MaxQueries is user-specific value that specifies how many queriers can
-// this user use (zero or negative = all queriers). It is passed to each EnqueueRequest, because it can change
-// between calls.
+// EnqueueRequest puts the request into the queue.
 //
 // If request is successfully enqueued, successFn is called with the lock held, before any querier can receive the request.
-func (q *RequestQueue) EnqueueRequest(userID string, req Request, maxQueriers int) error {
+func (q *RequestQueue) EnqueueRequest(userID string, req Request) error {
 	q.mtx.RLock()
 	// don't defer a release. we won't know what we need to release until we call getQueueUnderRlock
@@ -89,7 +80,7 @@ func (q *RequestQueue) EnqueueRequest(userID string, req Request, maxQueriers in
 	}
 
 	// try to grab the user queue under read lock
-	queue, cleanup, err := q.getQueueUnderRlock(userID, maxQueriers)
+	queue, cleanup, err := q.getQueueUnderRlock(userID)
 	defer cleanup()
 	if err != nil {
 		return err
 	}
@@ -109,7 +100,7 @@ func (q *RequestQueue) EnqueueRequest(userID string, req Request, maxQueriers in
 // getQueueUnderRlock attempts to get the queue for the given user under read lock. if it is not
 // possible it upgrades the RLock to a Lock. This method also returns a cleanup function that
 // will release whichever lock it had to acquire to get the queue.
-func (q *RequestQueue) getQueueUnderRlock(userID string, maxQueriers int) (chan Request, func(), error) {
+func (q *RequestQueue) getQueueUnderRlock(userID string) (chan Request, func(), error) {
 	cleanup := func() {
 		q.mtx.RUnlock()
 	}
@@ -128,7 +119,7 @@ func (q *RequestQueue) getQueueUnderRlock(userID string, maxQueriers int) (chan
 		q.mtx.Unlock()
 	}
 
-	queue := q.queues.getOrAddQueue(userID, maxQueriers)
+	queue := q.queues.getOrAddQueue(userID)
 	if queue == nil {
 		// This can only happen if userID is "".
 		return nil, cleanup, errors.New("no queue found")
 	}
@@ -139,7 +130,7 @@ func (q *RequestQueue) getQueueUnderRlock(userID string, maxQueriers int) (chan
 
 // GetNextRequestForQuerier find next user queue and attempts to dequeue N requests as defined by the length of
 // batchBuffer. This slice is a reusable buffer to fill up with requests
-func (q *RequestQueue) GetNextRequestForQuerier(ctx context.Context, last UserIndex, querierID string, batchBuffer []Request) ([]Request, UserIndex, error) {
+func (q *RequestQueue) GetNextRequestForQuerier(ctx context.Context, last UserIndex, batchBuffer []Request) ([]Request, UserIndex, error) {
 	requestedCount := len(batchBuffer)
 	if requestedCount == 0 {
 		return nil, last, errors.New("batch buffer must have len > 0")
 	}
@@ -165,7 +156,7 @@ FindQueue:
 		return nil, last, err
 	}
 
-	queue, userID, idx := q.queues.getNextQueueForQuerier(last.last, querierID)
+	queue, userID, idx := q.queues.getNextQueueForQuerier(last.last)
 	last.last = idx
 	if queue != nil {
 		// this is all threadsafe b/c all users queues are blocked by q.mtx
@@ -179,14 +170,7 @@ FindQueue:
 			batchBuffer[i] = <-queue
 		}
 
-		qLen := len(queue)
-		if qLen == 0 {
-			q.queues.deleteQueue(userID)
-		}
-		q.queueLength.WithLabelValues(userID).Set(float64(qLen))
-
-		// Tell close() we've processed a request.
-		q.cond.Broadcast()
+		q.queueLength.WithLabelValues(userID).Set(float64(len(queue)))
 
 		return batchBuffer, last, nil
 	}
@@ -197,13 +181,11 @@ FindQueue:
 	goto FindQueue
 }
 
-func (q *RequestQueue) forgetDisconnectedQueriers(_ context.Context) error {
+func (q *RequestQueue) cleanupQueues(_ context.Context) error {
 	q.mtx.Lock()
 	defer q.mtx.Unlock()
 
-	if q.queues.forgetDisconnectedQueriers(time.Now()) > 0 {
-		// We need to notify goroutines cause having removed some queriers
-		// may have caused a resharding.
+	if q.queues.deleteEmptyQueues() {
 		q.cond.Broadcast()
 	}
@@ -214,7 +196,7 @@ func (q *RequestQueue) stopping(_ error) error {
 	q.mtx.Lock()
 	defer q.mtx.Unlock()
 
-	for q.queues.len() > 0 && q.connectedQuerierWorkers.Load() > 0 {
+	for q.queues.len() > 0 {
 		q.cond.Wait(context.Background())
 	}
@@ -227,32 +209,6 @@ func (q *RequestQueue) stopping(_ error) error {
 	return nil
 }
 
-func (q *RequestQueue) RegisterQuerierConnection(querier string) {
-	q.connectedQuerierWorkers.Inc()
-
-	q.mtx.Lock()
-	defer q.mtx.Unlock()
-	q.queues.addQuerierConnection(querier)
-}
-
-func (q *RequestQueue) UnregisterQuerierConnection(querier string) {
-	q.connectedQuerierWorkers.Dec()
-
-	q.mtx.Lock()
-	defer q.mtx.Unlock()
-	q.queues.removeQuerierConnection(querier, time.Now())
-}
-
-func (q *RequestQueue) NotifyQuerierShutdown(querierID string) {
-	q.mtx.Lock()
-	defer q.mtx.Unlock()
-	q.queues.notifyQuerierShutdown(querierID)
-}
-
-func (q *RequestQueue) GetConnectedQuerierWorkersMetric() float64 {
-	return float64(q.connectedQuerierWorkers.Load())
-}
-
 // contextCond is a *sync.Cond with Wait() method overridden to support context-based waiting.
 type contextCond struct {
 	*sync.Cond
diff --git a/modules/frontend/queue/queue_test.go b/modules/frontend/queue/queue_test.go
index a17307dd129..80d2ea2367d 100644
--- a/modules/frontend/queue/queue_test.go
+++ b/modules/frontend/queue/queue_test.go
@@ -37,7 +37,7 @@ func TestGetNextForQuerierOneUser(t *testing.T) {
 	close(start)
 
 	for j := 0; j < messages; j++ {
-		err := q.EnqueueRequest("test", &mockRequest{}, 0)
+		err := q.EnqueueRequest("test", &mockRequest{})
 		require.NoError(t, err)
 	}
@@ -65,7 +65,7 @@ func TestGetNextForQuerierRandomUsers(t *testing.T) {
 	close(start)
 
 	for j := 0; j < messages; j++ {
-		err := q.EnqueueRequest(test.RandomString(), &mockRequest{}, 0)
+		err := q.EnqueueRequest(test.RandomString(), &mockRequest{})
 		require.NoError(t, err)
 	}
@@ -93,7 +93,7 @@ func TestGetNextBatches(t *testing.T) {
 	close(start)
 
 	for j := 0; j < messages; j++ {
-		err := q.EnqueueRequest("user", &mockRequest{}, 0)
+		err := q.EnqueueRequest("user", &mockRequest{})
 		require.NoError(t, err)
 	}
@@ -136,7 +136,7 @@ func benchmarkGetNextForQuerier(b *testing.B, listeners int, messages int) {
 	b.ResetTimer()
 	for i := 0; i < b.N; i++ {
 		for j := 0; j < messages; j++ {
-			err := q.EnqueueRequest(user, req, 0)
+			err := q.EnqueueRequest(user, req)
 			if err != nil {
 				panic(err)
 			}
@@ -160,7 +160,7 @@ func queueWithListeners(ctx context.Context, listeners int, batchSize int, liste
 		Name: "test_discarded",
 	}, []string{"user"})
 
-	q := NewRequestQueue(100_000, 0, g, c)
+	q := NewRequestQueue(100_000, g, c)
 
 	start := make(chan struct{})
 	for i := 0; i < listeners; i++ {
@@ -173,7 +173,7 @@ func queueWithListeners(ctx context.Context, listeners int, batchSize int, liste
 			batchBuffer := make([]Request, batchSize)
 
 			for {
-				r, last, err = q.GetNextRequestForQuerier(ctx, last, "", batchBuffer)
+				r, last, err = q.GetNextRequestForQuerier(ctx, last, batchBuffer)
 				if err != nil {
 					return
 				}
@@ -184,49 +184,12 @@ func queueWithListeners(ctx context.Context, listeners int, batchSize int, liste
 		}()
 	}
 
-	return q, start
-}
-
-func TestRequestQueue_GetNextRequestForQuerier_ShouldGetRequestAfterReshardingBecauseQuerierHasBeenForgotten(t *testing.T) {
-	const forgetDelay = 3 * time.Second
-
-	queue := NewRequestQueue(1, forgetDelay,
-		prometheus.NewGaugeVec(prometheus.GaugeOpts{}, []string{"user"}),
-		prometheus.NewCounterVec(prometheus.CounterOpts{}, []string{"user"}))
-
-	// Start the queue service.
-	ctx := context.Background()
-	require.NoError(t, services.StartAndAwaitRunning(ctx, queue))
-	t.Cleanup(func() {
-		require.NoError(t, services.StopAndAwaitTerminated(ctx, queue))
-	})
-
-	// Two queriers connect.
-	queue.RegisterQuerierConnection("querier-1")
-	queue.RegisterQuerierConnection("querier-2")
-
-	// Querier-2 waits for a new request.
-	querier2wg := sync.WaitGroup{}
-	querier2wg.Add(1)
-	go func() {
-		defer querier2wg.Done()
-		_, _, err := queue.GetNextRequestForQuerier(ctx, FirstUser(), "querier-2", make([]Request, 1))
-		require.NoError(t, err)
-	}()
-
-	// Querier-1 crashes (no graceful shutdown notification).
-	queue.UnregisterQuerierConnection("querier-1")
-
-	// Enqueue a request from an user which would be assigned to querier-1.
-	// NOTE: "user-1" hash falls in the querier-1 shard.
-	require.NoError(t, queue.EnqueueRequest("user-1", "request", 1))
-
-	startTime := time.Now()
-	querier2wg.Wait()
-	waitTime := time.Since(startTime)
+	err := services.StartAndAwaitRunning(context.Background(), q)
+	if err != nil {
+		panic(err)
+	}
 
-	// We expect that querier-2 got the request only after querier-1 forget delay is passed.
-	assert.GreaterOrEqual(t, waitTime.Milliseconds(), forgetDelay.Milliseconds())
+	return q, start
 }
 
 func TestContextCond(t *testing.T) {
diff --git a/modules/frontend/queue/user_queues.go b/modules/frontend/queue/user_queues.go
index 2bff741c03e..cc615d9fd94 100644
--- a/modules/frontend/queue/user_queues.go
+++ b/modules/frontend/queue/user_queues.go
@@ -1,25 +1,5 @@
 package queue
 
-import (
-	"math/rand"
-	"sort"
-	"time"
-
-	"github.com/grafana/dskit/ring/shard"
-)
-
-// querier holds information about a querier registered in the queue.
-type querier struct {
-	// Number of active connections.
-	connections int
-
-	// True if the querier notified it's gracefully shutting down.
-	shuttingDown bool
-
-	// When the last connection has been unregistered.
-	disconnectedAt time.Time
-}
-
 // This struct holds user queues for pending requests. It also keeps track of connected queriers,
 // and mapping between users and queriers.
 type queues struct {
@@ -31,42 +11,20 @@ type queues struct {
 	users []string
 
 	maxUserQueueSize int
-
-	// How long to wait before removing a querier which has got disconnected
-	// but hasn't notified about a graceful shutdown.
-	forgetDelay time.Duration
-
-	// Tracks queriers registered to the queue.
-	queriers map[string]*querier
-
-	// Sorted list of querier names, used when creating per-user shard.
-	sortedQueriers []string
 }
 
 type userQueue struct {
 	ch chan Request
 
-	// If not nil, only these queriers can handle user requests. If nil, all queriers can.
-	// We set this to nil if number of available queriers <= maxQueriers.
-	queriers    map[string]struct{}
-	maxQueriers int
-
-	// Seed for shuffle sharding of queriers. This seed is based on userID only and is therefore consistent
-	// between different frontends.
-	seed int64
-
 	// Points back to 'users' field in queues. Enables quick cleanup.
 	index int
 }
 
-func newUserQueues(maxUserQueueSize int, forgetDelay time.Duration) *queues {
+func newUserQueues(maxUserQueueSize int) *queues {
 	return &queues{
 		userQueues:       map[string]*userQueue{},
 		users:            nil,
 		maxUserQueueSize: maxUserQueueSize,
-		forgetDelay:      forgetDelay,
-		queriers:         map[string]*querier{},
-		sortedQueriers:   nil,
 	}
 }
@@ -89,26 +47,34 @@ func (q *queues) deleteQueue(userID string) {
 	}
 }
 
+// deleteEmptyQueues removes all user queues that are empty. It returns true if any
+// queues were removed.
+func (q *queues) deleteEmptyQueues() bool {
+	removedQueue := false
+
+	// look for 0 len queues and remove them
+	for userID, uq := range q.userQueues {
+		if len(uq.ch) == 0 {
+			removedQueue = true
+			q.deleteQueue(userID)
+		}
+	}
+
+	return removedQueue
+}
+
 // Returns existing or new queue for user.
-// MaxQueriers is used to compute which queriers should handle requests for this user.
-// If maxQueriers is <= 0, all queriers can handle this user's requests.
-// If maxQueriers has changed since the last call, queriers for this are recomputed.
-func (q *queues) getOrAddQueue(userID string, maxQueriers int) chan Request {
+func (q *queues) getOrAddQueue(userID string) chan Request {
 	// Empty user is not allowed, as that would break our users list ("" is used for free spot).
 	if userID == "" {
 		return nil
 	}
 
-	if maxQueriers < 0 {
-		maxQueriers = 0
-	}
-
 	uq := q.userQueues[userID]
 	if uq == nil {
 		uq = &userQueue{
 			ch:    make(chan Request, q.maxUserQueueSize),
-			seed:  shard.ShuffleShardSeed(userID, ""),
 			index: -1,
 		}
 		q.userQueues[userID] = uq
@@ -129,18 +95,13 @@
 		}
 	}
 
-	if uq.maxQueriers != maxQueriers {
-		uq.maxQueriers = maxQueriers
-		uq.queriers = shuffleQueriersForUser(uq.seed, maxQueriers, q.sortedQueriers, nil)
-	}
-
 	return uq.ch
 }
 
 // Finds next queue for the querier. To support fair scheduling between users, client is expected
 // to pass last user index returned by this function as argument. Is there was no previous
 // last user index, use -1.
-func (q *queues) getNextQueueForQuerier(lastUserIndex int, querierID string) (chan Request, string, int) {
+func (q *queues) getNextQueueForQuerier(lastUserIndex int) (chan Request, string, int) {
 	uid := lastUserIndex
 
 	for iters := 0; iters < len(q.users); iters++ {
@@ -159,147 +120,11 @@ func (q *queues) getNextQueueForQuerier(lastUserIndex int, querierID string) (ch
 
 		q := q.userQueues[u]
 
-		if q.queriers != nil {
-			if _, ok := q.queriers[querierID]; !ok {
-				// This querier is not handling the user.
-				continue
-			}
+		if len(q.ch) == 0 {
+			continue
 		}
 
 		return q.ch, u, uid
 	}
 
 	return nil, "", uid
 }
-
-func (q *queues) addQuerierConnection(querierID string) {
-	info := q.queriers[querierID]
-	if info != nil {
-		info.connections++
-
-		// Reset in case the querier re-connected while it was in the forget waiting period.
-		info.shuttingDown = false
-		info.disconnectedAt = time.Time{}
-
-		return
-	}
-
-	// First connection from this querier.
-	q.queriers[querierID] = &querier{connections: 1}
-	q.sortedQueriers = append(q.sortedQueriers, querierID)
-	sort.Strings(q.sortedQueriers)
-
-	q.recomputeUserQueriers()
-}
-
-func (q *queues) removeQuerierConnection(querierID string, now time.Time) {
-	info := q.queriers[querierID]
-	if info == nil || info.connections <= 0 {
-		panic("unexpected number of connections for querier")
-	}
-
-	// Decrease the number of active connections.
-	info.connections--
-	if info.connections > 0 {
-		return
-	}
-
-	// There no more active connections. If the forget delay is configured then
-	// we can remove it only if querier has announced a graceful shutdown.
-	if info.shuttingDown || q.forgetDelay == 0 {
-		q.removeQuerier(querierID)
-		return
-	}
-
-	// No graceful shutdown has been notified yet, so we should track the current time
-	// so that we'll remove the querier as soon as we receive the graceful shutdown
-	// notification (if any) or once the threshold expires.
-	info.disconnectedAt = now
-}
-
-func (q *queues) removeQuerier(querierID string) {
-	delete(q.queriers, querierID)
-
-	ix := sort.SearchStrings(q.sortedQueriers, querierID)
-	if ix >= len(q.sortedQueriers) || q.sortedQueriers[ix] != querierID {
-		panic("incorrect state of sorted queriers")
-	}
-
-	q.sortedQueriers = append(q.sortedQueriers[:ix], q.sortedQueriers[ix+1:]...)
-
-	q.recomputeUserQueriers()
-}
-
-// notifyQuerierShutdown records that a querier has sent notification about a graceful shutdown.
-func (q *queues) notifyQuerierShutdown(querierID string) {
-	info := q.queriers[querierID]
-	if info == nil {
-		// The querier may have already been removed, so we just ignore it.
-		return
-	}
-
-	// If there are no more connections, we should remove the querier.
-	if info.connections == 0 {
-		q.removeQuerier(querierID)
-		return
-	}
-
-	// Otherwise we should annotate we received a graceful shutdown notification
-	// and the querier will be removed once all connections are unregistered.
-	info.shuttingDown = true
-}
-
-// forgetDisconnectedQueriers removes all disconnected queriers that have gone since at least
-// the forget delay. Returns the number of forgotten queriers.
-func (q *queues) forgetDisconnectedQueriers(now time.Time) int {
-	// Nothing to do if the forget delay is disabled.
-	if q.forgetDelay == 0 {
-		return 0
-	}
-
-	// Remove all queriers with no connections that have gone since at least the forget delay.
-	threshold := now.Add(-q.forgetDelay)
-	forgotten := 0
-
-	for querierID := range q.queriers {
-		if info := q.queriers[querierID]; info.connections == 0 && info.disconnectedAt.Before(threshold) {
-			q.removeQuerier(querierID)
-			forgotten++
-		}
-	}
-
-	return forgotten
-}
-
-func (q *queues) recomputeUserQueriers() {
-	scratchpad := make([]string, 0, len(q.sortedQueriers))
-
-	for _, uq := range q.userQueues {
-		uq.queriers = shuffleQueriersForUser(uq.seed, uq.maxQueriers, q.sortedQueriers, scratchpad)
-	}
-}
-
-// shuffleQueriersForUser returns nil if queriersToSelect is 0 or there are not enough queriers to select from.
-// In that case *all* queriers should be used.
-// Scratchpad is used for shuffling, to avoid new allocations. If nil, new slice is allocated.
-func shuffleQueriersForUser(userSeed int64, queriersToSelect int, allSortedQueriers []string, scratchpad []string) map[string]struct{} {
-	if queriersToSelect == 0 || len(allSortedQueriers) <= queriersToSelect {
-		return nil
-	}
-
-	result := make(map[string]struct{}, queriersToSelect)
-	rnd := rand.New(rand.NewSource(userSeed))
-
-	scratchpad = scratchpad[:0]
-	scratchpad = append(scratchpad, allSortedQueriers...)
-
-	last := len(scratchpad) - 1
-	for i := 0; i < queriersToSelect; i++ {
-		r := rnd.Intn(last + 1)
-		result[scratchpad[r]] = struct{}{}
-		// move selected item to the end, it won't be selected anymore.
-		scratchpad[r], scratchpad[last] = scratchpad[last], scratchpad[r]
-		last--
-	}
-
-	return result
-}
diff --git a/modules/frontend/v1/frontend.go b/modules/frontend/v1/frontend.go
index 6ccfad6275f..a451a269e02 100644
--- a/modules/frontend/v1/frontend.go
+++ b/modules/frontend/v1/frontend.go
@@ -5,6 +5,7 @@ import (
 	"errors"
 	"flag"
 	"fmt"
+	"sync/atomic"
 	"time"
 
 	"github.com/grafana/dskit/flagext"
@@ -22,13 +23,11 @@
 	"github.com/grafana/tempo/modules/frontend/queue"
 	"github.com/grafana/tempo/modules/frontend/v1/frontendv1pb"
 	"github.com/grafana/tempo/pkg/util"
-	"github.com/grafana/tempo/pkg/validation"
 )
 
 // Config for a Frontend.
 type Config struct {
 	MaxOutstandingPerTenant int                    `yaml:"max_outstanding_per_tenant"`
-	QuerierForgetDelay      time.Duration          `yaml:"querier_forget_delay"`
 	MaxBatchSize            int                    `yaml:"max_batch_size"`
 	LogQueryRequestHeaders  flagext.StringSliceCSV `yaml:"log_query_request_headers"`
 }
@@ -36,27 +35,22 @@ type Config struct {
 // RegisterFlags adds the flags required to config this to the given FlagSet.
 func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
 	f.IntVar(&cfg.MaxOutstandingPerTenant, "querier.max-outstanding-requests-per-tenant", 2000, "Maximum number of outstanding requests per tenant per frontend; requests beyond this error with HTTP 429.")
-	f.DurationVar(&cfg.QuerierForgetDelay, "query-frontend.querier-forget-delay", 0, "If a querier disconnects without sending notification about graceful shutdown, the query-frontend will keep the querier in the tenant's shard until the forget delay has passed. This feature is useful to reduce the blast radius when shuffle-sharding is enabled.")
 	f.Var(&cfg.LogQueryRequestHeaders, "query-frontend.log-query-request-headers", "Comma-separated list of request header names to include in query logs. Applies to both query stats and slow queries logs.")
 }
 
-type Limits interface {
-	// Returns max queriers to use per tenant, or 0 if shuffle sharding is disabled.
-	MaxQueriersPerUser(user string) int
-}
-
 // Frontend queues HTTP requests, dispatches them to backends, and handles retries
 // for requests which failed.
 type Frontend struct {
 	services.Service
 
-	cfg    Config
-	log    log.Logger
-	limits Limits
+	cfg Config
+	log log.Logger
 
 	requestQueue *queue.RequestQueue
 	activeUsers  *util.ActiveUsersCleanupService
 
+	connectedQuerierWorkers *atomic.Int32
+
 	// Subservices manager.
 	subservices        *services.Manager
 	subservicesWatcher *services.FailureWatcher
@@ -80,7 +74,7 @@ type request struct {
 }
 
 // New creates a new frontend. Frontend implements service, and must be started and stopped.
-func New(cfg Config, limits Limits, log log.Logger, registerer prometheus.Registerer) (*Frontend, error) {
+func New(cfg Config, log log.Logger, registerer prometheus.Registerer) (*Frontend, error) {
 	const batchBucketCount = 5
 	if cfg.MaxBatchSize <= 0 {
 		return nil, errors.New("max_batch_size must be positive")
 	}
@@ -88,9 +82,8 @@ func New(cfg Config, limits Limits, log log.Logger, registerer prometheus.Regist
 	batchBucketSize := float64(cfg.MaxBatchSize) / float64(batchBucketCount)
 
 	f := &Frontend{
-		cfg:    cfg,
-		log:    log,
-		limits: limits,
+		cfg: cfg,
+		log: log,
 		queueLength: promauto.With(registerer).NewGaugeVec(prometheus.GaugeOpts{
 			Name: "tempo_query_frontend_queue_length",
 			Help: "Number of queries in the queue.",
@@ -112,9 +105,10 @@ func New(cfg Config, limits Limits, log log.Logger, registerer prometheus.Regist
 			Help:    "Batch size.",
 			Buckets: prometheus.LinearBuckets(1, batchBucketSize, batchBucketCount),
 		}),
+		connectedQuerierWorkers: &atomic.Int32{},
 	}
 
-	f.requestQueue = queue.NewRequestQueue(cfg.MaxOutstandingPerTenant, cfg.QuerierForgetDelay, f.queueLength, f.discardedRequests)
+	f.requestQueue = queue.NewRequestQueue(cfg.MaxOutstandingPerTenant, f.queueLength, f.discardedRequests)
 	f.activeUsers = util.NewActiveUsersCleanupWithDefaultValues(f.cleanupInactiveUserMetrics)
 
 	var err error
@@ -126,7 +120,9 @@ func New(cfg Config, limits Limits, log log.Logger, registerer prometheus.Regist
 	f.numClients = promauto.With(registerer).NewGaugeFunc(prometheus.GaugeOpts{
 		Name: "tempo_query_frontend_connected_clients",
 		Help: "Number of worker clients currently connected to the frontend.",
-	}, f.requestQueue.GetConnectedQuerierWorkersMetric)
+	}, func() float64 {
+		return float64(f.connectedQuerierWorkers.Load())
+	})
 
 	f.Service = services.NewBasicService(f.starting, f.running, f.stopping)
 	return f, nil
@@ -205,13 +201,13 @@ func (f *Frontend) RoundTripGRPC(ctx context.Context, req *httpgrpc.HTTPRequest)
 
 // Process allows backends to pull requests from the frontend.
 func (f *Frontend) Process(server frontendv1pb.Frontend_ProcessServer) error {
-	querierID, querierFeatures, err := getQuerierInfo(server)
+	_, querierFeatures, err := getQuerierInfo(server)
 	if err != nil {
 		return err
 	}
 
-	f.requestQueue.RegisterQuerierConnection(querierID)
-	defer f.requestQueue.UnregisterQuerierConnection(querierID)
+	f.connectedQuerierWorkers.Add(1)
+	defer f.connectedQuerierWorkers.Add(-1)
 
 	lastUserIndex := queue.FirstUser()
 
@@ -222,7 +218,7 @@ func (f *Frontend) Process(server frontendv1pb.Frontend_ProcessServer) error {
 	}
 
 	for {
 		reqSlice := make([]queue.Request, batchSize)
-		reqSlice, idx, err := f.requestQueue.GetNextRequestForQuerier(server.Context(), lastUserIndex, querierID, reqSlice)
+		reqSlice, idx, err := f.requestQueue.GetNextRequestForQuerier(server.Context(), lastUserIndex, reqSlice)
 		if err != nil {
 			return err
 		}
@@ -331,7 +327,6 @@ func reportResponseUpstream(reqBatch *requestBatch, errs chan error, resps chan
 func (f *Frontend) NotifyClientShutdown(_ context.Context, req *frontendv1pb.NotifyClientShutdownRequest) (*frontendv1pb.NotifyClientShutdownResponse, error) {
 	level.Info(f.log).Log("msg", "received shutdown notification from querier", "querier", req.GetClientID())
 
-	f.requestQueue.NotifyQuerierShutdown(req.GetClientID())
 	return &frontendv1pb.NotifyClientShutdownResponse{}, nil
 }
 
@@ -371,20 +366,17 @@ func (f *Frontend) queueRequest(ctx context.Context, req *request) error {
 	req.enqueueTime = now
 	req.queueSpan, _ = opentracing.StartSpanFromContext(ctx, "queued")
 
-	// aggregate the max queriers limit in the case of a multi tenant query
-	maxQueriers := validation.SmallestPositiveNonZeroIntPerTenant(tenantIDs, f.limits.MaxQueriersPerUser)
-
 	joinedTenantID := tenant.JoinTenantIDs(tenantIDs)
 	f.activeUsers.UpdateUserTimestamp(joinedTenantID, now)
 
-	return f.requestQueue.EnqueueRequest(joinedTenantID, req, maxQueriers)
+	return f.requestQueue.EnqueueRequest(joinedTenantID, req)
 }
 
 // CheckReady determines if the query frontend is ready. Function parameters/return
 // chosen to match the same method in the ingester
 func (f *Frontend) CheckReady(_ context.Context) error {
 	// if we have more than one querier connected we will consider ourselves ready
-	connectedClients := f.requestQueue.GetConnectedQuerierWorkersMetric()
+	connectedClients := f.connectedQuerierWorkers.Load()
 	if connectedClients > 0 {
 		return nil
 	}
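
For context, here is a minimal, hypothetical Go sketch of the queue lifecycle this change converges on: per-tenant queues are created lazily on enqueue, and a periodic cleanup pass drops whichever queues are currently empty, so idle tenants stop pinning allocations without the queue having to track querier connections. The names (`tenantQueues`, `enqueue`, `deleteEmpty`) are illustrative only and are not the Tempo API; in the real code the cleanup is driven by `services.NewTimerService(queueCleanupPeriod, nil, q.cleanupQueues, q.stopping)` every 30 seconds, as shown in the diff above.

```go
// Illustrative sketch only; not the Tempo implementation.
package main

import (
	"fmt"
	"sync"
	"time"
)

type tenantQueues struct {
	mtx     sync.Mutex
	queues  map[string]chan string
	maxSize int
}

func newTenantQueues(maxSize int) *tenantQueues {
	return &tenantQueues{queues: map[string]chan string{}, maxSize: maxSize}
}

// enqueue lazily (re)creates the tenant's queue and fails once the tenant has
// maxSize outstanding requests, mirroring the frontend's 429 behavior.
func (t *tenantQueues) enqueue(tenant, req string) error {
	t.mtx.Lock()
	defer t.mtx.Unlock()

	q, ok := t.queues[tenant]
	if !ok {
		q = make(chan string, t.maxSize)
		t.queues[tenant] = q
	}

	select {
	case q <- req:
		return nil
	default:
		return fmt.Errorf("too many outstanding requests for tenant %s", tenant)
	}
}

// deleteEmpty removes queues with no pending requests and reports whether any
// were removed; the real code runs this from a timer service.
func (t *tenantQueues) deleteEmpty() bool {
	t.mtx.Lock()
	defer t.mtx.Unlock()

	removed := false
	for tenant, q := range t.queues {
		if len(q) == 0 {
			delete(t.queues, tenant)
			removed = true
		}
	}
	return removed
}

func main() {
	qs := newTenantQueues(10)
	_ = qs.enqueue("tenant-a", "query-1")

	// Stand-in for one tick of the periodic cleanup.
	time.Sleep(50 * time.Millisecond)
	fmt.Println("removed any empty queues:", qs.deleteEmpty()) // false: tenant-a still has a request
}
```

Deleting only empty queues, instead of tracking querier connections and resharding tenants across them, is what makes `querier_forget_delay` and the shuffle-sharding bookkeeping removed above unnecessary.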