Frontend cleanup and perf improvement #3996

Merged · 5 commits · Aug 23, 2024
2 changes: 2 additions & 0 deletions CHANGELOG.md
@@ -2,6 +2,8 @@

* [ENHANCEMENT] TraceQL: Attribute iterators collect matched array values [#3867](https://github.com/grafana/tempo/pull/3867) (@electron0zero, @stoewer)
* [ENHANCEMENT] Add bytes and spans received to usage stats [#3983](https://github.com/grafana/tempo/pull/3983) (@joe-elliott)
* [ENHANCEMENT] Prevent massive allocations in the frontend if there is not sufficient pressure from the query pipeline. [#3996](https://github.com/grafana/tempo/pull/3996) (@joe-elliott)
**BREAKING CHANGE** Removed `querier_forget_delay` setting from the frontend. This configuration option did nothing.

# v2.6.0-rc.0

2 changes: 1 addition & 1 deletion cmd/tempo/app/modules.go
@@ -364,7 +364,7 @@ func (t *App) initQuerier() (services.Service, error) {
func (t *App) initQueryFrontend() (services.Service, error) {
// cortexTripper is a bridge between http and httpgrpc.
// It does the job of passing data to the cortex frontend code.
cortexTripper, v1, err := frontend.InitFrontend(t.cfg.Frontend.Config, frontend.CortexNoQuerierLimits{}, log.Logger, prometheus.DefaultRegisterer)
cortexTripper, v1, err := frontend.InitFrontend(t.cfg.Frontend.Config, log.Logger, prometheus.DefaultRegisterer)
if err != nil {
return nil, err
}
1 change: 0 additions & 1 deletion docs/sources/tempo/configuration/manifest.md
@@ -294,7 +294,6 @@ querier:
query_relevant_ingesters: false
query_frontend:
max_outstanding_per_tenant: 2000
querier_forget_delay: 0s
max_batch_size: 5
log_query_request_headers: ""
max_retries: 2
8 changes: 2 additions & 6 deletions modules/frontend/config.go
@@ -103,19 +103,15 @@ func (cfg *Config) RegisterFlagsAndApplyDefaults(string, *flag.FlagSet) {

type CortexNoQuerierLimits struct{}

var _ v1.Limits = (*CortexNoQuerierLimits)(nil)

func (CortexNoQuerierLimits) MaxQueriersPerUser(string) int { return 0 }

// InitFrontend initializes V1 frontend
//
// Returned RoundTripper can be wrapped in more round-tripper middlewares, and then eventually registered
// into HTTP server using the Handler from this package. Returned RoundTripper is always non-nil
// (if there are no errors), and it uses the returned frontend (if any).
func InitFrontend(cfg v1.Config, limits v1.Limits, log log.Logger, reg prometheus.Registerer) (http.RoundTripper, *v1.Frontend, error) {
func InitFrontend(cfg v1.Config, log log.Logger, reg prometheus.Registerer) (http.RoundTripper, *v1.Frontend, error) {
statVersion.Set("v1")
// No scheduler = use original frontend.
fr, err := v1.New(cfg, limits, log, reg)
fr, err := v1.New(cfg, log, reg)
if err != nil {
return nil, nil, err
}
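
With the `v1.Limits` parameter gone, the call site (see `cmd/tempo/app/modules.go` above) shrinks to config, logger, and registerer. A minimal sketch of the new shape, using only identifiers visible in this diff; surrounding plumbing is omitted:

```go
// Sketch only: mirrors the updated call in initQueryFrontend.
cortexTripper, v1, err := frontend.InitFrontend(t.cfg.Frontend.Config, log.Logger, prometheus.DefaultRegisterer)
if err != nil {
	return nil, err
}
// cortexTripper can still be wrapped in additional round-tripper middlewares
// before being registered with the HTTP server; v1 is the frontend instance.
_ = v1
```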
80 changes: 18 additions & 62 deletions modules/frontend/queue/queue.go
@@ -8,12 +8,10 @@ import (

"github.com/grafana/dskit/services"
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/atomic"
)

const (
// How frequently to check for disconnected queriers that should be forgotten.
forgetCheckPeriod = 5 * time.Second
queueCleanupPeriod = 30 * time.Second // every 30 seconds b/c the stopping code requires there to be no queues. I would like to make this 5-10 minutes, but then shutdowns would be blocked
)

var (
@@ -43,14 +41,10 @@ func FirstUser() UserIndex {
// Request stored into the queue.
type Request interface{}

// RequestQueue holds incoming requests in per-user queues. It also assigns each user specified number of queriers,
// and when querier asks for next request to handle (using GetNextRequestForQuerier), it returns requests
// in a fair fashion.
// RequestQueue holds incoming requests in per-user queues.
type RequestQueue struct {
services.Service

connectedQuerierWorkers *atomic.Int32

mtx sync.RWMutex
cond contextCond // Notified when request is enqueued or dequeued, or querier is disconnected.
queues *queues
@@ -60,26 +54,23 @@ type RequestQueue struct {
discardedRequests *prometheus.CounterVec // Per user.
}

func NewRequestQueue(maxOutstandingPerTenant int, forgetDelay time.Duration, queueLength *prometheus.GaugeVec, discardedRequests *prometheus.CounterVec) *RequestQueue {
func NewRequestQueue(maxOutstandingPerTenant int, queueLength *prometheus.GaugeVec, discardedRequests *prometheus.CounterVec) *RequestQueue {
q := &RequestQueue{
queues: newUserQueues(maxOutstandingPerTenant, forgetDelay),
connectedQuerierWorkers: atomic.NewInt32(0),
queueLength: queueLength,
discardedRequests: discardedRequests,
queues: newUserQueues(maxOutstandingPerTenant),
queueLength: queueLength,
discardedRequests: discardedRequests,
}

q.cond = contextCond{Cond: sync.NewCond(&q.mtx)}
q.Service = services.NewTimerService(forgetCheckPeriod, nil, q.forgetDisconnectedQueriers, q.stopping).WithName("request queue")
q.Service = services.NewTimerService(queueCleanupPeriod, nil, q.cleanupQueues, q.stopping).WithName("request queue")

return q
}
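
As a usage sketch (not part of this change set): the slimmed-down constructor takes only the per-tenant limit and the two per-user metric vectors, and the queue still runs as a dskit service. The metric names and the limit below are placeholders.

```go
queueLength := prometheus.NewGaugeVec(prometheus.GaugeOpts{Name: "frontend_queue_length"}, []string{"user"})
discarded := prometheus.NewCounterVec(prometheus.CounterOpts{Name: "frontend_discarded_requests"}, []string{"user"})

q := NewRequestQueue(2000, queueLength, discarded) // 2000 = max outstanding per tenant
if err := services.StartAndAwaitRunning(context.Background(), q); err != nil {
	panic(err)
}
defer services.StopAndAwaitTerminated(context.Background(), q) //nolint:errcheck
```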

// EnqueueRequest puts the request into the queue. MaxQueries is user-specific value that specifies how many queriers can
// this user use (zero or negative = all queriers). It is passed to each EnqueueRequest, because it can change
// between calls.
// EnqueueRequest puts the request into the queue.
//
// If request is successfully enqueued, successFn is called with the lock held, before any querier can receive the request.
func (q *RequestQueue) EnqueueRequest(userID string, req Request, maxQueriers int) error {
func (q *RequestQueue) EnqueueRequest(userID string, req Request) error {
q.mtx.RLock()
// don't defer a release. we won't know what we need to release until we call getQueueUnderRlock

@@ -89,7 +80,7 @@ func (q *RequestQueue) EnqueueRequest(userID string, req Request, maxQueriers in
}

// try to grab the user queue under read lock
queue, cleanup, err := q.getQueueUnderRlock(userID, maxQueriers)
queue, cleanup, err := q.getQueueUnderRlock(userID)
defer cleanup()
if err != nil {
return err
@@ -109,7 +100,7 @@ func (q *RequestQueue) EnqueueRequest(userID string, req Request, maxQueriers in
// getQueueUnderRlock attempts to get the queue for the given user under read lock. if it is not
// possible it upgrades the RLock to a Lock. This method also returns a cleanup function that
// will release whichever lock it had to acquire to get the queue.
func (q *RequestQueue) getQueueUnderRlock(userID string, maxQueriers int) (chan Request, func(), error) {
func (q *RequestQueue) getQueueUnderRlock(userID string) (chan Request, func(), error) {
cleanup := func() {
q.mtx.RUnlock()
}
@@ -128,7 +119,7 @@ func (q *RequestQueue) getQueueUnderRlock(userID string, maxQueriers int) (chan
q.mtx.Unlock()
}

queue := q.queues.getOrAddQueue(userID, maxQueriers)
queue := q.queues.getOrAddQueue(userID)
if queue == nil {
// This can only happen if userID is "".
return nil, cleanup, errors.New("no queue found")
@@ -139,7 +130,7 @@

// GetNextRequestForQuerier finds the next user queue and attempts to dequeue N requests, where N is the length of
// batchBuffer. This slice is a reusable buffer to fill with requests
func (q *RequestQueue) GetNextRequestForQuerier(ctx context.Context, last UserIndex, querierID string, batchBuffer []Request) ([]Request, UserIndex, error) {
func (q *RequestQueue) GetNextRequestForQuerier(ctx context.Context, last UserIndex, batchBuffer []Request) ([]Request, UserIndex, error) {
requestedCount := len(batchBuffer)
if requestedCount == 0 {
return nil, last, errors.New("batch buffer must have len > 0")
@@ -165,7 +156,7 @@
return nil, last, err
}

queue, userID, idx := q.queues.getNextQueueForQuerier(last.last, querierID)
queue, userID, idx := q.queues.getNextQueueForQuerier(last.last)
last.last = idx
if queue != nil {
// this is all threadsafe b/c all users queues are blocked by q.mtx
@@ -179,14 +170,7 @@
batchBuffer[i] = <-queue
}

qLen := len(queue)
if qLen == 0 {
q.queues.deleteQueue(userID)
}
q.queueLength.WithLabelValues(userID).Set(float64(qLen))

// Tell close() we've processed a request.
q.cond.Broadcast()
q.queueLength.WithLabelValues(userID).Set(float64(len(queue)))

return batchBuffer, last, nil
}
@@ -197,13 +181,11 @@
goto FindQueue
}
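
On the dequeue side, the allocation win comes from each worker owning one fixed-size batch buffer and reusing it across calls, rather than allocating per dequeue; requests are only pulled when a worker is actually asking for them. A hedged sketch of such a consumer loop, modeled on the `queueWithListeners` test helper further down; the batch size and the `handle` function are placeholders:

```go
last := FirstUser()
batchBuffer := make([]Request, 10) // allocated once, reused on every call

for {
	// Blocks until work is available, then fills at most len(batchBuffer)
	// requests from a single user's queue.
	reqs, next, err := q.GetNextRequestForQuerier(ctx, last, batchBuffer)
	if err != nil {
		return // context cancelled or queue stopping
	}
	last = next

	for _, req := range reqs {
		handle(req) // placeholder: hand the request to the worker
	}
}
```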

func (q *RequestQueue) forgetDisconnectedQueriers(_ context.Context) error {
func (q *RequestQueue) cleanupQueues(_ context.Context) error {
q.mtx.Lock()
defer q.mtx.Unlock()

if q.queues.forgetDisconnectedQueriers(time.Now()) > 0 {
// We need to notify goroutines cause having removed some queriers
// may have caused a resharding.
if q.queues.deleteEmptyQueues() {
q.cond.Broadcast()
}

@@ -214,7 +196,7 @@ func (q *RequestQueue) stopping(_ error) error {
q.mtx.Lock()
defer q.mtx.Unlock()

for q.queues.len() > 0 && q.connectedQuerierWorkers.Load() > 0 {
for q.queues.len() > 0 {
q.cond.Wait(context.Background())
}

@@ -227,32 +209,6 @@ func (q *RequestQueue) stopping(_ error) error {
return nil
}

func (q *RequestQueue) RegisterQuerierConnection(querier string) {
q.connectedQuerierWorkers.Inc()

q.mtx.Lock()
defer q.mtx.Unlock()
q.queues.addQuerierConnection(querier)
}

func (q *RequestQueue) UnregisterQuerierConnection(querier string) {
q.connectedQuerierWorkers.Dec()

q.mtx.Lock()
defer q.mtx.Unlock()
q.queues.removeQuerierConnection(querier, time.Now())
}

func (q *RequestQueue) NotifyQuerierShutdown(querierID string) {
q.mtx.Lock()
defer q.mtx.Unlock()
q.queues.notifyQuerierShutdown(querierID)
}

func (q *RequestQueue) GetConnectedQuerierWorkersMetric() float64 {
return float64(q.connectedQuerierWorkers.Load())
}

// contextCond is a *sync.Cond with Wait() method overridden to support context-based waiting.
type contextCond struct {
*sync.Cond
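
`contextCond` above is described as a `*sync.Cond` whose `Wait` also respects a context. Its implementation is not shown in this diff; the following is only a generic sketch of the usual pattern, in which a helper goroutine broadcasts once the context is done so a blocked waiter can wake up and observe the cancellation. Callers still re-check their condition in a loop, exactly as with a plain `sync.Cond`.

```go
// Generic sketch, not this package's implementation. The caller must hold
// c.L when calling Wait, exactly as with sync.Cond.Wait.
type ctxCond struct{ *sync.Cond }

func (c ctxCond) Wait(ctx context.Context) error {
	done := make(chan struct{})
	defer close(done)

	go func() {
		select {
		case <-ctx.Done():
			c.L.Lock() // acquired once the waiter releases the lock inside Wait
			c.Broadcast()
			c.L.Unlock()
		case <-done: // Wait returned normally; nothing to do
		}
	}()

	c.Cond.Wait() // releases c.L while blocked, reacquires before returning
	return ctx.Err()
}
```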
59 changes: 11 additions & 48 deletions modules/frontend/queue/queue_test.go
@@ -37,7 +37,7 @@ func TestGetNextForQuerierOneUser(t *testing.T) {
close(start)

for j := 0; j < messages; j++ {
err := q.EnqueueRequest("test", &mockRequest{}, 0)
err := q.EnqueueRequest("test", &mockRequest{})
require.NoError(t, err)
}

@@ -65,7 +65,7 @@ func TestGetNextForQuerierRandomUsers(t *testing.T) {
close(start)

for j := 0; j < messages; j++ {
err := q.EnqueueRequest(test.RandomString(), &mockRequest{}, 0)
err := q.EnqueueRequest(test.RandomString(), &mockRequest{})
require.NoError(t, err)
}

@@ -93,7 +93,7 @@ func TestGetNextBatches(t *testing.T) {
close(start)

for j := 0; j < messages; j++ {
err := q.EnqueueRequest("user", &mockRequest{}, 0)
err := q.EnqueueRequest("user", &mockRequest{})
require.NoError(t, err)
}

@@ -136,7 +136,7 @@ func benchmarkGetNextForQuerier(b *testing.B, listeners int, messages int) {
b.ResetTimer()
for i := 0; i < b.N; i++ {
for j := 0; j < messages; j++ {
err := q.EnqueueRequest(user, req, 0)
err := q.EnqueueRequest(user, req)
if err != nil {
panic(err)
}
Expand All @@ -160,7 +160,7 @@ func queueWithListeners(ctx context.Context, listeners int, batchSize int, liste
Name: "test_discarded",
}, []string{"user"})

q := NewRequestQueue(100_000, 0, g, c)
q := NewRequestQueue(100_000, g, c)
start := make(chan struct{})

for i := 0; i < listeners; i++ {
@@ -173,7 +173,7 @@

batchBuffer := make([]Request, batchSize)
for {
r, last, err = q.GetNextRequestForQuerier(ctx, last, "", batchBuffer)
r, last, err = q.GetNextRequestForQuerier(ctx, last, batchBuffer)
if err != nil {
return
}
@@ -184,49 +184,12 @@
}()
}

return q, start
}

func TestRequestQueue_GetNextRequestForQuerier_ShouldGetRequestAfterReshardingBecauseQuerierHasBeenForgotten(t *testing.T) {
const forgetDelay = 3 * time.Second

queue := NewRequestQueue(1, forgetDelay,
prometheus.NewGaugeVec(prometheus.GaugeOpts{}, []string{"user"}),
prometheus.NewCounterVec(prometheus.CounterOpts{}, []string{"user"}))

// Start the queue service.
ctx := context.Background()
require.NoError(t, services.StartAndAwaitRunning(ctx, queue))
t.Cleanup(func() {
require.NoError(t, services.StopAndAwaitTerminated(ctx, queue))
})

// Two queriers connect.
queue.RegisterQuerierConnection("querier-1")
queue.RegisterQuerierConnection("querier-2")

// Querier-2 waits for a new request.
querier2wg := sync.WaitGroup{}
querier2wg.Add(1)
go func() {
defer querier2wg.Done()
_, _, err := queue.GetNextRequestForQuerier(ctx, FirstUser(), "querier-2", make([]Request, 1))
require.NoError(t, err)
}()

// Querier-1 crashes (no graceful shutdown notification).
queue.UnregisterQuerierConnection("querier-1")

// Enqueue a request from an user which would be assigned to querier-1.
// NOTE: "user-1" hash falls in the querier-1 shard.
require.NoError(t, queue.EnqueueRequest("user-1", "request", 1))

startTime := time.Now()
querier2wg.Wait()
waitTime := time.Since(startTime)
err := services.StartAndAwaitRunning(context.Background(), q)
if err != nil {
panic(err)
}

// We expect that querier-2 got the request only after querier-1 forget delay is passed.
assert.GreaterOrEqual(t, waitTime.Milliseconds(), forgetDelay.Milliseconds())
return q, start
}

func TestContextCond(t *testing.T) {