Frontend cleanup and perf improvement (#3996)
* clean up queues on a timer

Signed-off-by: Joe Elliott <[email protected]>

* cleanup queues

Signed-off-by: Joe Elliott <[email protected]>

* changelog

Signed-off-by: Joe Elliott <[email protected]>

* review cleanup

Signed-off-by: Joe Elliott <[email protected]>

* changelog

Signed-off-by: Joe Elliott <[email protected]>

---------

Signed-off-by: Joe Elliott <[email protected]>
joe-elliott authored Aug 23, 2024
1 parent 8a0b0e7 commit 3414179
Showing 8 changed files with 74 additions and 341 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
@@ -2,6 +2,8 @@

* [ENHANCEMENT] TraceQL: Attribute iterators collect matched array values [#3867](https://github.com/grafana/tempo/pull/3867) (@electron0zero, @stoewer)
* [ENHANCEMENT] Add bytes and spans received to usage stats [#3983](https://github.com/grafana/tempo/pull/3983) (@joe-elliott)
* [ENHANCEMENT] Prevent massive allocations in the frontend if there is not sufficient pressure from the query pipeline. [#3996](https://github.com/grafana/tempo/pull/3996) (@joe-elliott)
**BREAKING CHANGE** Removed `querier_forget_delay` setting from the frontend. This configuration option did nothing.

# v2.6.0-rc.0

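Reading the queue.go changes further down, the allocation win appears to come from no longer tearing down a tenant's queue the moment it drains: queues now persist between bursts and a dskit timer service sweeps empty ones every 30 seconds. Below is a hedged, standalone sketch of that timer-driven cleanup pattern, assuming only the dskit services package; the callbacks are illustrative and not Tempo code.

```go
package main

import (
	"context"
	"fmt"
	"time"

	"github.com/grafana/dskit/services"
)

func main() {
	// Periodic callback, standing in for RequestQueue.cleanupQueues.
	cleanup := func(_ context.Context) error {
		fmt.Println("deleting empty per-tenant queues")
		return nil
	}
	// Shutdown callback, standing in for RequestQueue.stopping.
	stopping := func(_ error) error {
		fmt.Println("stopping")
		return nil
	}

	// Same construction the commit uses: no starting function, a timer-driven
	// iteration, and a stopping hook.
	svc := services.NewTimerService(1*time.Second, nil, cleanup, stopping).WithName("cleanup sketch")

	ctx := context.Background()
	if err := services.StartAndAwaitRunning(ctx, svc); err != nil {
		panic(err)
	}
	time.Sleep(3 * time.Second) // let a few cleanup ticks fire
	if err := services.StopAndAwaitTerminated(ctx, svc); err != nil {
		panic(err)
	}
}
```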
2 changes: 1 addition & 1 deletion cmd/tempo/app/modules.go
@@ -364,7 +364,7 @@ func (t *App) initQuerier() (services.Service, error) {
func (t *App) initQueryFrontend() (services.Service, error) {
// cortexTripper is a bridge between http and httpgrpc.
// It does the job of passing data to the cortex frontend code.
cortexTripper, v1, err := frontend.InitFrontend(t.cfg.Frontend.Config, frontend.CortexNoQuerierLimits{}, log.Logger, prometheus.DefaultRegisterer)
cortexTripper, v1, err := frontend.InitFrontend(t.cfg.Frontend.Config, log.Logger, prometheus.DefaultRegisterer)
if err != nil {
return nil, err
}
1 change: 0 additions & 1 deletion docs/sources/tempo/configuration/manifest.md
@@ -294,7 +294,6 @@ querier:
query_relevant_ingesters: false
query_frontend:
max_outstanding_per_tenant: 2000
querier_forget_delay: 0s
max_batch_size: 5
log_query_request_headers: ""
max_retries: 2
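The manifest change above tracks the breaking change called out in the CHANGELOG: `querier_forget_delay` no longer exists under `query_frontend`, so deployments that still set it should drop that line when upgrading. A minimal sketch of the trimmed block, using the default values shown in the manifest:

```yaml
query_frontend:
  max_outstanding_per_tenant: 2000
  # querier_forget_delay: 0s   # removed in this commit; the option did nothing
  max_batch_size: 5
  log_query_request_headers: ""
  max_retries: 2
```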
8 changes: 2 additions & 6 deletions modules/frontend/config.go
@@ -103,19 +103,15 @@ func (cfg *Config) RegisterFlagsAndApplyDefaults(string, *flag.FlagSet) {

type CortexNoQuerierLimits struct{}

var _ v1.Limits = (*CortexNoQuerierLimits)(nil)

func (CortexNoQuerierLimits) MaxQueriersPerUser(string) int { return 0 }

// InitFrontend initializes V1 frontend
//
// Returned RoundTripper can be wrapped in more round-tripper middlewares, and then eventually registered
// into HTTP server using the Handler from this package. Returned RoundTripper is always non-nil
// (if there are no errors), and it uses the returned frontend (if any).
func InitFrontend(cfg v1.Config, limits v1.Limits, log log.Logger, reg prometheus.Registerer) (http.RoundTripper, *v1.Frontend, error) {
func InitFrontend(cfg v1.Config, log log.Logger, reg prometheus.Registerer) (http.RoundTripper, *v1.Frontend, error) {
statVersion.Set("v1")
// No scheduler = use original frontend.
fr, err := v1.New(cfg, limits, log, reg)
fr, err := v1.New(cfg, log, reg)
if err != nil {
return nil, nil, err
}
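For code that wires the frontend itself, the constructor now takes only the config, a logger, and a Prometheus registerer, as the modules.go hunk above shows. A minimal sketch under assumed import paths (the package name and helper are illustrative, not Tempo code):

```go
package app // illustrative package, not the real cmd/tempo/app

import (
	"net/http"

	"github.com/go-kit/log"                           // assumed logger package
	"github.com/grafana/tempo/modules/frontend"       // assumed import path
	v1 "github.com/grafana/tempo/modules/frontend/v1" // assumed import path
	"github.com/prometheus/client_golang/prometheus"
)

// buildFrontend shows the new wiring: no v1.Limits argument. The returned
// round tripper can still be wrapped in further middlewares before being
// registered into the HTTP server.
func buildFrontend(cfg v1.Config, logger log.Logger) (http.RoundTripper, *v1.Frontend, error) {
	return frontend.InitFrontend(cfg, logger, prometheus.DefaultRegisterer)
}
```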
80 changes: 18 additions & 62 deletions modules/frontend/queue/queue.go
@@ -8,12 +8,10 @@ import (

"github.com/grafana/dskit/services"
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/atomic"
)

const (
// How frequently to check for disconnected queriers that should be forgotten.
forgetCheckPeriod = 5 * time.Second
queueCleanupPeriod = 30 * time.Second // every 30 seconds b/c the stopping code requires there to be no queues. i would like to make this 5-10 minutes but then shutdowns would be blocked
)

var (
@@ -43,14 +41,10 @@ func FirstUser() UserIndex {
// Request stored into the queue.
type Request interface{}

// RequestQueue holds incoming requests in per-user queues. It also assigns each user specified number of queriers,
// and when querier asks for next request to handle (using GetNextRequestForQuerier), it returns requests
// in a fair fashion.
// RequestQueue holds incoming requests in per-user queues.
type RequestQueue struct {
services.Service

connectedQuerierWorkers *atomic.Int32

mtx sync.RWMutex
cond contextCond // Notified when request is enqueued or dequeued, or querier is disconnected.
queues *queues
@@ -60,26 +54,23 @@ type RequestQueue struct {
discardedRequests *prometheus.CounterVec // Per user.
}

func NewRequestQueue(maxOutstandingPerTenant int, forgetDelay time.Duration, queueLength *prometheus.GaugeVec, discardedRequests *prometheus.CounterVec) *RequestQueue {
func NewRequestQueue(maxOutstandingPerTenant int, queueLength *prometheus.GaugeVec, discardedRequests *prometheus.CounterVec) *RequestQueue {
q := &RequestQueue{
queues: newUserQueues(maxOutstandingPerTenant, forgetDelay),
connectedQuerierWorkers: atomic.NewInt32(0),
queueLength: queueLength,
discardedRequests: discardedRequests,
queues: newUserQueues(maxOutstandingPerTenant),
queueLength: queueLength,
discardedRequests: discardedRequests,
}

q.cond = contextCond{Cond: sync.NewCond(&q.mtx)}
q.Service = services.NewTimerService(forgetCheckPeriod, nil, q.forgetDisconnectedQueriers, q.stopping).WithName("request queue")
q.Service = services.NewTimerService(queueCleanupPeriod, nil, q.cleanupQueues, q.stopping).WithName("request queue")

return q
}

// EnqueueRequest puts the request into the queue. MaxQueries is user-specific value that specifies how many queriers can
// this user use (zero or negative = all queriers). It is passed to each EnqueueRequest, because it can change
// between calls.
// EnqueueRequest puts the request into the queue.
//
// If request is successfully enqueued, successFn is called with the lock held, before any querier can receive the request.
func (q *RequestQueue) EnqueueRequest(userID string, req Request, maxQueriers int) error {
func (q *RequestQueue) EnqueueRequest(userID string, req Request) error {
q.mtx.RLock()
// don't defer a release. we won't know what we need to release until we call getQueueUnderRlock

@@ -89,7 +80,7 @@ func (q *RequestQueue) EnqueueRequest(userID string, req Request, maxQueriers in
}

// try to grab the user queue under read lock
queue, cleanup, err := q.getQueueUnderRlock(userID, maxQueriers)
queue, cleanup, err := q.getQueueUnderRlock(userID)
defer cleanup()
if err != nil {
return err
@@ -109,7 +100,7 @@ func (q *RequestQueue) EnqueueRequest(userID string, req Request, maxQueriers in
// getQueueUnderRlock attempts to get the queue for the given user under read lock. if it is not
// possible it upgrades the RLock to a Lock. This method also returns a cleanup function that
// will release whichever lock it had to acquire to get the queue.
func (q *RequestQueue) getQueueUnderRlock(userID string, maxQueriers int) (chan Request, func(), error) {
func (q *RequestQueue) getQueueUnderRlock(userID string) (chan Request, func(), error) {
cleanup := func() {
q.mtx.RUnlock()
}
@@ -128,7 +119,7 @@ func (q *RequestQueue) getQueueUnderRlock(userID string, maxQueriers int) (chan
q.mtx.Unlock()
}

queue := q.queues.getOrAddQueue(userID, maxQueriers)
queue := q.queues.getOrAddQueue(userID)
if queue == nil {
// This can only happen if userID is "".
return nil, cleanup, errors.New("no queue found")
Expand All @@ -139,7 +130,7 @@ func (q *RequestQueue) getQueueUnderRlock(userID string, maxQueriers int) (chan

// GetNextRequestForQuerier finds the next user queue and attempts to dequeue N requests as defined by the length of
// batchBuffer. This slice is a reusable buffer to fill up with requests.
func (q *RequestQueue) GetNextRequestForQuerier(ctx context.Context, last UserIndex, querierID string, batchBuffer []Request) ([]Request, UserIndex, error) {
func (q *RequestQueue) GetNextRequestForQuerier(ctx context.Context, last UserIndex, batchBuffer []Request) ([]Request, UserIndex, error) {
requestedCount := len(batchBuffer)
if requestedCount == 0 {
return nil, last, errors.New("batch buffer must have len > 0")
Expand All @@ -165,7 +156,7 @@ FindQueue:
return nil, last, err
}

queue, userID, idx := q.queues.getNextQueueForQuerier(last.last, querierID)
queue, userID, idx := q.queues.getNextQueueForQuerier(last.last)
last.last = idx
if queue != nil {
// this is all threadsafe b/c all users queues are blocked by q.mtx
@@ -179,14 +170,7 @@
batchBuffer[i] = <-queue
}

qLen := len(queue)
if qLen == 0 {
q.queues.deleteQueue(userID)
}
q.queueLength.WithLabelValues(userID).Set(float64(qLen))

// Tell close() we've processed a request.
q.cond.Broadcast()
q.queueLength.WithLabelValues(userID).Set(float64(len(queue)))

return batchBuffer, last, nil
}
@@ -197,13 +181,11 @@
goto FindQueue
}

func (q *RequestQueue) forgetDisconnectedQueriers(_ context.Context) error {
func (q *RequestQueue) cleanupQueues(_ context.Context) error {
q.mtx.Lock()
defer q.mtx.Unlock()

if q.queues.forgetDisconnectedQueriers(time.Now()) > 0 {
// We need to notify goroutines cause having removed some queriers
// may have caused a resharding.
if q.queues.deleteEmptyQueues() {
q.cond.Broadcast()
}

@@ -214,7 +196,7 @@ func (q *RequestQueue) stopping(_ error) error {
q.mtx.Lock()
defer q.mtx.Unlock()

for q.queues.len() > 0 && q.connectedQuerierWorkers.Load() > 0 {
for q.queues.len() > 0 {
q.cond.Wait(context.Background())
}

@@ -227,32 +209,6 @@ func (q *RequestQueue) stopping(_ error) error {
return nil
}

func (q *RequestQueue) RegisterQuerierConnection(querier string) {
q.connectedQuerierWorkers.Inc()

q.mtx.Lock()
defer q.mtx.Unlock()
q.queues.addQuerierConnection(querier)
}

func (q *RequestQueue) UnregisterQuerierConnection(querier string) {
q.connectedQuerierWorkers.Dec()

q.mtx.Lock()
defer q.mtx.Unlock()
q.queues.removeQuerierConnection(querier, time.Now())
}

func (q *RequestQueue) NotifyQuerierShutdown(querierID string) {
q.mtx.Lock()
defer q.mtx.Unlock()
q.queues.notifyQuerierShutdown(querierID)
}

func (q *RequestQueue) GetConnectedQuerierWorkersMetric() float64 {
return float64(q.connectedQuerierWorkers.Load())
}

// contextCond is a *sync.Cond with Wait() method overridden to support context-based waiting.
type contextCond struct {
*sync.Cond
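One detail worth calling out in getQueueUnderRlock above is the read-lock-first, upgrade-on-miss pattern with a cleanup closure that releases whichever lock ended up being held. A self-contained sketch of just that pattern follows; the registry type and tenant names are illustrative, not Tempo code.

```go
package main

import (
	"fmt"
	"sync"
)

// registry mimics the shape of the per-user queue map guarded by a RWMutex.
type registry struct {
	mtx    sync.RWMutex
	queues map[string]chan int
}

// getOrCreate returns the user's queue plus a cleanup func that releases
// whichever lock is still held, mirroring getQueueUnderRlock.
func (r *registry) getOrCreate(user string) (chan int, func()) {
	r.mtx.RLock()
	if q, ok := r.queues[user]; ok {
		// Fast path: the queue exists, keep holding only the read lock.
		return q, r.mtx.RUnlock
	}

	// Slow path: upgrade to the write lock to create the queue, re-checking
	// after the upgrade because another goroutine may have won the race.
	r.mtx.RUnlock()
	r.mtx.Lock()
	q, ok := r.queues[user]
	if !ok {
		q = make(chan int, 10)
		r.queues[user] = q
	}
	return q, r.mtx.Unlock
}

func main() {
	r := &registry{queues: map[string]chan int{}}

	q, cleanup := r.getOrCreate("tenant-1")
	q <- 42 // enqueue while the returned lock is still held, as EnqueueRequest does
	cleanup()

	fmt.Println(len(q)) // 1
}
```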
59 changes: 11 additions & 48 deletions modules/frontend/queue/queue_test.go
@@ -37,7 +37,7 @@ func TestGetNextForQuerierOneUser(t *testing.T) {
close(start)

for j := 0; j < messages; j++ {
err := q.EnqueueRequest("test", &mockRequest{}, 0)
err := q.EnqueueRequest("test", &mockRequest{})
require.NoError(t, err)
}

@@ -65,7 +65,7 @@ func TestGetNextForQuerierRandomUsers(t *testing.T) {
close(start)

for j := 0; j < messages; j++ {
err := q.EnqueueRequest(test.RandomString(), &mockRequest{}, 0)
err := q.EnqueueRequest(test.RandomString(), &mockRequest{})
require.NoError(t, err)
}

@@ -93,7 +93,7 @@ func TestGetNextBatches(t *testing.T) {
close(start)

for j := 0; j < messages; j++ {
err := q.EnqueueRequest("user", &mockRequest{}, 0)
err := q.EnqueueRequest("user", &mockRequest{})
require.NoError(t, err)
}

@@ -136,7 +136,7 @@ func benchmarkGetNextForQuerier(b *testing.B, listeners int, messages int) {
b.ResetTimer()
for i := 0; i < b.N; i++ {
for j := 0; j < messages; j++ {
err := q.EnqueueRequest(user, req, 0)
err := q.EnqueueRequest(user, req)
if err != nil {
panic(err)
}
@@ -160,7 +160,7 @@ func queueWithListeners(ctx context.Context, listeners int, batchSize int, liste
Name: "test_discarded",
}, []string{"user"})

q := NewRequestQueue(100_000, 0, g, c)
q := NewRequestQueue(100_000, g, c)
start := make(chan struct{})

for i := 0; i < listeners; i++ {
@@ -173,7 +173,7 @@

batchBuffer := make([]Request, batchSize)
for {
r, last, err = q.GetNextRequestForQuerier(ctx, last, "", batchBuffer)
r, last, err = q.GetNextRequestForQuerier(ctx, last, batchBuffer)
if err != nil {
return
}
@@ -184,49 +184,12 @@
}()
}

return q, start
}

func TestRequestQueue_GetNextRequestForQuerier_ShouldGetRequestAfterReshardingBecauseQuerierHasBeenForgotten(t *testing.T) {
const forgetDelay = 3 * time.Second

queue := NewRequestQueue(1, forgetDelay,
prometheus.NewGaugeVec(prometheus.GaugeOpts{}, []string{"user"}),
prometheus.NewCounterVec(prometheus.CounterOpts{}, []string{"user"}))

// Start the queue service.
ctx := context.Background()
require.NoError(t, services.StartAndAwaitRunning(ctx, queue))
t.Cleanup(func() {
require.NoError(t, services.StopAndAwaitTerminated(ctx, queue))
})

// Two queriers connect.
queue.RegisterQuerierConnection("querier-1")
queue.RegisterQuerierConnection("querier-2")

// Querier-2 waits for a new request.
querier2wg := sync.WaitGroup{}
querier2wg.Add(1)
go func() {
defer querier2wg.Done()
_, _, err := queue.GetNextRequestForQuerier(ctx, FirstUser(), "querier-2", make([]Request, 1))
require.NoError(t, err)
}()

// Querier-1 crashes (no graceful shutdown notification).
queue.UnregisterQuerierConnection("querier-1")

// Enqueue a request from an user which would be assigned to querier-1.
// NOTE: "user-1" hash falls in the querier-1 shard.
require.NoError(t, queue.EnqueueRequest("user-1", "request", 1))

startTime := time.Now()
querier2wg.Wait()
waitTime := time.Since(startTime)
err := services.StartAndAwaitRunning(context.Background(), q)
if err != nil {
panic(err)
}

// We expect that querier-2 got the request only after querier-1 forget delay is passed.
assert.GreaterOrEqual(t, waitTime.Milliseconds(), forgetDelay.Milliseconds())
return q, start
}
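The queueWithListeners helper above also shows how consumers look after this change: GetNextRequestForQuerier takes the last user index and a reusable batch buffer, with no querier ID. A hedged sketch of a consumer loop built on that, assuming the queue package's import path; handle and the surrounding function are illustrative, not Tempo code.

```go
package worker

import (
	"context"

	"github.com/grafana/tempo/modules/frontend/queue" // assumed import path
)

// consume drains requests until ctx is cancelled or the queue stops. The
// batch buffer is reused across iterations, as in queueWithListeners.
func consume(ctx context.Context, q *queue.RequestQueue, batchSize int, handle func(queue.Request)) {
	last := queue.FirstUser()
	buf := make([]queue.Request, batchSize)

	for {
		reqs, next, err := q.GetNextRequestForQuerier(ctx, last, buf)
		if err != nil {
			return // context cancelled or the queue is stopping
		}
		last = next

		for _, r := range reqs {
			handle(r)
		}
	}
}
```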

func TestContextCond(t *testing.T) {