Skip to content

Commit

Permalink
Isolate tree internals into their own package (#9662)
Browse files Browse the repository at this point in the history
* All tests passing

* Update docstrings

* Implement getters/setters and make all QA fields private

* Update tenantQuerierAssignments to tenantQuerierSharding

* Rename MultiQueuingAlgorithmTreeQueue to MultiAlgorithmTreeQueue

* Address PR feedback
  • Loading branch information
chencs authored Oct 21, 2024
1 parent b226653 commit 90558ab
Show file tree
Hide file tree
Showing 17 changed files with 836 additions and 747 deletions.
4 changes: 2 additions & 2 deletions pkg/frontend/v1/frontend.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ func (f *Frontend) Process(server frontendv1pb.Frontend_ProcessServer) error {
return err
}

querierWorkerConn := queue.NewUnregisteredQuerierWorkerConn(server.Context(), queue.QuerierID(querierID))
querierWorkerConn := queue.NewUnregisteredQuerierWorkerConn(server.Context(), querierID)
err = f.requestQueue.AwaitRegisterQuerierWorkerConn(querierWorkerConn)
if err != nil {
return err
Expand Down Expand Up @@ -321,7 +321,7 @@ func (f *Frontend) Process(server frontendv1pb.Frontend_ProcessServer) error {

func (f *Frontend) NotifyClientShutdown(ctx context.Context, req *frontendv1pb.NotifyClientShutdownRequest) (*frontendv1pb.NotifyClientShutdownResponse, error) {
level.Info(f.log).Log("msg", "received shutdown notification from querier", "querier", req.GetClientID())
f.requestQueue.SubmitNotifyQuerierShutdown(ctx, queue.QuerierID(req.GetClientID()))
f.requestQueue.SubmitNotifyQuerierShutdown(ctx, req.GetClientID())

return &frontendv1pb.NotifyClientShutdownResponse{}, nil
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ import (
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/grafana/mimir/pkg/scheduler/queue/tree"
)

const querierForgetDelay = 0
Expand Down Expand Up @@ -375,23 +377,23 @@ func TestMultiDimensionalQueueAlgorithmSlowConsumerEffects(t *testing.T) {
for _, weightedQueueDimensionTestCase := range weightedQueueDimensionTestCases {
numTenants := len(weightedQueueDimensionTestCase.tenantQueueDimensionsWeights)

tqaNonFlipped := newTenantQuerierAssignments()
tqaFlipped := newTenantQuerierAssignments()
tqaQuerierWorkerPrioritization := newTenantQuerierAssignments()
tqaNonFlipped := tree.NewTenantQuerierQueuingAlgorithm()
tqaFlipped := tree.NewTenantQuerierQueuingAlgorithm()
tqaQuerierWorkerPrioritization := tree.NewTenantQuerierQueuingAlgorithm()

nonFlippedRoundRobinTree, err := NewTree(tqaNonFlipped, &roundRobinState{})
nonFlippedRoundRobinTree, err := tree.NewTree(tqaNonFlipped, tree.NewRoundRobinState())
require.NoError(t, err)

flippedRoundRobinTree, err := NewTree(&roundRobinState{}, tqaFlipped)
flippedRoundRobinTree, err := tree.NewTree(tree.NewRoundRobinState(), tqaFlipped)
require.NoError(t, err)

querierWorkerPrioritizationTree, err := NewTree(NewQuerierWorkerQueuePriorityAlgo(), tqaQuerierWorkerPrioritization)
querierWorkerPrioritizationTree, err := tree.NewTree(tree.NewQuerierWorkerQueuePriorityAlgo(), tqaQuerierWorkerPrioritization)
require.NoError(t, err)

treeScenarios := []struct {
name string
tree Tree
tqa *tenantQuerierAssignments
tree tree.Tree
tqa *tree.TenantQuerierQueuingAlgorithm
}{
// keeping these names the same length keeps logged results aligned
{
Expand Down Expand Up @@ -438,9 +440,13 @@ func TestMultiDimensionalQueueAlgorithmSlowConsumerEffects(t *testing.T) {
)
require.NoError(t, err)

// NewRequestQueue constructor does not allow passing in a tree or tenantQuerierAssignments
// NewRequestQueue constructor does not allow passing in a tree or tenantQuerierShards
// so we have to override here to use the same structures as the test case
queue.queueBroker.tenantQuerierAssignments = scenario.tqa
queue.queueBroker.tenantQuerierAssignments = &tenantQuerierShards{
querierIDsSorted: make([]tree.QuerierID, 0),
tenantsByID: make(map[string]*queueTenant),
queuingAlgorithm: scenario.tqa,
}
queue.queueBroker.prioritizeQueryComponents = prioritizeQueryComponents
queue.queueBroker.tree = scenario.tree

Expand Down Expand Up @@ -485,10 +491,13 @@ func TestMultiDimensionalQueueAlgorithmSlowConsumerEffects(t *testing.T) {
testCaseReports[testCaseName] = report

require.NoError(t, queue.stop(nil))
// ensure everything was dequeued
path, val := scenario.tree.Dequeue(&DequeueArgs{querierID: scenario.tqa.currentQuerier})
assert.NotEqual(t, "", tree.CurrentQuerier(scenario.tqa))
// ensure everything was dequeued; we can pass a nil DequeueArgs because we don't
// want to update any state before doing this (i.e., we're dequeuing for _any_ querier,
// just to make sure the tree is empty).
path, val := scenario.tree.Dequeue(nil)
assert.Nil(t, val)
assert.Equal(t, path, QueuePath{})
assert.Equal(t, path, tree.QueuePath{})
})
}
}
Expand Down
16 changes: 8 additions & 8 deletions pkg/scheduler/queue/querier_connections.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ const unregisteredWorkerID = -1
// querierConnections manages information about queriers connected to the request queue. The queueBroker
// receives information about querier connections via RequestQueue's querierWorkerOperations channel.
type querierConnections struct {
queriersByID map[QuerierID]*querierState
queriersByID map[string]*querierState

// How long to wait before removing a querier which has got disconnected
// but hasn't notified about a graceful shutdown.
Expand All @@ -21,13 +21,13 @@ type querierConnections struct {

func newQuerierConnections(forgetDelay time.Duration) *querierConnections {
return &querierConnections{
queriersByID: map[QuerierID]*querierState{},
queriersByID: map[string]*querierState{},
querierForgetDelay: forgetDelay,
}
}

// querierIsAvailable returns true if the querier is registered to the querierConnections and is not shutting down.
func (qc *querierConnections) querierIsAvailable(querierID QuerierID) bool {
func (qc *querierConnections) querierIsAvailable(querierID string) bool {
q := qc.queriersByID[querierID]
return q != nil && !q.shuttingDown
}
Expand Down Expand Up @@ -93,7 +93,7 @@ func (qc *querierConnections) removeQuerierWorkerConn(conn *QuerierWorkerConn, n

// shutdownQuerier handles a graceful shutdown notification from a querier. It updates the querier state to shuttingDown
// if applicable, and returns true if the querier is inactive; the querier can be removed from queriersByID in this case.
func (qc *querierConnections) shutdownQuerier(querierID QuerierID) (canRemoveQuerier bool) {
func (qc *querierConnections) shutdownQuerier(querierID string) (canRemoveQuerier bool) {
querier := qc.queriersByID[querierID]
if querier == nil {
// The querier may have already been removed, so we just ignore it.
Expand All @@ -116,13 +116,13 @@ func (qc *querierConnections) shutdownQuerier(querierID QuerierID) (canRemoveQue
// removeForgettableQueriers removes all querier connections which no longer have any querier-worker connections and for whom
// querierForgetDelay time has passed since the querier disconnected. It returns a slice of all querier IDs which were
// removed.
func (qc *querierConnections) removeForgettableQueriers(now time.Time) []QuerierID {
func (qc *querierConnections) removeForgettableQueriers(now time.Time) []string {
// if forget delay is disabled, removal is done immediately on querier disconnect or shutdown; do nothing
if qc.querierForgetDelay == 0 {
return nil
}

removableQueriers := make([]QuerierID, 0)
removableQueriers := make([]string, 0)
// Remove all queriers with no connections that have gone since at least the forget delay.
threshold := now.Add(-qc.querierForgetDelay)
for querierID, querier := range qc.queriersByID {
Expand All @@ -147,11 +147,11 @@ func (qc *querierConnections) removeForgettableQueriers(now time.Time) []Querier
// In these cases the relevant ID fields are ignored and should be left as their unregistered or zero values.
type QuerierWorkerConn struct {
ctx context.Context
QuerierID QuerierID
QuerierID string
WorkerID int
}

func NewUnregisteredQuerierWorkerConn(ctx context.Context, querierID QuerierID) *QuerierWorkerConn {
func NewUnregisteredQuerierWorkerConn(ctx context.Context, querierID string) *QuerierWorkerConn {
return &QuerierWorkerConn{
ctx: ctx,
QuerierID: querierID,
Expand Down
12 changes: 6 additions & 6 deletions pkg/scheduler/queue/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ func (qwo *querierWorkerOperation) AwaitQuerierWorkerConnUpdate() error {
}

type requestToEnqueue struct {
tenantID TenantID
tenantID string
req QueryRequest
maxQueriers int
successFn func()
Expand Down Expand Up @@ -366,15 +366,15 @@ func (q *RequestQueue) enqueueRequestInternal(r requestToEnqueue) error {
err := q.queueBroker.enqueueRequestBack(&tr, r.maxQueriers)
if err != nil {
if errors.Is(err, ErrTooManyRequests) {
q.discardedRequests.WithLabelValues(string(r.tenantID)).Inc()
q.discardedRequests.WithLabelValues(r.tenantID).Inc()
}
return err
}
if r.successFn != nil {
r.successFn()
}

q.queueLength.WithLabelValues(string(r.tenantID)).Inc()
q.queueLength.WithLabelValues(r.tenantID).Inc()
return nil
}

Expand Down Expand Up @@ -410,7 +410,7 @@ func (q *RequestQueue) trySendNextRequestForQuerier(dequeueReq *QuerierWorkerDeq

requestSent := dequeueReq.sendResponse(reqForQuerier)
if requestSent {
q.queueLength.WithLabelValues(string(tenant.tenantID)).Dec()
q.queueLength.WithLabelValues(tenant.tenantID).Dec()
} else {
// should never error; any item previously in the queue already passed validation
err := q.queueBroker.enqueueRequestFront(req, tenant.maxQueriers)
Expand Down Expand Up @@ -439,7 +439,7 @@ func (q *RequestQueue) SubmitRequestToEnqueue(tenantID string, req QueryRequest,
}()

r := requestToEnqueue{
tenantID: TenantID(tenantID),
tenantID: tenantID,
req: req,
maxQueriers: maxQueriers,
successFn: successFn,
Expand Down Expand Up @@ -533,7 +533,7 @@ func (q *RequestQueue) submitForgetDisconnectedQueriers(ctx context.Context) {

// SubmitNotifyQuerierShutdown is called by the v1 frontend or scheduler when NotifyQuerierShutdown requests
// are submitted from the querier to an endpoint, separate from any specific querier-worker connection.
func (q *RequestQueue) SubmitNotifyQuerierShutdown(ctx context.Context, querierID QuerierID) {
func (q *RequestQueue) SubmitNotifyQuerierShutdown(ctx context.Context, querierID string) {
// Create a generic querier-worker connection to submit the operation.
conn := NewUnregisteredQuerierWorkerConn(ctx, querierID) // querierID matters but workerID does not
q.submitQuerierWorkerOperation(conn, notifyShutdown)
Expand Down
80 changes: 35 additions & 45 deletions pkg/scheduler/queue/queue_broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,9 @@ package queue
import (
"fmt"
"time"
)

type TenantID string

const emptyTenantID = TenantID("")
"github.com/grafana/mimir/pkg/scheduler/queue/tree"
)

// cannot import constants from frontend/v2 due to import cycle
// these are attached to the request's AdditionalQueueDimensions by the frontend.
Expand All @@ -21,20 +19,18 @@ const storeGatewayQueueDimension = "store-gateway"
const ingesterAndStoreGatewayQueueDimension = "ingester-and-store-gateway"
const unknownQueueDimension = "unknown" // utilized when AdditionalQueueDimensions is not assigned by the frontend

type QuerierID string

type tenantRequest struct {
tenantID TenantID
tenantID string
req QueryRequest
}

// queueBroker encapsulates access to the Tree queue for pending requests, and brokers logic dependencies between
// querier connections and tenant-querier assignments (e.g., assigning newly-connected queriers to tenants, or
// reshuffling queriers when a querier has disconnected).
type queueBroker struct {
tree Tree
tree tree.Tree

tenantQuerierAssignments *tenantQuerierAssignments
tenantQuerierAssignments *tenantQuerierShards
querierConnections *querierConnections

maxTenantQueueSize int
Expand All @@ -48,29 +44,29 @@ func newQueueBroker(
) *queueBroker {
qc := newQuerierConnections(forgetDelay)
tqas := newTenantQuerierAssignments()
var tree Tree
var treeQueue tree.Tree
var err error
var algos []QueuingAlgorithm
var algos []tree.QueuingAlgorithm
if prioritizeQueryComponents {
algos = []QueuingAlgorithm{
NewQuerierWorkerQueuePriorityAlgo(), // root; algorithm selects query component based on worker ID
tqas, // query components; algorithm selects tenants
algos = []tree.QueuingAlgorithm{
tree.NewQuerierWorkerQueuePriorityAlgo(), // root; algorithm selects query component based on worker ID
tqas.queuingAlgorithm, // query components; algorithm selects tenants

}
} else {
algos = []QueuingAlgorithm{
tqas, // root; algorithm selects tenants
&roundRobinState{}, // tenant queues; algorithm selects query component
algos = []tree.QueuingAlgorithm{
tqas.queuingAlgorithm, // root; algorithm selects tenants
tree.NewRoundRobinState(), // tenant queues; algorithm selects query component
}
}
tree, err = NewTree(algos...)
treeQueue, err = tree.NewTree(algos...)

// An error building the tree is fatal; we must panic
if err != nil {
panic(fmt.Sprintf("error creating the tree queue: %v", err))
}
qb := &queueBroker{
tree: tree,
tree: treeQueue,
querierConnections: qc,
tenantQuerierAssignments: tqas,
maxTenantQueueSize: maxTenantQueueSize,
Expand Down Expand Up @@ -98,11 +94,8 @@ func (qb *queueBroker) enqueueRequestBack(request *tenantRequest, tenantMaxQueri
return err
}

itemCount := 0
for _, tenantNode := range qb.tenantQuerierAssignments.tenantNodes[string(request.tenantID)] {
itemCount += tenantNode.ItemCount()
}
if itemCount+1 > qb.maxTenantQueueSize {
tenantQueueSize := qb.tenantQuerierAssignments.queuingAlgorithm.TotalQueueSizeForTenant(request.tenantID)
if tenantQueueSize+1 > qb.maxTenantQueueSize {
return ErrTooManyRequests
}

Expand All @@ -128,17 +121,17 @@ func (qb *queueBroker) enqueueRequestFront(request *tenantRequest, tenantMaxQuer
return qb.tree.EnqueueFrontByPath(queuePath, request)
}

func (qb *queueBroker) makeQueuePath(request *tenantRequest) (QueuePath, error) {
func (qb *queueBroker) makeQueuePath(request *tenantRequest) (tree.QueuePath, error) {
// some requests may not be type asserted to a schedulerRequest; in this case,
// they should also be queued as "unknown" query components
queryComponent := unknownQueueDimension
if schedulerRequest, ok := request.req.(*SchedulerRequest); ok {
queryComponent = schedulerRequest.ExpectedQueryComponentName()
}
if qb.prioritizeQueryComponents {
return append([]string{queryComponent}, string(request.tenantID)), nil
return append([]string{queryComponent}, request.tenantID), nil
}
return append(QueuePath{string(request.tenantID)}, queryComponent), nil
return append(tree.QueuePath{request.tenantID}, queryComponent), nil
}

func (qb *queueBroker) dequeueRequestForQuerier(
Expand All @@ -151,45 +144,42 @@ func (qb *queueBroker) dequeueRequestForQuerier(
) {
// check if querier is registered and is not shutting down
if !qb.querierConnections.querierIsAvailable(dequeueReq.QuerierID) {
return nil, nil, qb.tenantQuerierAssignments.tenantOrderIndex, ErrQuerierShuttingDown
return nil, nil, qb.tenantQuerierAssignments.queuingAlgorithm.TenantOrderIndex(), ErrQuerierShuttingDown
}

var queuePath QueuePath
var queuePath tree.QueuePath
var queueElement any
queuePath, queueElement = qb.tree.Dequeue(
&DequeueArgs{
querierID: dequeueReq.QuerierID,
workerID: dequeueReq.WorkerID,
lastTenantIndex: dequeueReq.lastTenantIndex.last,
&tree.DequeueArgs{
QuerierID: dequeueReq.QuerierID,
WorkerID: dequeueReq.WorkerID,
LastTenantIndex: dequeueReq.lastTenantIndex.last,
})

if queueElement == nil {
return nil, nil, qb.tenantQuerierAssignments.tenantOrderIndex, nil
return nil, nil, qb.tenantQuerierAssignments.queuingAlgorithm.TenantOrderIndex(), nil
}

var request *tenantRequest
var tenantID TenantID

// re-casting to same type it was enqueued as; panic would indicate a bug
request = queueElement.(*tenantRequest)
tenantID = request.tenantID
request := queueElement.(*tenantRequest)
tenantID := request.tenantID

var tenant *queueTenant
if tenantID != "" {
tenant = qb.tenantQuerierAssignments.tenantsByID[tenantID]
}

queueNodeAfterDequeue := qb.tree.GetNode(queuePath)
if queueNodeAfterDequeue == nil && len(qb.tenantQuerierAssignments.tenantNodes[string(tenantID)]) == 0 {
// queue node was deleted due to being empty after dequeue
if queueNodeAfterDequeue == nil && qb.tenantQuerierAssignments.queuingAlgorithm.TotalQueueSizeForTenant(tenantID) == 0 {
// queue node was deleted due to being empty after dequeue, and there are no remaining queue items for this tenant
qb.tenantQuerierAssignments.removeTenant(tenantID)
}

return request, tenant, qb.tenantQuerierAssignments.tenantOrderIndex, nil
return request, tenant, qb.tenantQuerierAssignments.queuingAlgorithm.TenantOrderIndex(), nil
}

// below methods simply pass through to the queueBroker's tenantQuerierAssignments; this layering could be skipped
// but there is no reason to make consumers know that they need to call through to the tenantQuerierAssignments.
// below methods simply pass through to the queueBroker's tenantQuerierShards; this layering could be skipped
// but there is no reason to make consumers know that they need to call through to the tenantQuerierShards.

func (qb *queueBroker) addQuerierWorkerConn(conn *QuerierWorkerConn) (resharded bool) {
// if conn is for a new querier, we need to recompute tenant querier relationship; otherwise, we don't reshard
Expand All @@ -210,7 +200,7 @@ func (qb *queueBroker) removeQuerierWorkerConn(conn *QuerierWorkerConn, now time

// notifyQuerierShutdown handles a graceful shutdown notification from a querier.
// Returns true if tenant-querier reshard was triggered.
func (qb *queueBroker) notifyQuerierShutdown(querierID QuerierID) (resharded bool) {
func (qb *queueBroker) notifyQuerierShutdown(querierID string) (resharded bool) {
if removedQuerier := qb.querierConnections.shutdownQuerier(querierID); removedQuerier {
return qb.tenantQuerierAssignments.removeQueriers(querierID)
}
Expand Down
Loading

0 comments on commit 90558ab

Please sign in to comment.