diff --git a/CHANGELOG.md b/CHANGELOG.md index 15bd89a7a69..61fde7f9fe8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -45,6 +45,7 @@ * `-memberlist.acquire-writer-timeout` * [ENHANCEMENT] memberlist: Notifications can now be processed once per interval specified by `-memberlist.notify-interval` to reduce notify storm CPU activity in large clusters. #9594 * [ENHANCEMENT] Return server-side total bytes processed statistics as a header through query frontend. #9645 +* [ENHANCEMENT] Query-scheduler: Remove the experimental `query-scheduler.prioritize-query-components` flag. Request queues always prioritize query component dequeuing above tenant fairness. #9703 * [BUGFIX] Fix issue where functions such as `rate()` over native histograms could return incorrect values if a float stale marker was present in the selected range. #9508 * [BUGFIX] Fix issue where negation of native histograms (eg. `-some_native_histogram_series`) did nothing. #9508 * [BUGFIX] Fix issue where `metric might not be a counter, name does not end in _total/_sum/_count/_bucket` annotation would be emitted even if `rate` or `increase` did not have enough samples to compute a result. #9508 diff --git a/cmd/mimir/config-descriptor.json b/cmd/mimir/config-descriptor.json index c9c0754f665..d40e91c862f 100644 --- a/cmd/mimir/config-descriptor.json +++ b/cmd/mimir/config-descriptor.json @@ -16385,17 +16385,6 @@ "fieldFlag": "query-scheduler.max-outstanding-requests-per-tenant", "fieldType": "int" }, - { - "kind": "field", - "name": "prioritize_query_components", - "required": false, - "desc": "When enabled, the query scheduler primarily prioritizes dequeuing fairly from queue components and secondarily prioritizes dequeuing fairly across tenants. When disabled, the query scheduler primarily prioritizes tenant fairness.", - "fieldValue": null, - "fieldDefaultValue": false, - "fieldFlag": "query-scheduler.prioritize-query-components", - "fieldType": "boolean", - "fieldCategory": "experimental" - }, { "kind": "field", "name": "querier_forget_delay", diff --git a/cmd/mimir/help-all.txt.tmpl b/cmd/mimir/help-all.txt.tmpl index 6417172c9f9..64323dbf111 100644 --- a/cmd/mimir/help-all.txt.tmpl +++ b/cmd/mimir/help-all.txt.tmpl @@ -2315,8 +2315,6 @@ Usage of ./cmd/mimir/mimir: Maximum number of outstanding requests per tenant per query-scheduler. In-flight requests above this limit will fail with HTTP response status code 429. (default 100) -query-scheduler.max-used-instances int The maximum number of query-scheduler instances to use, regardless how many replicas are running. This option can be set only when -query-scheduler.service-discovery-mode is set to 'ring'. 0 to use all available query-scheduler instances. - -query-scheduler.prioritize-query-components - [experimental] When enabled, the query scheduler primarily prioritizes dequeuing fairly from queue components and secondarily prioritizes dequeuing fairly across tenants. When disabled, the query scheduler primarily prioritizes tenant fairness. -query-scheduler.querier-forget-delay duration [experimental] If a querier disconnects without sending notification about graceful shutdown, the query-scheduler will keep the querier in the tenant's shard until the forget delay has passed. This feature is useful to reduce the blast radius when shuffle-sharding is enabled. 
-query-scheduler.ring.consul.acl-token string diff --git a/development/mimir-microservices-mode/config/mimir.yaml b/development/mimir-microservices-mode/config/mimir.yaml index cd966e6de11..11d002519fd 100644 --- a/development/mimir-microservices-mode/config/mimir.yaml +++ b/development/mimir-microservices-mode/config/mimir.yaml @@ -164,7 +164,6 @@ querier: query_scheduler: # Change to "dns" to switch to query-scheduler DNS-based service discovery. service_discovery_mode: "ring" - prioritize_query_components: true limits: # Limit max query time range to 31d diff --git a/docs/sources/mimir/configure/configuration-parameters/index.md b/docs/sources/mimir/configure/configuration-parameters/index.md index 5da9d5db8f8..73fb0a7093a 100644 --- a/docs/sources/mimir/configure/configuration-parameters/index.md +++ b/docs/sources/mimir/configure/configuration-parameters/index.md @@ -1718,13 +1718,6 @@ The `query_scheduler` block configures the query-scheduler. # CLI flag: -query-scheduler.max-outstanding-requests-per-tenant [max_outstanding_requests_per_tenant: | default = 100] -# (experimental) When enabled, the query scheduler primarily prioritizes -# dequeuing fairly from queue components and secondarily prioritizes dequeuing -# fairly across tenants. When disabled, the query scheduler primarily -# prioritizes tenant fairness. -# CLI flag: -query-scheduler.prioritize-query-components -[prioritize_query_components: | default = false] - # (experimental) If a querier disconnects without sending notification about # graceful shutdown, the query-scheduler will keep the querier in the tenant's # shard until the forget delay has passed. This feature is useful to reduce the diff --git a/pkg/frontend/v1/frontend.go b/pkg/frontend/v1/frontend.go index 01fc14b2642..b1ac4d90967 100644 --- a/pkg/frontend/v1/frontend.go +++ b/pkg/frontend/v1/frontend.go @@ -126,7 +126,6 @@ func New(cfg Config, limits Limits, log log.Logger, registerer prometheus.Regist f.requestQueue, err = queue.NewRequestQueue( log, cfg.MaxOutstandingPerTenant, - false, //prioritizeQueryComponents -- currently no-op cfg.QuerierForgetDelay, f.queueLength, f.discardedRequests, diff --git a/pkg/scheduler/queue/multi_queuing_algorithm_tree_queue_benchmark_test.go b/pkg/scheduler/queue/multi_queuing_algorithm_tree_queue_benchmark_test.go index 05d8702954d..1e80653baba 100644 --- a/pkg/scheduler/queue/multi_queuing_algorithm_tree_queue_benchmark_test.go +++ b/pkg/scheduler/queue/multi_queuing_algorithm_tree_queue_benchmark_test.go @@ -377,13 +377,9 @@ func TestMultiDimensionalQueueAlgorithmSlowConsumerEffects(t *testing.T) { for _, weightedQueueDimensionTestCase := range weightedQueueDimensionTestCases { numTenants := len(weightedQueueDimensionTestCase.tenantQueueDimensionsWeights) - tqaNonFlipped := tree.NewTenantQuerierQueuingAlgorithm() tqaFlipped := tree.NewTenantQuerierQueuingAlgorithm() tqaQuerierWorkerPrioritization := tree.NewTenantQuerierQueuingAlgorithm() - nonFlippedRoundRobinTree, err := tree.NewTree(tqaNonFlipped, tree.NewRoundRobinState()) - require.NoError(t, err) - flippedRoundRobinTree, err := tree.NewTree(tree.NewRoundRobinState(), tqaFlipped) require.NoError(t, err) @@ -396,11 +392,6 @@ func TestMultiDimensionalQueueAlgorithmSlowConsumerEffects(t *testing.T) { tqa *tree.TenantQuerierQueuingAlgorithm }{ // keeping these names the same length keeps logged results aligned - { - "tenant-querier -> query component round-robin tree", - nonFlippedRoundRobinTree, - tqaNonFlipped, - }, { "query component round-robin -> tenant-querier 
tree", flippedRoundRobinTree, @@ -424,14 +415,10 @@ func TestMultiDimensionalQueueAlgorithmSlowConsumerEffects(t *testing.T) { queryComponentQueueDurationObservations: map[string][]float64{}, } - // only the non-flipped tree uses the old tenant -> query component hierarchy - prioritizeQueryComponents := scenario.tree != nonFlippedRoundRobinTree - t.Run(testCaseName, func(t *testing.T) { queue, err := NewRequestQueue( log.NewNopLogger(), maxOutStandingPerTenant, - prioritizeQueryComponents, querierForgetDelay, promauto.With(nil).NewGaugeVec(prometheus.GaugeOpts{}, []string{"user"}), promauto.With(nil).NewCounterVec(prometheus.CounterOpts{}, []string{"user"}), @@ -447,7 +434,6 @@ func TestMultiDimensionalQueueAlgorithmSlowConsumerEffects(t *testing.T) { tenantsByID: make(map[string]*queueTenant), queuingAlgorithm: scenario.tqa, } - queue.queueBroker.prioritizeQueryComponents = prioritizeQueryComponents queue.queueBroker.tree = scenario.tree ctx := context.Background() diff --git a/pkg/scheduler/queue/queue.go b/pkg/scheduler/queue/queue.go index 4c9f1f5c274..debb196b9a6 100644 --- a/pkg/scheduler/queue/queue.go +++ b/pkg/scheduler/queue/queue.go @@ -210,7 +210,6 @@ type requestToEnqueue struct { func NewRequestQueue( log log.Logger, maxOutstandingPerTenant int, - prioritizeQueryComponents bool, forgetDelay time.Duration, queueLength *prometheus.GaugeVec, discardedRequests *prometheus.CounterVec, @@ -246,7 +245,7 @@ func NewRequestQueue( waitingDequeueRequestsToDispatch: list.New(), QueryComponentUtilization: queryComponentCapacity, - queueBroker: newQueueBroker(maxOutstandingPerTenant, prioritizeQueryComponents, forgetDelay), + queueBroker: newQueueBroker(maxOutstandingPerTenant, forgetDelay), } q.Service = services.NewBasicService(q.starting, q.running, q.stop).WithName("request queue") diff --git a/pkg/scheduler/queue/queue_broker.go b/pkg/scheduler/queue/queue_broker.go index c67ce09de52..564a8d8e5c5 100644 --- a/pkg/scheduler/queue/queue_broker.go +++ b/pkg/scheduler/queue/queue_broker.go @@ -33,31 +33,20 @@ type queueBroker struct { tenantQuerierAssignments *tenantQuerierShards querierConnections *querierConnections - maxTenantQueueSize int - prioritizeQueryComponents bool + maxTenantQueueSize int } func newQueueBroker( maxTenantQueueSize int, - prioritizeQueryComponents bool, forgetDelay time.Duration, ) *queueBroker { qc := newQuerierConnections(forgetDelay) tqas := newTenantQuerierAssignments() var treeQueue tree.Tree var err error - var algos []tree.QueuingAlgorithm - if prioritizeQueryComponents { - algos = []tree.QueuingAlgorithm{ - tree.NewQuerierWorkerQueuePriorityAlgo(), // root; algorithm selects query component based on worker ID - tqas.queuingAlgorithm, // query components; algorithm selects tenants - - } - } else { - algos = []tree.QueuingAlgorithm{ - tqas.queuingAlgorithm, // root; algorithm selects tenants - tree.NewRoundRobinState(), // tenant queues; algorithm selects query component - } + algos := []tree.QueuingAlgorithm{ + tree.NewQuerierWorkerQueuePriorityAlgo(), // root; algorithm selects query component based on worker ID + tqas.queuingAlgorithm, // query components; algorithm selects tenants } treeQueue, err = tree.NewTree(algos...) 
@@ -66,11 +55,10 @@ func newQueueBroker( panic(fmt.Sprintf("error creating the tree queue: %v", err)) } qb := &queueBroker{ - tree: treeQueue, - querierConnections: qc, - tenantQuerierAssignments: tqas, - maxTenantQueueSize: maxTenantQueueSize, - prioritizeQueryComponents: prioritizeQueryComponents, + tree: treeQueue, + querierConnections: qc, + tenantQuerierAssignments: tqas, + maxTenantQueueSize: maxTenantQueueSize, } return qb @@ -128,10 +116,7 @@ func (qb *queueBroker) makeQueuePath(request *tenantRequest) (tree.QueuePath, er if schedulerRequest, ok := request.req.(*SchedulerRequest); ok { queryComponent = schedulerRequest.ExpectedQueryComponentName() } - if qb.prioritizeQueryComponents { - return append([]string{queryComponent}, request.tenantID), nil - } - return append(tree.QueuePath{request.tenantID}, queryComponent), nil + return append([]string{queryComponent}, request.tenantID), nil } func (qb *queueBroker) dequeueRequestForQuerier( diff --git a/pkg/scheduler/queue/queue_broker_test.go b/pkg/scheduler/queue/queue_broker_test.go index a27a0727048..9e0fd8868af 100644 --- a/pkg/scheduler/queue/queue_broker_test.go +++ b/pkg/scheduler/queue/queue_broker_test.go @@ -29,11 +29,7 @@ func (qb *queueBroker) enqueueObjectsForTests(tenantID string, numObjects int) e var path tree.QueuePath var err error if _, ok := qb.tree.(*tree.MultiAlgorithmTreeQueue); ok { - if qb.prioritizeQueryComponents { - path = tree.QueuePath{unknownQueueDimension, tenantID} - } else { - path = tree.QueuePath{tenantID, unknownQueueDimension} - } + path = tree.QueuePath{unknownQueueDimension, tenantID} } else { path, err = qb.makeQueuePath(req) @@ -83,464 +79,424 @@ func assertExpectedValuesOnDequeue(t *testing.T, qb *queueBroker, lastTenantInde // handle queries for any tenant, as tenant queues are added and removed func TestQueues_NoShuffleSharding(t *testing.T) { - treeTypes := buildTreeTestsStruct() - for _, tt := range treeTypes { - t.Run(tt.name, func(t *testing.T) { - qb := newQueueBroker(0, tt.prioritizeQueryComponents, 0) - assert.NotNil(t, qb) - assert.NoError(t, isConsistent(qb)) - - qb.addQuerierWorkerConn(NewUnregisteredQuerierWorkerConn(context.Background(), "querier-1")) - qb.addQuerierWorkerConn(NewUnregisteredQuerierWorkerConn(context.Background(), "querier-2")) - - req, tenant, lastTenantIndexQuerierOne, err := qb.dequeueRequestForQuerier(&QuerierWorkerDequeueRequest{ - QuerierWorkerConn: &QuerierWorkerConn{QuerierID: "querier-1"}, - lastTenantIndex: TenantIndex{-1}, - }) - assert.Nil(t, req) - assert.Nil(t, tenant) - assert.NoError(t, err) - - // Add tenant queues: tenant "one" - err = qb.tenantQuerierAssignments.createOrUpdateTenant("one", 0) - assert.NoError(t, err) - - tenantOne := qb.tenantQuerierAssignments.tenantsByID["one"] - - // Queue enough objects for tenant "one" - err = qb.enqueueObjectsForTests(tenantOne.tenantID, 10) - assert.NoError(t, err) - assert.NoError(t, isConsistent(qb)) - - //queuePathOne := QueuePath{"root", "one"} - expectedDequeueVals := []dequeueVal{ - {buildExpectedObject(tenantOne.tenantID, 0), tenantOne}, - {buildExpectedObject(tenantOne.tenantID, 1), tenantOne}, - } - lastTenantIndexQuerierOne = assertExpectedValuesOnDequeue(t, qb, lastTenantIndexQuerierOne, "querier-1", expectedDequeueVals) + qb := newQueueBroker(0, 0) + assert.NotNil(t, qb) + assert.NoError(t, isConsistent(qb)) + + qb.addQuerierWorkerConn(NewUnregisteredQuerierWorkerConn(context.Background(), "querier-1")) + qb.addQuerierWorkerConn(NewUnregisteredQuerierWorkerConn(context.Background(), 
"querier-2")) + + req, tenant, lastTenantIndexQuerierOne, err := qb.dequeueRequestForQuerier(&QuerierWorkerDequeueRequest{ + QuerierWorkerConn: &QuerierWorkerConn{QuerierID: "querier-1"}, + lastTenantIndex: TenantIndex{-1}, + }) + assert.Nil(t, req) + assert.Nil(t, tenant) + assert.NoError(t, err) + + // Add tenant queues: tenant "one" + err = qb.tenantQuerierAssignments.createOrUpdateTenant("one", 0) + assert.NoError(t, err) + + tenantOne := qb.tenantQuerierAssignments.tenantsByID["one"] + + // Queue enough objects for tenant "one" + err = qb.enqueueObjectsForTests(tenantOne.tenantID, 10) + assert.NoError(t, err) + assert.NoError(t, isConsistent(qb)) + + //queuePathOne := QueuePath{"root", "one"} + expectedDequeueVals := []dequeueVal{ + {buildExpectedObject(tenantOne.tenantID, 0), tenantOne}, + {buildExpectedObject(tenantOne.tenantID, 1), tenantOne}, + } + lastTenantIndexQuerierOne = assertExpectedValuesOnDequeue(t, qb, lastTenantIndexQuerierOne, "querier-1", expectedDequeueVals) - // Add tenant two - err = qb.tenantQuerierAssignments.createOrUpdateTenant("two", 0) - assert.NoError(t, err) + // Add tenant two + err = qb.tenantQuerierAssignments.createOrUpdateTenant("two", 0) + assert.NoError(t, err) - tenantTwo := qb.tenantQuerierAssignments.tenantsByID["two"] + tenantTwo := qb.tenantQuerierAssignments.tenantsByID["two"] - err = qb.enqueueObjectsForTests(tenantTwo.tenantID, 10) - assert.NoError(t, err) - assert.NoError(t, isConsistent(qb)) + err = qb.enqueueObjectsForTests(tenantTwo.tenantID, 10) + assert.NoError(t, err) + assert.NoError(t, isConsistent(qb)) - expectedDequeueVals = []dequeueVal{ - {buildExpectedObject(tenantTwo.tenantID, 0), tenantTwo}, - {buildExpectedObject(tenantOne.tenantID, 2), tenantOne}, - {buildExpectedObject(tenantTwo.tenantID, 1), tenantTwo}, - {buildExpectedObject(tenantOne.tenantID, 3), tenantOne}, - } + expectedDequeueVals = []dequeueVal{ + {buildExpectedObject(tenantTwo.tenantID, 0), tenantTwo}, + {buildExpectedObject(tenantOne.tenantID, 2), tenantOne}, + {buildExpectedObject(tenantTwo.tenantID, 1), tenantTwo}, + {buildExpectedObject(tenantOne.tenantID, 3), tenantOne}, + } - lastTenantIndexQuerierOne = assertExpectedValuesOnDequeue(t, qb, lastTenantIndexQuerierOne, "querier-1", expectedDequeueVals) + lastTenantIndexQuerierOne = assertExpectedValuesOnDequeue(t, qb, lastTenantIndexQuerierOne, "querier-1", expectedDequeueVals) - expectedDequeueVals = []dequeueVal{ - {buildExpectedObject(tenantOne.tenantID, 4), tenantOne}, - {buildExpectedObject(tenantTwo.tenantID, 2), tenantTwo}, - {buildExpectedObject(tenantOne.tenantID, 5), tenantOne}, - } - lastTenantIndexQuerierTwo := assertExpectedValuesOnDequeue(t, qb, -1, "querier-2", expectedDequeueVals) - - // [one two three] - // confirm fifo by adding a third tenant queue and iterating to it - err = qb.tenantQuerierAssignments.createOrUpdateTenant("three", 0) - assert.NoError(t, err) + expectedDequeueVals = []dequeueVal{ + {buildExpectedObject(tenantOne.tenantID, 4), tenantOne}, + {buildExpectedObject(tenantTwo.tenantID, 2), tenantTwo}, + {buildExpectedObject(tenantOne.tenantID, 5), tenantOne}, + } + lastTenantIndexQuerierTwo := assertExpectedValuesOnDequeue(t, qb, -1, "querier-2", expectedDequeueVals) - tenantThree := qb.tenantQuerierAssignments.tenantsByID["three"] + // [one two three] + // confirm fifo by adding a third tenant queue and iterating to it + err = qb.tenantQuerierAssignments.createOrUpdateTenant("three", 0) + assert.NoError(t, err) - err = qb.enqueueObjectsForTests(tenantThree.tenantID, 10) - 
assert.NoError(t, err) - assert.NoError(t, isConsistent(qb)) + tenantThree := qb.tenantQuerierAssignments.tenantsByID["three"] - // lastTenantIndexQuerierOne was 0 (tenantOne) for querier-1; appending - expectedDequeueVals = []dequeueVal{ - {buildExpectedObject(tenantTwo.tenantID, 3), tenantTwo}, - {buildExpectedObject(tenantThree.tenantID, 0), tenantThree}, - {buildExpectedObject(tenantOne.tenantID, 6), tenantOne}, - } + err = qb.enqueueObjectsForTests(tenantThree.tenantID, 10) + assert.NoError(t, err) + assert.NoError(t, isConsistent(qb)) - lastTenantIndexQuerierOne = assertExpectedValuesOnDequeue(t, qb, lastTenantIndexQuerierOne, "querier-1", expectedDequeueVals) + // lastTenantIndexQuerierOne was 0 (tenantOne) for querier-1; appending + expectedDequeueVals = []dequeueVal{ + {buildExpectedObject(tenantTwo.tenantID, 3), tenantTwo}, + {buildExpectedObject(tenantThree.tenantID, 0), tenantThree}, + {buildExpectedObject(tenantOne.tenantID, 6), tenantOne}, + } - // Remove one: ["" two three] - qb.removeTenantQueue("one") - assert.NoError(t, isConsistent(qb)) + lastTenantIndexQuerierOne = assertExpectedValuesOnDequeue(t, qb, lastTenantIndexQuerierOne, "querier-1", expectedDequeueVals) - expectedDequeueVals = []dequeueVal{ - {buildExpectedObject(tenantTwo.tenantID, 4), tenantTwo}, - {buildExpectedObject(tenantThree.tenantID, 1), tenantThree}, - {buildExpectedObject(tenantTwo.tenantID, 5), tenantTwo}, - } - lastTenantIndexQuerierOne = assertExpectedValuesOnDequeue(t, qb, lastTenantIndexQuerierOne, "querier-1", expectedDequeueVals) + // Remove one: ["" two three] + qb.removeTenantQueue("one") + assert.NoError(t, isConsistent(qb)) - // "four" is added at the beginning of the list: [four two three] - err = qb.tenantQuerierAssignments.createOrUpdateTenant("four", 0) - assert.NoError(t, err) + expectedDequeueVals = []dequeueVal{ + {buildExpectedObject(tenantTwo.tenantID, 4), tenantTwo}, + {buildExpectedObject(tenantThree.tenantID, 1), tenantThree}, + {buildExpectedObject(tenantTwo.tenantID, 5), tenantTwo}, + } + lastTenantIndexQuerierOne = assertExpectedValuesOnDequeue(t, qb, lastTenantIndexQuerierOne, "querier-1", expectedDequeueVals) - tenantFour := qb.tenantQuerierAssignments.tenantsByID["four"] + // "four" is added at the beginning of the list: [four two three] + err = qb.tenantQuerierAssignments.createOrUpdateTenant("four", 0) + assert.NoError(t, err) - err = qb.enqueueObjectsForTests(tenantFour.tenantID, 10) - assert.NoError(t, err) - assert.NoError(t, isConsistent(qb)) + tenantFour := qb.tenantQuerierAssignments.tenantsByID["four"] - expectedDequeueVals = []dequeueVal{ - {buildExpectedObject(tenantTwo.tenantID, 6), tenantTwo}, - {buildExpectedObject(tenantThree.tenantID, 2), tenantThree}, - {buildExpectedObject(tenantFour.tenantID, 0), tenantFour}, - {buildExpectedObject(tenantTwo.tenantID, 7), tenantTwo}, - } + err = qb.enqueueObjectsForTests(tenantFour.tenantID, 10) + assert.NoError(t, err) + assert.NoError(t, isConsistent(qb)) - _ = assertExpectedValuesOnDequeue(t, qb, lastTenantIndexQuerierTwo, "querier-2", expectedDequeueVals) + expectedDequeueVals = []dequeueVal{ + {buildExpectedObject(tenantTwo.tenantID, 6), tenantTwo}, + {buildExpectedObject(tenantThree.tenantID, 2), tenantThree}, + {buildExpectedObject(tenantFour.tenantID, 0), tenantFour}, + {buildExpectedObject(tenantTwo.tenantID, 7), tenantTwo}, + } - // Remove two: [four "" three] - qb.removeTenantQueue("two") - assert.NoError(t, isConsistent(qb)) + _ = assertExpectedValuesOnDequeue(t, qb, lastTenantIndexQuerierTwo, "querier-2", 
expectedDequeueVals) - expectedDequeueVals = []dequeueVal{ - {buildExpectedObject(tenantThree.tenantID, 3), tenantThree}, - {buildExpectedObject(tenantFour.tenantID, 1), tenantFour}, - {buildExpectedObject(tenantThree.tenantID, 4), tenantThree}, - } - lastTenantIndexQuerierOne = assertExpectedValuesOnDequeue(t, qb, lastTenantIndexQuerierOne, "querier-1", expectedDequeueVals) + // Remove two: [four "" three] + qb.removeTenantQueue("two") + assert.NoError(t, isConsistent(qb)) - // Remove three: [four ""] ) - qb.removeTenantQueue("three") - assert.NoError(t, isConsistent(qb)) + expectedDequeueVals = []dequeueVal{ + {buildExpectedObject(tenantThree.tenantID, 3), tenantThree}, + {buildExpectedObject(tenantFour.tenantID, 1), tenantFour}, + {buildExpectedObject(tenantThree.tenantID, 4), tenantThree}, + } + lastTenantIndexQuerierOne = assertExpectedValuesOnDequeue(t, qb, lastTenantIndexQuerierOne, "querier-1", expectedDequeueVals) - // Remove four: [] - qb.removeTenantQueue("four") - assert.NoError(t, isConsistent(qb)) + // Remove three: [four ""] ) + qb.removeTenantQueue("three") + assert.NoError(t, isConsistent(qb)) - req, tenant, _, err = qb.dequeueRequestForQuerier(&QuerierWorkerDequeueRequest{ - QuerierWorkerConn: &QuerierWorkerConn{QuerierID: "querier-1"}, - lastTenantIndex: TenantIndex{lastTenantIndexQuerierOne}, - }, - ) - assert.Nil(t, req) - assert.Nil(t, tenant) - assert.NoError(t, err) + // Remove four: [] + qb.removeTenantQueue("four") + assert.NoError(t, isConsistent(qb)) - }) - } + req, tenant, _, err = qb.dequeueRequestForQuerier(&QuerierWorkerDequeueRequest{ + QuerierWorkerConn: &QuerierWorkerConn{QuerierID: "querier-1"}, + lastTenantIndex: TenantIndex{lastTenantIndexQuerierOne}, + }, + ) + assert.Nil(t, req) + assert.Nil(t, tenant) + assert.NoError(t, err) } func TestQueuesRespectMaxTenantQueueSizeWithSubQueues(t *testing.T) { - treeTypes := buildTreeTestsStruct() - - for _, tt := range treeTypes { - t.Run(tt.name, func(t *testing.T) { - maxTenantQueueSize := 100 - qb := newQueueBroker(maxTenantQueueSize, tt.prioritizeQueryComponents, 0) - additionalQueueDimensions := map[int][]string{ - 0: nil, - 1: {"ingester"}, - 2: {"store-gateway"}, - 3: {"ingester-and-store-gateway"}, - } - req := &SchedulerRequest{ - Ctx: context.Background(), - FrontendAddr: "http://query-frontend:8007", - UserID: "tenant-1", - Request: &httpgrpc.HTTPRequest{}, - } - - // build queue evenly with either no additional queue dimension or one of 3 additional dimensions - for i := 0; i < len(additionalQueueDimensions); i++ { - for j := 0; j < maxTenantQueueSize/len(additionalQueueDimensions); j++ { - req.AdditionalQueueDimensions = additionalQueueDimensions[i] - tenantReq := &tenantRequest{tenantID: "tenant-1", req: req} - err := qb.enqueueRequestBack(tenantReq, 0) - assert.NoError(t, err) - } - } - // assert item count of tenant node and its subnodes - queuePath := tree.QueuePath{"tenant-1"} - - var itemCount int - // if prioritizeQueryComponents, we need to build paths for each queue dimension - // and sum all items - if qb.prioritizeQueryComponents { - for _, addlQueueDim := range additionalQueueDimensions { - var path tree.QueuePath - path = append(append(path, addlQueueDim...), "tenant-1") - if addlQueueDim == nil { - path = qb.makeQueuePathForTests("tenant-1") - } - itemCount += qb.tree.GetNode(path).ItemCount() - } - assert.Equal(t, maxTenantQueueSize, itemCount) - - } else { - assert.Equal(t, maxTenantQueueSize, qb.tree.GetNode(queuePath).ItemCount()) - } - - // assert equal distribution of queue items 
between 4 subnodes - for _, v := range additionalQueueDimensions { - var checkPath tree.QueuePath - if v == nil { - v = []string{unknownQueueDimension} - } - if qb.prioritizeQueryComponents { - checkPath = append(append(checkPath, v...), "tenant-1") - } else { - checkPath = append(tree.QueuePath{"tenant-1"}, v...) - } - - dimensionItemCount := qb.tree.GetNode(checkPath).ItemCount() - assert.Equal(t, maxTenantQueueSize/len(additionalQueueDimensions), dimensionItemCount) - } - - // assert error received when hitting a tenant's enqueue limit, - // even though most of the requests are in the subqueues - for _, additionalQueueDimension := range additionalQueueDimensions { - // error should be received no matter if the enqueue attempt - // is for the tenant queue or any of its subqueues - req.AdditionalQueueDimensions = additionalQueueDimension - tenantReq := &tenantRequest{tenantID: "tenant-1", req: req} - err := qb.enqueueRequestBack(tenantReq, 0) - assert.ErrorIs(t, err, ErrTooManyRequests) - } - - // dequeue a request - qb.addQuerierWorkerConn(NewUnregisteredQuerierWorkerConn(context.Background(), "querier-1")) - dequeuedTenantReq, _, _, err := qb.dequeueRequestForQuerier(&QuerierWorkerDequeueRequest{ - QuerierWorkerConn: &QuerierWorkerConn{QuerierID: "querier-1"}, - lastTenantIndex: TenantIndex{-1}, - }) - assert.NoError(t, err) - assert.NotNil(t, dequeuedTenantReq) + maxTenantQueueSize := 100 + qb := newQueueBroker(maxTenantQueueSize, 0) + additionalQueueDimensions := map[int][]string{ + 0: {unknownQueueDimension}, + 1: {ingesterQueueDimension}, + 2: {storeGatewayQueueDimension}, + 3: {ingesterAndStoreGatewayQueueDimension}, + } + req := &SchedulerRequest{ + Ctx: context.Background(), + FrontendAddr: "http://query-frontend:8007", + UserID: "tenant-1", + Request: &httpgrpc.HTTPRequest{}, + } + // build queue evenly with either no additional queue dimension or one of 3 additional dimensions + for i := 0; i < len(additionalQueueDimensions); i++ { + for j := 0; j < maxTenantQueueSize/len(additionalQueueDimensions); j++ { + req.AdditionalQueueDimensions = additionalQueueDimensions[i] tenantReq := &tenantRequest{tenantID: "tenant-1", req: req} - // assert not hitting an error when enqueueing after dequeuing to below the limit - err = qb.enqueueRequestBack(tenantReq, 0) + err := qb.enqueueRequestBack(tenantReq, 0) assert.NoError(t, err) - - // we then hit an error again, as we are back at the limit - err = qb.enqueueRequestBack(tenantReq, 0) - assert.ErrorIs(t, err, ErrTooManyRequests) - }) + } } -} - -func TestQueuesOnTerminatingQuerier(t *testing.T) { - treeTypes := buildTreeTestsStruct() - for _, tt := range treeTypes { - t.Run(tt.name, func(t *testing.T) { - qb := newQueueBroker(0, tt.prioritizeQueryComponents, 0) - assert.NotNil(t, qb) - assert.NoError(t, isConsistent(qb)) - - qb.addQuerierWorkerConn(NewUnregisteredQuerierWorkerConn(context.Background(), "querier-1")) - qb.addQuerierWorkerConn(NewUnregisteredQuerierWorkerConn(context.Background(), "querier-2")) - // Add queues: [one two] - err := qb.tenantQuerierAssignments.createOrUpdateTenant("one", 0) - assert.NoError(t, err) - err = qb.tenantQuerierAssignments.createOrUpdateTenant("two", 0) - assert.NoError(t, err) + var itemCount int + // build paths for each queue dimension and sum all items + for _, addlQueueDim := range additionalQueueDimensions { + var path tree.QueuePath + // assert item count of tenant node and its subnodes + path = append(append(path, addlQueueDim...), "tenant-1") + if addlQueueDim == nil { + path = 
qb.makeQueuePathForTests("tenant-1") + } + itemCount += qb.tree.GetNode(path).ItemCount() + } + assert.Equal(t, maxTenantQueueSize, itemCount) - err = qb.enqueueObjectsForTests("one", 10) - assert.NoError(t, err) - tenantOne := qb.tenantQuerierAssignments.tenantsByID["one"] + // assert equal distribution of queue items between 4 subnodes + for _, v := range additionalQueueDimensions { + var checkPath tree.QueuePath + if v == nil { + v = []string{unknownQueueDimension} + } + checkPath = append(append(checkPath, v...), "tenant-1") - err = qb.enqueueObjectsForTests("two", 10) - assert.NoError(t, err) - tenantTwo := qb.tenantQuerierAssignments.tenantsByID["two"] + dimensionItemCount := qb.tree.GetNode(checkPath).ItemCount() + assert.Equal(t, maxTenantQueueSize/len(additionalQueueDimensions), dimensionItemCount) + } - expectedDequeueVals := []dequeueVal{ - {buildExpectedObject(tenantOne.tenantID, 0), tenantOne}, - {buildExpectedObject(tenantTwo.tenantID, 0), tenantTwo}, - {buildExpectedObject(tenantOne.tenantID, 1), tenantOne}, - {buildExpectedObject(tenantTwo.tenantID, 1), tenantTwo}, - } - qOneLastTenantIndex := assertExpectedValuesOnDequeue(t, qb, -1, "querier-1", expectedDequeueVals) + // assert error received when hitting a tenant's enqueue limit, + // even though most of the requests are in the subqueues + for _, additionalQueueDimension := range additionalQueueDimensions { + // error should be received no matter if the enqueue attempt + // is for the tenant queue or any of its subqueues + req.AdditionalQueueDimensions = additionalQueueDimension + tenantReq := &tenantRequest{tenantID: "tenant-1", req: req} + err := qb.enqueueRequestBack(tenantReq, 0) + assert.ErrorIs(t, err, ErrTooManyRequests) + } - expectedDequeueVals = []dequeueVal{ - {buildExpectedObject(tenantOne.tenantID, 2), tenantOne}, - {buildExpectedObject(tenantTwo.tenantID, 2), tenantTwo}, - {buildExpectedObject(tenantOne.tenantID, 3), tenantOne}, - {buildExpectedObject(tenantTwo.tenantID, 3), tenantTwo}, - } - qTwolastTenantIndex := assertExpectedValuesOnDequeue(t, qb, -1, "querier-2", expectedDequeueVals) - - // After notify shutdown for querier-2, it's expected to own no queue. - qb.notifyQuerierShutdown("querier-2") - req, tenant, qTwolastTenantIndex, err := qb.dequeueRequestForQuerier(&QuerierWorkerDequeueRequest{ - QuerierWorkerConn: &QuerierWorkerConn{QuerierID: "querier-2"}, - lastTenantIndex: TenantIndex{qTwolastTenantIndex}, - }) - assert.Nil(t, req) - assert.Nil(t, tenant) - assert.Equal(t, ErrQuerierShuttingDown, err) - - // However, querier-1 still get queues because it's still running. 
- expectedDequeueVals = []dequeueVal{ - {buildExpectedObject(tenantOne.tenantID, 4), tenantOne}, - {buildExpectedObject(tenantTwo.tenantID, 4), tenantTwo}, - {buildExpectedObject(tenantOne.tenantID, 5), tenantOne}, - {buildExpectedObject(tenantTwo.tenantID, 5), tenantTwo}, - } + // dequeue a request + qb.addQuerierWorkerConn(NewUnregisteredQuerierWorkerConn(context.Background(), "querier-1")) + dequeuedTenantReq, _, _, err := qb.dequeueRequestForQuerier(&QuerierWorkerDequeueRequest{ + QuerierWorkerConn: &QuerierWorkerConn{QuerierID: "querier-1"}, + lastTenantIndex: TenantIndex{-1}, + }) + assert.NoError(t, err) + assert.NotNil(t, dequeuedTenantReq) + + tenantReq := &tenantRequest{tenantID: "tenant-1", req: req} + // assert not hitting an error when enqueueing after dequeuing to below the limit + err = qb.enqueueRequestBack(tenantReq, 0) + assert.NoError(t, err) + + // we then hit an error again, as we are back at the limit + err = qb.enqueueRequestBack(tenantReq, 0) + assert.ErrorIs(t, err, ErrTooManyRequests) +} - for _, expected := range expectedDequeueVals { - req, tenant, qOneLastTenantIndex, err = qb.dequeueRequestForQuerier(&QuerierWorkerDequeueRequest{ - QuerierWorkerConn: &QuerierWorkerConn{QuerierID: "querier-1"}, - lastTenantIndex: TenantIndex{qOneLastTenantIndex}, - }) - assert.Equal(t, expected.req, req) - assert.Equal(t, expected.tenant, tenant) - assert.NoError(t, err) - } +func TestQueuesOnTerminatingQuerier(t *testing.T) { + qb := newQueueBroker(0, 0) + assert.NotNil(t, qb) + assert.NoError(t, isConsistent(qb)) + + qb.addQuerierWorkerConn(NewUnregisteredQuerierWorkerConn(context.Background(), "querier-1")) + qb.addQuerierWorkerConn(NewUnregisteredQuerierWorkerConn(context.Background(), "querier-2")) + + // Add queues: [one two] + err := qb.tenantQuerierAssignments.createOrUpdateTenant("one", 0) + assert.NoError(t, err) + err = qb.tenantQuerierAssignments.createOrUpdateTenant("two", 0) + assert.NoError(t, err) + + err = qb.enqueueObjectsForTests("one", 10) + assert.NoError(t, err) + tenantOne := qb.tenantQuerierAssignments.tenantsByID["one"] + + err = qb.enqueueObjectsForTests("two", 10) + assert.NoError(t, err) + tenantTwo := qb.tenantQuerierAssignments.tenantsByID["two"] + + expectedDequeueVals := []dequeueVal{ + {buildExpectedObject(tenantOne.tenantID, 0), tenantOne}, + {buildExpectedObject(tenantTwo.tenantID, 0), tenantTwo}, + {buildExpectedObject(tenantOne.tenantID, 1), tenantOne}, + {buildExpectedObject(tenantTwo.tenantID, 1), tenantTwo}, + } + qOneLastTenantIndex := assertExpectedValuesOnDequeue(t, qb, -1, "querier-1", expectedDequeueVals) - // After disconnecting querier-2, it's expected to own no queue. - qb.tenantQuerierAssignments.removeQueriers("querier-2") - req, tenant, _, err = qb.dequeueRequestForQuerier(&QuerierWorkerDequeueRequest{ - QuerierWorkerConn: &QuerierWorkerConn{QuerierID: "querier-2"}, - lastTenantIndex: TenantIndex{qTwolastTenantIndex}, - }) - assert.Nil(t, req) - assert.Nil(t, tenant) - assert.Equal(t, ErrQuerierShuttingDown, err) + expectedDequeueVals = []dequeueVal{ + {buildExpectedObject(tenantOne.tenantID, 2), tenantOne}, + {buildExpectedObject(tenantTwo.tenantID, 2), tenantTwo}, + {buildExpectedObject(tenantOne.tenantID, 3), tenantOne}, + {buildExpectedObject(tenantTwo.tenantID, 3), tenantTwo}, + } + qTwolastTenantIndex := assertExpectedValuesOnDequeue(t, qb, -1, "querier-2", expectedDequeueVals) + + // After notify shutdown for querier-2, it's expected to own no queue. 
+ qb.notifyQuerierShutdown("querier-2") + req, tenant, qTwolastTenantIndex, err := qb.dequeueRequestForQuerier(&QuerierWorkerDequeueRequest{ + QuerierWorkerConn: &QuerierWorkerConn{QuerierID: "querier-2"}, + lastTenantIndex: TenantIndex{qTwolastTenantIndex}, + }) + assert.Nil(t, req) + assert.Nil(t, tenant) + assert.Equal(t, ErrQuerierShuttingDown, err) + + // However, querier-1 still get queues because it's still running. + expectedDequeueVals = []dequeueVal{ + {buildExpectedObject(tenantOne.tenantID, 4), tenantOne}, + {buildExpectedObject(tenantTwo.tenantID, 4), tenantTwo}, + {buildExpectedObject(tenantOne.tenantID, 5), tenantOne}, + {buildExpectedObject(tenantTwo.tenantID, 5), tenantTwo}, + } + for _, expected := range expectedDequeueVals { + req, tenant, qOneLastTenantIndex, err = qb.dequeueRequestForQuerier(&QuerierWorkerDequeueRequest{ + QuerierWorkerConn: &QuerierWorkerConn{QuerierID: "querier-1"}, + lastTenantIndex: TenantIndex{qOneLastTenantIndex}, }) + assert.Equal(t, expected.req, req) + assert.Equal(t, expected.tenant, tenant) + assert.NoError(t, err) } + + // After disconnecting querier-2, it's expected to own no queue. + qb.tenantQuerierAssignments.removeQueriers("querier-2") + req, tenant, _, err = qb.dequeueRequestForQuerier(&QuerierWorkerDequeueRequest{ + QuerierWorkerConn: &QuerierWorkerConn{QuerierID: "querier-2"}, + lastTenantIndex: TenantIndex{qTwolastTenantIndex}, + }) + assert.Nil(t, req) + assert.Nil(t, tenant) + assert.Equal(t, ErrQuerierShuttingDown, err) } func TestQueues_QuerierDistribution(t *testing.T) { - treeTypes := buildTreeTestsStruct() - for _, tt := range treeTypes { - t.Run(tt.name, func(t *testing.T) { - qb := newQueueBroker(0, tt.prioritizeQueryComponents, 0) - assert.NotNil(t, qb) - assert.NoError(t, isConsistent(qb)) - - queriers := 30 - numTenants := 1000 - maxQueriersPerTenant := 5 - - // Add some queriers. - for ix := 0; ix < queriers; ix++ { - qid := fmt.Sprintf("querier-%d", ix) - qb.addQuerierWorkerConn(NewUnregisteredQuerierWorkerConn(context.Background(), qid)) - - // No querier has any queues yet. - req, tenant, _, err := qb.dequeueRequestForQuerier(&QuerierWorkerDequeueRequest{ - QuerierWorkerConn: &QuerierWorkerConn{QuerierID: qid}, - lastTenantIndex: TenantIndex{-1}, - }) - assert.Nil(t, req) - assert.Nil(t, tenant) - assert.NoError(t, err) - } - - assert.NoError(t, isConsistent(qb)) + qb := newQueueBroker(0, 0) + assert.NotNil(t, qb) + assert.NoError(t, isConsistent(qb)) + + queriers := 30 + numTenants := 1000 + maxQueriersPerTenant := 5 + + // Add some queriers. + for ix := 0; ix < queriers; ix++ { + qid := fmt.Sprintf("querier-%d", ix) + qb.addQuerierWorkerConn(NewUnregisteredQuerierWorkerConn(context.Background(), qid)) + + // No querier has any queues yet. + req, tenant, _, err := qb.dequeueRequestForQuerier(&QuerierWorkerDequeueRequest{ + QuerierWorkerConn: &QuerierWorkerConn{QuerierID: qid}, + lastTenantIndex: TenantIndex{-1}, + }) + assert.Nil(t, req) + assert.Nil(t, tenant) + assert.NoError(t, err) + } - // Add tenant queues. - for i := 0; i < numTenants; i++ { - uid := fmt.Sprintf("tenant-%d", i) - err := qb.tenantQuerierAssignments.createOrUpdateTenant(uid, maxQueriersPerTenant) - assert.NoError(t, err) + assert.NoError(t, isConsistent(qb)) - //Enqueue some stuff so that the tree queue node exists - err = qb.enqueueObjectsForTests(uid, 1) - assert.NoError(t, err) + // Add tenant queues. 
+ for i := 0; i < numTenants; i++ { + uid := fmt.Sprintf("tenant-%d", i) + err := qb.tenantQuerierAssignments.createOrUpdateTenant(uid, maxQueriersPerTenant) + assert.NoError(t, err) - // Verify it has maxQueriersPerTenant queriers assigned now. - qs := qb.tenantQuerierAssignments.queriersForTenant(uid) - assert.Equal(t, maxQueriersPerTenant, len(qs)) - } + //Enqueue some stuff so that the tree queue node exists + err = qb.enqueueObjectsForTests(uid, 1) + assert.NoError(t, err) - // After adding all tenants, verify results. For each querier, find out how many different tenants it handles, - // and compute mean and stdDev. - queriersMap := make(map[tree.QuerierID]int) + // Verify it has maxQueriersPerTenant queriers assigned now. + qs := qb.tenantQuerierAssignments.queriersForTenant(uid) + assert.Equal(t, maxQueriersPerTenant, len(qs)) + } - for tenantID := range qb.tenantQuerierAssignments.tenantsByID { - querierSet := qb.tenantQuerierAssignments.queriersForTenant(tenantID) - for querierID := range querierSet { - queriersMap[querierID]++ - } - } + // After adding all tenants, verify results. For each querier, find out how many different tenants it handles, + // and compute mean and stdDev. + queriersMap := make(map[tree.QuerierID]int) - mean := float64(0) - for _, c := range queriersMap { - mean += float64(c) - } - mean = mean / float64(len(queriersMap)) + for tenantID := range qb.tenantQuerierAssignments.tenantsByID { + querierSet := qb.tenantQuerierAssignments.queriersForTenant(tenantID) + for querierID := range querierSet { + queriersMap[querierID]++ + } + } - stdDev := float64(0) - for _, c := range queriersMap { - d := float64(c) - mean - stdDev += (d * d) - } - stdDev = math.Sqrt(stdDev / float64(len(queriersMap))) - t.Log("mean:", mean, "stddev:", stdDev) + mean := float64(0) + for _, c := range queriersMap { + mean += float64(c) + } + mean = mean / float64(len(queriersMap)) - assert.InDelta(t, numTenants*maxQueriersPerTenant/queriers, mean, 1) - assert.InDelta(t, stdDev, 0, mean*0.2) - }) + stdDev := float64(0) + for _, c := range queriersMap { + d := float64(c) - mean + stdDev += (d * d) } + stdDev = math.Sqrt(stdDev / float64(len(queriersMap))) + t.Log("mean:", mean, "stddev:", stdDev) + + assert.InDelta(t, numTenants*maxQueriersPerTenant/queriers, mean, 1) + assert.InDelta(t, stdDev, 0, mean*0.2) } func TestQueuesConsistency(t *testing.T) { - treeTypes := buildTreeTestsStruct() tests := map[string]struct { forgetDelay time.Duration }{ "without forget delay": {}, "with forget delay": {forgetDelay: time.Minute}, } + for testName, testData := range tests { + t.Run(testName, func(t *testing.T) { + qb := newQueueBroker(0, testData.forgetDelay) + assert.NotNil(t, qb) + assert.NoError(t, isConsistent(qb)) - for _, tt := range treeTypes { - t.Run(tt.name, func(t *testing.T) { - for testName, testData := range tests { - t.Run(testName, func(t *testing.T) { - qb := newQueueBroker(0, tt.prioritizeQueryComponents, testData.forgetDelay) - assert.NotNil(t, qb) - assert.NoError(t, isConsistent(qb)) - - r := rand.New(rand.NewSource(time.Now().Unix())) - - // maps querier IDs to tenant indexes for that querier. 
- lastTenantIndexes := map[string]int{} - - // track active querier-worker connections to use worker IDs for removal - conns := map[string][]*QuerierWorkerConn{} - - for i := 0; i < 100; i++ { - switch r.Int() % 6 { - case 0: - err := getOrAddTenantQueue(qb, generateTenant(r), 3) - assert.Nil(t, err) - case 1: - querierID := generateQuerier(r) - tenantIndex := getNextTenantForQuerier(qb, lastTenantIndexes[querierID], querierID) - lastTenantIndexes[querierID] = tenantIndex - case 2: - qb.removeTenantQueue(generateTenant(r)) - case 3: - querierID := generateQuerier(r) - conn := NewUnregisteredQuerierWorkerConn(context.Background(), querierID) - qb.addQuerierWorkerConn(conn) - conns[querierID] = append(conns[querierID], conn) - case 4: - querierID := generateQuerier(r) - if len(conns[querierID]) > 0 { - // does not matter which connection is removed; just choose the last one in the list - conn := conns[querierID][len(conns[querierID])-1] - qb.removeQuerierWorkerConn(conn, time.Now()) - // slice removed connection off end of tracking list - conns[querierID] = conns[querierID][:len(conns[querierID])-1] - } - case 5: - q := generateQuerier(r) - qb.notifyQuerierShutdown(q) - } - - assert.NoErrorf(t, isConsistent(qb), "last action %d, rand: %d", i, r.Int()%6) + r := rand.New(rand.NewSource(time.Now().Unix())) + + // maps querier IDs to tenant indexes for that querier. + lastTenantIndexes := map[string]int{} + + // track active querier-worker connections to use worker IDs for removal + conns := map[string][]*QuerierWorkerConn{} + + for i := 0; i < 100; i++ { + switch r.Int() % 6 { + case 0: + err := getOrAddTenantQueue(qb, generateTenant(r), 3) + assert.Nil(t, err) + case 1: + querierID := generateQuerier(r) + tenantIndex := getNextTenantForQuerier(qb, lastTenantIndexes[querierID], querierID) + lastTenantIndexes[querierID] = tenantIndex + case 2: + qb.removeTenantQueue(generateTenant(r)) + case 3: + querierID := generateQuerier(r) + conn := NewUnregisteredQuerierWorkerConn(context.Background(), querierID) + qb.addQuerierWorkerConn(conn) + conns[querierID] = append(conns[querierID], conn) + case 4: + querierID := generateQuerier(r) + if len(conns[querierID]) > 0 { + // does not matter which connection is removed; just choose the last one in the list + conn := conns[querierID][len(conns[querierID])-1] + qb.removeQuerierWorkerConn(conn, time.Now()) + // slice removed connection off end of tracking list + conns[querierID] = conns[querierID][:len(conns[querierID])-1] } - }) + case 5: + q := generateQuerier(r) + qb.notifyQuerierShutdown(q) + } + + assert.NoErrorf(t, isConsistent(qb), "last action %d, rand: %d", i, r.Int()%6) } }) } @@ -553,109 +509,103 @@ func TestQueues_ForgetDelay(t *testing.T) { numTenants = 10 ) - treeTypes := buildTreeTestsStruct() - - for _, tt := range treeTypes { - t.Run(tt.name, func(t *testing.T) { - now := time.Now() - qb := newQueueBroker(0, tt.prioritizeQueryComponents, forgetDelay) - assert.NotNil(t, qb) - assert.NoError(t, isConsistent(qb)) + now := time.Now() + qb := newQueueBroker(0, forgetDelay) + assert.NotNil(t, qb) + assert.NoError(t, isConsistent(qb)) + + // 3 queriers open 2 connections each. 
+ querier1Conn1 := NewUnregisteredQuerierWorkerConn(context.Background(), "querier-1") + qb.addQuerierWorkerConn(querier1Conn1) + querier1Conn2 := NewUnregisteredQuerierWorkerConn(context.Background(), "querier-1") + qb.addQuerierWorkerConn(querier1Conn2) + + querier2Conn1 := NewUnregisteredQuerierWorkerConn(context.Background(), "querier-2") + qb.addQuerierWorkerConn(querier2Conn1) + querier2Conn2 := NewUnregisteredQuerierWorkerConn(context.Background(), "querier-2") + qb.addQuerierWorkerConn(querier2Conn2) + + querier3Conn1 := NewUnregisteredQuerierWorkerConn(context.Background(), "querier-3") + qb.addQuerierWorkerConn(querier3Conn1) + querier3Conn2 := NewUnregisteredQuerierWorkerConn(context.Background(), "querier-3") + qb.addQuerierWorkerConn(querier3Conn2) + + // Add tenant queues. + for i := 0; i < numTenants; i++ { + tenantID := fmt.Sprintf("tenant-%d", i) + err := qb.tenantQuerierAssignments.createOrUpdateTenant(tenantID, maxQueriersPerTenant) + assert.NoError(t, err) - // 3 queriers open 2 connections each. - querier1Conn1 := NewUnregisteredQuerierWorkerConn(context.Background(), "querier-1") - qb.addQuerierWorkerConn(querier1Conn1) - querier1Conn2 := NewUnregisteredQuerierWorkerConn(context.Background(), "querier-1") - qb.addQuerierWorkerConn(querier1Conn2) - - querier2Conn1 := NewUnregisteredQuerierWorkerConn(context.Background(), "querier-2") - qb.addQuerierWorkerConn(querier2Conn1) - querier2Conn2 := NewUnregisteredQuerierWorkerConn(context.Background(), "querier-2") - qb.addQuerierWorkerConn(querier2Conn2) - - querier3Conn1 := NewUnregisteredQuerierWorkerConn(context.Background(), "querier-3") - qb.addQuerierWorkerConn(querier3Conn1) - querier3Conn2 := NewUnregisteredQuerierWorkerConn(context.Background(), "querier-3") - qb.addQuerierWorkerConn(querier3Conn2) - - // Add tenant queues. - for i := 0; i < numTenants; i++ { - tenantID := fmt.Sprintf("tenant-%d", i) - err := qb.tenantQuerierAssignments.createOrUpdateTenant(tenantID, maxQueriersPerTenant) - assert.NoError(t, err) - - //Enqueue some stuff so that the tree queue node exists - err = qb.enqueueObjectsForTests(tenantID, 1) - assert.NoError(t, err) - } + //Enqueue some stuff so that the tree queue node exists + err = qb.enqueueObjectsForTests(tenantID, 1) + assert.NoError(t, err) + } - // We expect querier-1 to have some tenants. - querier1Tenants := getTenantsByQuerier(qb, "querier-1") - require.NotEmpty(t, querier1Tenants) + // We expect querier-1 to have some tenants. + querier1Tenants := getTenantsByQuerier(qb, "querier-1") + require.NotEmpty(t, querier1Tenants) - // Gracefully shutdown querier-1. - qb.removeQuerierWorkerConn(querier1Conn1, now.Add(20*time.Second)) - qb.removeQuerierWorkerConn(querier1Conn2, now.Add(21*time.Second)) - qb.notifyQuerierShutdown("querier-1") + // Gracefully shutdown querier-1. + qb.removeQuerierWorkerConn(querier1Conn1, now.Add(20*time.Second)) + qb.removeQuerierWorkerConn(querier1Conn2, now.Add(21*time.Second)) + qb.notifyQuerierShutdown("querier-1") - // We expect querier-1 has been removed. - assert.NotContains(t, qb.querierConnections.queriersByID, "querier-1") - assert.NoError(t, isConsistent(qb)) + // We expect querier-1 has been removed. + assert.NotContains(t, qb.querierConnections.queriersByID, "querier-1") + assert.NoError(t, isConsistent(qb)) - // We expect querier-1 tenants have been shuffled to other queriers. 
- for _, tenantID := range querier1Tenants { - assert.Contains(t, append(getTenantsByQuerier(qb, "querier-2"), getTenantsByQuerier(qb, "querier-3")...), tenantID) - } + // We expect querier-1 tenants have been shuffled to other queriers. + for _, tenantID := range querier1Tenants { + assert.Contains(t, append(getTenantsByQuerier(qb, "querier-2"), getTenantsByQuerier(qb, "querier-3")...), tenantID) + } - // Querier-1 reconnects. - qb.addQuerierWorkerConn(querier1Conn1) - qb.addQuerierWorkerConn(querier1Conn2) + // Querier-1 reconnects. + qb.addQuerierWorkerConn(querier1Conn1) + qb.addQuerierWorkerConn(querier1Conn2) - // We expect the initial querier-1 tenants have got back to querier-1. - for _, tenantID := range querier1Tenants { - assert.Contains(t, getTenantsByQuerier(qb, "querier-1"), tenantID) - assert.NotContains(t, getTenantsByQuerier(qb, "querier-2"), tenantID) - assert.NotContains(t, getTenantsByQuerier(qb, "querier-3"), tenantID) - } + // We expect the initial querier-1 tenants have got back to querier-1. + for _, tenantID := range querier1Tenants { + assert.Contains(t, getTenantsByQuerier(qb, "querier-1"), tenantID) + assert.NotContains(t, getTenantsByQuerier(qb, "querier-2"), tenantID) + assert.NotContains(t, getTenantsByQuerier(qb, "querier-3"), tenantID) + } - // Querier-1 abruptly terminates (no shutdown notification received). - qb.removeQuerierWorkerConn(querier1Conn1, now.Add(40*time.Second)) - qb.removeQuerierWorkerConn(querier1Conn2, now.Add(41*time.Second)) + // Querier-1 abruptly terminates (no shutdown notification received). + qb.removeQuerierWorkerConn(querier1Conn1, now.Add(40*time.Second)) + qb.removeQuerierWorkerConn(querier1Conn2, now.Add(41*time.Second)) - // We expect querier-1 has NOT been removed. - assert.Contains(t, qb.querierConnections.queriersByID, "querier-1") - assert.NoError(t, isConsistent(qb)) + // We expect querier-1 has NOT been removed. + assert.Contains(t, qb.querierConnections.queriersByID, "querier-1") + assert.NoError(t, isConsistent(qb)) - // We expect the querier-1 tenants have not been shuffled to other queriers. - for _, tenantID := range querier1Tenants { - assert.Contains(t, getTenantsByQuerier(qb, "querier-1"), tenantID) - assert.NotContains(t, getTenantsByQuerier(qb, "querier-2"), tenantID) - assert.NotContains(t, getTenantsByQuerier(qb, "querier-3"), tenantID) - } + // We expect the querier-1 tenants have not been shuffled to other queriers. + for _, tenantID := range querier1Tenants { + assert.Contains(t, getTenantsByQuerier(qb, "querier-1"), tenantID) + assert.NotContains(t, getTenantsByQuerier(qb, "querier-2"), tenantID) + assert.NotContains(t, getTenantsByQuerier(qb, "querier-3"), tenantID) + } - // Try to forget disconnected queriers, but querier-1 forget delay hasn't passed yet. - qb.forgetDisconnectedQueriers(now.Add(90 * time.Second)) + // Try to forget disconnected queriers, but querier-1 forget delay hasn't passed yet. 
+ qb.forgetDisconnectedQueriers(now.Add(90 * time.Second)) - assert.Contains(t, qb.querierConnections.queriersByID, "querier-1") - assert.NoError(t, isConsistent(qb)) + assert.Contains(t, qb.querierConnections.queriersByID, "querier-1") + assert.NoError(t, isConsistent(qb)) - for _, tenantID := range querier1Tenants { - assert.Contains(t, getTenantsByQuerier(qb, "querier-1"), tenantID) - assert.NotContains(t, getTenantsByQuerier(qb, "querier-2"), tenantID) - assert.NotContains(t, getTenantsByQuerier(qb, "querier-3"), tenantID) - } + for _, tenantID := range querier1Tenants { + assert.Contains(t, getTenantsByQuerier(qb, "querier-1"), tenantID) + assert.NotContains(t, getTenantsByQuerier(qb, "querier-2"), tenantID) + assert.NotContains(t, getTenantsByQuerier(qb, "querier-3"), tenantID) + } - // Try to forget disconnected queriers. This time querier-1 forget delay has passed. - qb.forgetDisconnectedQueriers(now.Add(105 * time.Second)) + // Try to forget disconnected queriers. This time querier-1 forget delay has passed. + qb.forgetDisconnectedQueriers(now.Add(105 * time.Second)) - assert.NotContains(t, qb.querierConnections.queriersByID, "querier-1") - assert.NoError(t, isConsistent(qb)) + assert.NotContains(t, qb.querierConnections.queriersByID, "querier-1") + assert.NoError(t, isConsistent(qb)) - // We expect querier-1 tenants have been shuffled to other queriers. - for _, tenantID := range querier1Tenants { - assert.Contains(t, append(getTenantsByQuerier(qb, "querier-2"), getTenantsByQuerier(qb, "querier-3")...), tenantID) - } - }) + // We expect querier-1 tenants have been shuffled to other queriers. + for _, tenantID := range querier1Tenants { + assert.Contains(t, append(getTenantsByQuerier(qb, "querier-2"), getTenantsByQuerier(qb, "querier-3")...), tenantID) } } @@ -666,91 +616,84 @@ func TestQueues_ForgetDelay_ShouldCorrectlyHandleQuerierReconnectingBeforeForget numTenants = 100 ) - treeTypes := buildTreeTestsStruct() - - for _, tt := range treeTypes { - t.Run(tt.name, func(t *testing.T) { - now := time.Now() - qb := newQueueBroker(0, tt.prioritizeQueryComponents, forgetDelay) - assert.NotNil(t, qb) - assert.NoError(t, isConsistent(qb)) - - // 3 queriers open 2 connections each. - querier1Conn1 := NewUnregisteredQuerierWorkerConn(context.Background(), "querier-1") - qb.addQuerierWorkerConn(querier1Conn1) - querier1Conn2 := NewUnregisteredQuerierWorkerConn(context.Background(), "querier-1") - qb.addQuerierWorkerConn(querier1Conn2) - - querier2Conn1 := NewUnregisteredQuerierWorkerConn(context.Background(), "querier-2") - qb.addQuerierWorkerConn(querier2Conn1) - querier2Conn2 := NewUnregisteredQuerierWorkerConn(context.Background(), "querier-2") - qb.addQuerierWorkerConn(querier2Conn2) - - querier3Conn1 := NewUnregisteredQuerierWorkerConn(context.Background(), "querier-3") - qb.addQuerierWorkerConn(querier3Conn1) - querier3Conn2 := NewUnregisteredQuerierWorkerConn(context.Background(), "querier-3") - qb.addQuerierWorkerConn(querier3Conn2) - - // Add tenant queues. - for i := 0; i < numTenants; i++ { - tenantID := fmt.Sprintf("tenant-%d", i) - err := qb.tenantQuerierAssignments.createOrUpdateTenant(tenantID, maxQueriersPerTenant) - assert.NoError(t, err) - - //Enqueue some stuff so that the tree queue node exists - err = qb.enqueueObjectsForTests(tenantID, 1) - assert.NoError(t, err) - } + now := time.Now() + qb := newQueueBroker(0, forgetDelay) + assert.NotNil(t, qb) + assert.NoError(t, isConsistent(qb)) + + // 3 queriers open 2 connections each. 
+ querier1Conn1 := NewUnregisteredQuerierWorkerConn(context.Background(), "querier-1") + qb.addQuerierWorkerConn(querier1Conn1) + querier1Conn2 := NewUnregisteredQuerierWorkerConn(context.Background(), "querier-1") + qb.addQuerierWorkerConn(querier1Conn2) + + querier2Conn1 := NewUnregisteredQuerierWorkerConn(context.Background(), "querier-2") + qb.addQuerierWorkerConn(querier2Conn1) + querier2Conn2 := NewUnregisteredQuerierWorkerConn(context.Background(), "querier-2") + qb.addQuerierWorkerConn(querier2Conn2) + + querier3Conn1 := NewUnregisteredQuerierWorkerConn(context.Background(), "querier-3") + qb.addQuerierWorkerConn(querier3Conn1) + querier3Conn2 := NewUnregisteredQuerierWorkerConn(context.Background(), "querier-3") + qb.addQuerierWorkerConn(querier3Conn2) + + // Add tenant queues. + for i := 0; i < numTenants; i++ { + tenantID := fmt.Sprintf("tenant-%d", i) + err := qb.tenantQuerierAssignments.createOrUpdateTenant(tenantID, maxQueriersPerTenant) + assert.NoError(t, err) - // We expect querier-1 to have some tenants. - querier1Tenants := getTenantsByQuerier(qb, "querier-1") - require.NotEmpty(t, querier1Tenants) + //Enqueue some stuff so that the tree queue node exists + err = qb.enqueueObjectsForTests(tenantID, 1) + assert.NoError(t, err) + } - // Querier-1 abruptly terminates (no shutdown notification received). - qb.removeQuerierWorkerConn(querier1Conn1, now.Add(40*time.Second)) - qb.removeQuerierWorkerConn(querier1Conn2, now.Add(41*time.Second)) + // We expect querier-1 to have some tenants. + querier1Tenants := getTenantsByQuerier(qb, "querier-1") + require.NotEmpty(t, querier1Tenants) - // We expect querier-1 has NOT been removed. - assert.Contains(t, qb.querierConnections.queriersByID, "querier-1") - assert.NoError(t, isConsistent(qb)) + // Querier-1 abruptly terminates (no shutdown notification received). + qb.removeQuerierWorkerConn(querier1Conn1, now.Add(40*time.Second)) + qb.removeQuerierWorkerConn(querier1Conn2, now.Add(41*time.Second)) - // We expect the querier-1 tenants have not been shuffled to other queriers. - for _, tenantID := range querier1Tenants { - assert.Contains(t, getTenantsByQuerier(qb, "querier-1"), tenantID) - assert.NotContains(t, getTenantsByQuerier(qb, "querier-2"), tenantID) - assert.NotContains(t, getTenantsByQuerier(qb, "querier-3"), tenantID) - } + // We expect querier-1 has NOT been removed. + assert.Contains(t, qb.querierConnections.queriersByID, "querier-1") + assert.NoError(t, isConsistent(qb)) - // Try to forget disconnected queriers, but querier-1 forget delay hasn't passed yet. - qb.forgetDisconnectedQueriers(now.Add(90 * time.Second)) + // We expect the querier-1 tenants have not been shuffled to other queriers. + for _, tenantID := range querier1Tenants { + assert.Contains(t, getTenantsByQuerier(qb, "querier-1"), tenantID) + assert.NotContains(t, getTenantsByQuerier(qb, "querier-2"), tenantID) + assert.NotContains(t, getTenantsByQuerier(qb, "querier-3"), tenantID) + } - // Querier-1 reconnects. - qb.addQuerierWorkerConn(querier1Conn1) - qb.addQuerierWorkerConn(querier1Conn2) + // Try to forget disconnected queriers, but querier-1 forget delay hasn't passed yet. + qb.forgetDisconnectedQueriers(now.Add(90 * time.Second)) - assert.Contains(t, qb.querierConnections.queriersByID, "querier-1") - assert.NoError(t, isConsistent(qb)) + // Querier-1 reconnects. + qb.addQuerierWorkerConn(querier1Conn1) + qb.addQuerierWorkerConn(querier1Conn2) - // We expect the querier-1 tenants have not been shuffled to other queriers. 
- for _, tenantID := range querier1Tenants { - assert.Contains(t, getTenantsByQuerier(qb, "querier-1"), tenantID) - assert.NotContains(t, getTenantsByQuerier(qb, "querier-2"), tenantID) - assert.NotContains(t, getTenantsByQuerier(qb, "querier-3"), tenantID) - } + assert.Contains(t, qb.querierConnections.queriersByID, "querier-1") + assert.NoError(t, isConsistent(qb)) - // Try to forget disconnected queriers far in the future, but there's no disconnected querier. - qb.forgetDisconnectedQueriers(now.Add(200 * time.Second)) + // We expect the querier-1 tenants have not been shuffled to other queriers. + for _, tenantID := range querier1Tenants { + assert.Contains(t, getTenantsByQuerier(qb, "querier-1"), tenantID) + assert.NotContains(t, getTenantsByQuerier(qb, "querier-2"), tenantID) + assert.NotContains(t, getTenantsByQuerier(qb, "querier-3"), tenantID) + } - assert.Contains(t, qb.querierConnections.queriersByID, "querier-1") - assert.NoError(t, isConsistent(qb)) + // Try to forget disconnected queriers far in the future, but there's no disconnected querier. + qb.forgetDisconnectedQueriers(now.Add(200 * time.Second)) - for _, tenantID := range querier1Tenants { - assert.Contains(t, getTenantsByQuerier(qb, "querier-1"), tenantID) - assert.NotContains(t, getTenantsByQuerier(qb, "querier-2"), tenantID) - assert.NotContains(t, getTenantsByQuerier(qb, "querier-3"), tenantID) - } + assert.Contains(t, qb.querierConnections.queriersByID, "querier-1") + assert.NoError(t, isConsistent(qb)) - }) + for _, tenantID := range querier1Tenants { + assert.Contains(t, getTenantsByQuerier(qb, "querier-1"), tenantID) + assert.NotContains(t, getTenantsByQuerier(qb, "querier-2"), tenantID) + assert.NotContains(t, getTenantsByQuerier(qb, "querier-3"), tenantID) } } @@ -801,10 +744,7 @@ func (qb *queueBroker) removeTenantQueue(tenantID string) bool { } func (qb *queueBroker) makeQueuePathForTests(tenantID string) tree.QueuePath { - if qb.prioritizeQueryComponents { - return tree.QueuePath{unknownQueueDimension, tenantID} - } - return tree.QueuePath{tenantID} + return tree.QueuePath{unknownQueueDimension, tenantID} } func isConsistent(qb *queueBroker) error { diff --git a/pkg/scheduler/queue/queue_test.go b/pkg/scheduler/queue/queue_test.go index 6a13d882a7b..4d348f543ab 100644 --- a/pkg/scheduler/queue/queue_test.go +++ b/pkg/scheduler/queue/queue_test.go @@ -27,20 +27,6 @@ import ( util_test "github.com/grafana/mimir/pkg/util/test" ) -// buildTreeTestsStruct returns all _allowed_ combinations of config flags for testing. -func buildTreeTestsStruct() []struct { - name string - prioritizeQueryComponents bool -} { - return []struct { - name string - prioritizeQueryComponents bool - }{ - {"prioritize query components disabled", false}, - {"prioritize query components enabled", true}, - } -} - func TestMain(m *testing.M) { util_test.VerifyNoLeakTestMain(m) } @@ -92,68 +78,60 @@ func makeSchedulerRequest(tenantID string, additionalQueueDimensions []string) * } func BenchmarkConcurrentQueueOperations(b *testing.B) { - treeTypes := buildTreeTestsStruct() - - for _, t := range treeTypes { - b.Run(t.name, func(b *testing.B) { - maxQueriersPerTenant := 0 // disable shuffle sharding - forgetQuerierDelay := time.Duration(0) - maxOutstandingRequestsPerTenant := 100 - - for _, numTenants := range []int{1, 10, 1000} { - b.Run(fmt.Sprintf("%v tenants", numTenants), func(b *testing.B) { - - // Query-frontends run 5 parallel streams per scheduler by default, - // and we typically see 2-5 frontends running at any one time. 
- for _, numProducers := range []int{10, 25} { - b.Run(fmt.Sprintf("%v concurrent producers", numProducers), func(b *testing.B) { - - // Queriers run with parallelism of 16 when query sharding is enabled. - for _, numConsumers := range []int{16, 160, 1600} { - b.Run(fmt.Sprintf("%v concurrent consumers", numConsumers), func(b *testing.B) { - queue, err := NewRequestQueue( - log.NewNopLogger(), - maxOutstandingRequestsPerTenant, - t.prioritizeQueryComponents, - forgetQuerierDelay, - promauto.With(nil).NewGaugeVec(prometheus.GaugeOpts{}, []string{"user"}), - promauto.With(nil).NewCounterVec(prometheus.CounterOpts{}, []string{"user"}), - promauto.With(nil).NewHistogram(prometheus.HistogramOpts{}), - promauto.With(nil).NewSummaryVec(prometheus.SummaryOpts{}, []string{"query_component"}), - ) - require.NoError(b, err) - - ctx := context.Background() - - require.NoError(b, queue.starting(ctx)) - b.Cleanup(func() { - require.NoError(b, queue.stop(nil)) - }) - - startProducersChan, producersErrGroup := makeQueueProducerGroup( - queue, maxQueriersPerTenant, b.N, numProducers, numTenants, randAdditionalQueueDimension, - ) - - queueConsumerErrGroup, startConsumersChan := makeQueueConsumerGroup( - context.Background(), queue, b.N, numConsumers, 1, nil, - ) - - b.ResetTimer() - close(startProducersChan) // run producers - close(startConsumersChan) // run consumers - - err = producersErrGroup.Wait() - require.NoError(b, err) - - err = queueConsumerErrGroup.Wait() - require.NoError(b, err) - }) - } + maxQueriersPerTenant := 0 // disable shuffle sharding + forgetQuerierDelay := time.Duration(0) + maxOutstandingRequestsPerTenant := 100 + + for _, numTenants := range []int{1, 10, 1000} { + b.Run(fmt.Sprintf("%v tenants", numTenants), func(b *testing.B) { + + // Query-frontends run 5 parallel streams per scheduler by default, + // and we typically see 2-5 frontends running at any one time. + for _, numProducers := range []int{10, 25} { + b.Run(fmt.Sprintf("%v concurrent producers", numProducers), func(b *testing.B) { + + // Queriers run with parallelism of 16 when query sharding is enabled. 
+ for _, numConsumers := range []int{16, 160, 1600} { + b.Run(fmt.Sprintf("%v concurrent consumers", numConsumers), func(b *testing.B) { + queue, err := NewRequestQueue( + log.NewNopLogger(), + maxOutstandingRequestsPerTenant, + forgetQuerierDelay, + promauto.With(nil).NewGaugeVec(prometheus.GaugeOpts{}, []string{"user"}), + promauto.With(nil).NewCounterVec(prometheus.CounterOpts{}, []string{"user"}), + promauto.With(nil).NewHistogram(prometheus.HistogramOpts{}), + promauto.With(nil).NewSummaryVec(prometheus.SummaryOpts{}, []string{"query_component"}), + ) + require.NoError(b, err) + + ctx := context.Background() + + require.NoError(b, queue.starting(ctx)) + b.Cleanup(func() { + require.NoError(b, queue.stop(nil)) + }) + + startProducersChan, producersErrGroup := makeQueueProducerGroup( + queue, maxQueriersPerTenant, b.N, numProducers, numTenants, randAdditionalQueueDimension, + ) + + queueConsumerErrGroup, startConsumersChan := makeQueueConsumerGroup( + context.Background(), queue, b.N, numConsumers, 1, nil, + ) + + b.ResetTimer() + close(startProducersChan) // run producers + close(startConsumersChan) // run consumers + + err = producersErrGroup.Wait() + require.NoError(b, err) + + err = queueConsumerErrGroup.Wait() + require.NoError(b, err) }) } }) } - }) } } @@ -354,443 +332,394 @@ func queueConsume( func TestRequestQueue_RegisterAndUnregisterQuerierWorkerConnections(t *testing.T) { const forgetDelay = 3 * time.Second - treeTypes := buildTreeTestsStruct() - for _, tt := range treeTypes { - t.Run(tt.name, func(t *testing.T) { - queue, err := NewRequestQueue( - log.NewNopLogger(), - 1, - tt.prioritizeQueryComponents, - forgetDelay, - promauto.With(nil).NewGaugeVec(prometheus.GaugeOpts{}, []string{"user"}), - promauto.With(nil).NewCounterVec(prometheus.CounterOpts{}, []string{"user"}), - promauto.With(nil).NewHistogram(prometheus.HistogramOpts{}), - promauto.With(nil).NewSummaryVec(prometheus.SummaryOpts{}, []string{"query_component"}), - ) - require.NoError(t, err) - - // start the queue service. - ctx := context.Background() - require.NoError(t, services.StartAndAwaitRunning(ctx, queue)) - - t.Cleanup(func() { - // we must send a shutdown signal for any remaining connected queriers - // or else StopAndAwaitTerminated will never complete. - queue.SubmitNotifyQuerierShutdown(ctx, "querier-1") - queue.SubmitNotifyQuerierShutdown(ctx, "querier-2") - require.NoError(t, services.StopAndAwaitTerminated(ctx, queue)) - }) - - // 2 queriers open 3 connections each. 
- querier1Conn1 := NewUnregisteredQuerierWorkerConn(context.Background(), "querier-1") - require.NoError(t, queue.AwaitRegisterQuerierWorkerConn(querier1Conn1)) - require.Equal(t, 0, querier1Conn1.WorkerID) - require.Equal(t, 1, int(queue.connectedQuerierWorkers.Load())) - - querier1Conn2 := NewUnregisteredQuerierWorkerConn(context.Background(), "querier-1") - require.NoError(t, queue.AwaitRegisterQuerierWorkerConn(querier1Conn2)) - require.Equal(t, 1, querier1Conn2.WorkerID) - require.Equal(t, 2, int(queue.connectedQuerierWorkers.Load())) - - querier1Conn3 := NewUnregisteredQuerierWorkerConn(context.Background(), "querier-1") - require.NoError(t, queue.AwaitRegisterQuerierWorkerConn(querier1Conn3)) - require.Equal(t, 2, querier1Conn3.WorkerID) - require.Equal(t, 3, int(queue.connectedQuerierWorkers.Load())) - - querier2Conn1 := NewUnregisteredQuerierWorkerConn(context.Background(), "querier-2") - require.NoError(t, queue.AwaitRegisterQuerierWorkerConn(querier2Conn1)) - require.Equal(t, 0, querier2Conn1.WorkerID) - require.Equal(t, 4, int(queue.connectedQuerierWorkers.Load())) - - querier2Conn2 := NewUnregisteredQuerierWorkerConn(context.Background(), "querier-2") - require.NoError(t, queue.AwaitRegisterQuerierWorkerConn(querier2Conn2)) - require.Equal(t, 1, querier2Conn2.WorkerID) - require.Equal(t, 5, int(queue.connectedQuerierWorkers.Load())) - - querier2Conn3 := NewUnregisteredQuerierWorkerConn(context.Background(), "querier-2") - require.NoError(t, queue.AwaitRegisterQuerierWorkerConn(querier2Conn3)) - require.Equal(t, 2, querier2Conn3.WorkerID) - require.Equal(t, 6, int(queue.connectedQuerierWorkers.Load())) - - // if querier-worker disconnects and reconnects before any other querier-worker changes, - // the querier-worker connect will get its same worker ID back - queue.SubmitUnregisterQuerierWorkerConn(querier2Conn2) - require.NoError(t, queue.AwaitRegisterQuerierWorkerConn(querier2Conn2)) - require.Equal(t, 1, querier2Conn2.WorkerID) - require.Equal(t, 6, int(queue.connectedQuerierWorkers.Load())) - - // if a querier-worker disconnects and another querier-worker connects before the first reconnects - // the second querier-worker will have taken the worker ID of the first querier-worker, - // and the first querier-worker will get issued a new worker ID - - // even though some operations are awaited - // and some are just submitted without waiting for completion, - // all querier-worker operations are processed in the order of the submit/await calls. - queue.SubmitUnregisterQuerierWorkerConn(querier1Conn2) - // we cannot be sure the worker ID is unregistered yet, - // but once we await the next worker register call, we can be sure. 
- querier1Conn4 := NewUnregisteredQuerierWorkerConn(context.Background(), "querier-1") - require.NoError(t, queue.AwaitRegisterQuerierWorkerConn(querier1Conn4)) - require.False(t, querier1Conn2.IsRegistered()) - require.Equal(t, 1, querier1Conn4.WorkerID) - require.Equal(t, 6, int(queue.connectedQuerierWorkers.Load())) - // re-connect from the first querier-worker and get a completely new worker ID - require.NoError(t, queue.AwaitRegisterQuerierWorkerConn(querier1Conn2)) - require.Equal(t, 3, querier1Conn2.WorkerID) - require.Equal(t, 7, int(queue.connectedQuerierWorkers.Load())) - }) - } + queue, err := NewRequestQueue( + log.NewNopLogger(), + 1, + forgetDelay, + promauto.With(nil).NewGaugeVec(prometheus.GaugeOpts{}, []string{"user"}), + promauto.With(nil).NewCounterVec(prometheus.CounterOpts{}, []string{"user"}), + promauto.With(nil).NewHistogram(prometheus.HistogramOpts{}), + promauto.With(nil).NewSummaryVec(prometheus.SummaryOpts{}, []string{"query_component"}), + ) + require.NoError(t, err) + + // start the queue service. + ctx := context.Background() + require.NoError(t, services.StartAndAwaitRunning(ctx, queue)) + + t.Cleanup(func() { + // we must send a shutdown signal for any remaining connected queriers + // or else StopAndAwaitTerminated will never complete. + queue.SubmitNotifyQuerierShutdown(ctx, "querier-1") + queue.SubmitNotifyQuerierShutdown(ctx, "querier-2") + require.NoError(t, services.StopAndAwaitTerminated(ctx, queue)) + }) + + // 2 queriers open 3 connections each. + querier1Conn1 := NewUnregisteredQuerierWorkerConn(context.Background(), "querier-1") + require.NoError(t, queue.AwaitRegisterQuerierWorkerConn(querier1Conn1)) + require.Equal(t, 0, querier1Conn1.WorkerID) + require.Equal(t, 1, int(queue.connectedQuerierWorkers.Load())) + + querier1Conn2 := NewUnregisteredQuerierWorkerConn(context.Background(), "querier-1") + require.NoError(t, queue.AwaitRegisterQuerierWorkerConn(querier1Conn2)) + require.Equal(t, 1, querier1Conn2.WorkerID) + require.Equal(t, 2, int(queue.connectedQuerierWorkers.Load())) + + querier1Conn3 := NewUnregisteredQuerierWorkerConn(context.Background(), "querier-1") + require.NoError(t, queue.AwaitRegisterQuerierWorkerConn(querier1Conn3)) + require.Equal(t, 2, querier1Conn3.WorkerID) + require.Equal(t, 3, int(queue.connectedQuerierWorkers.Load())) + + querier2Conn1 := NewUnregisteredQuerierWorkerConn(context.Background(), "querier-2") + require.NoError(t, queue.AwaitRegisterQuerierWorkerConn(querier2Conn1)) + require.Equal(t, 0, querier2Conn1.WorkerID) + require.Equal(t, 4, int(queue.connectedQuerierWorkers.Load())) + + querier2Conn2 := NewUnregisteredQuerierWorkerConn(context.Background(), "querier-2") + require.NoError(t, queue.AwaitRegisterQuerierWorkerConn(querier2Conn2)) + require.Equal(t, 1, querier2Conn2.WorkerID) + require.Equal(t, 5, int(queue.connectedQuerierWorkers.Load())) + + querier2Conn3 := NewUnregisteredQuerierWorkerConn(context.Background(), "querier-2") + require.NoError(t, queue.AwaitRegisterQuerierWorkerConn(querier2Conn3)) + require.Equal(t, 2, querier2Conn3.WorkerID) + require.Equal(t, 6, int(queue.connectedQuerierWorkers.Load())) + + // if querier-worker disconnects and reconnects before any other querier-worker changes, + // the querier-worker connect will get its same worker ID back + queue.SubmitUnregisterQuerierWorkerConn(querier2Conn2) + require.NoError(t, queue.AwaitRegisterQuerierWorkerConn(querier2Conn2)) + require.Equal(t, 1, querier2Conn2.WorkerID) + require.Equal(t, 6, int(queue.connectedQuerierWorkers.Load())) 
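The register-and-assert pattern above repeats for every querier-worker connection in this test. A compact helper along the following lines could express the per-connection worker-ID expectation in one place. This is only an illustrative sketch, not part of the patch: registerWorkerExpectID is a made-up name and the *RequestQueue type is assumed from NewRequestQueue, while QuerierWorkerConn, NewUnregisteredQuerierWorkerConn and AwaitRegisterQuerierWorkerConn are the queue APIs already exercised above.

func registerWorkerExpectID(t *testing.T, q *RequestQueue, querierID string, wantWorkerID int) *QuerierWorkerConn {
	t.Helper()
	// Open one unregistered querier-worker connection and register it with the queue.
	conn := NewUnregisteredQuerierWorkerConn(context.Background(), querierID)
	require.NoError(t, q.AwaitRegisterQuerierWorkerConn(conn))
	// As the assertions above show, worker IDs appear to be issued per querier,
	// with the lowest free ID handed out first; a reconnect with no interleaving
	// changes therefore gets its previous ID back.
	require.Equal(t, wantWorkerID, conn.WorkerID)
	return conn
}

With such a helper, the setup above would reduce to calls like querier1Conn1 := registerWorkerExpectID(t, queue, "querier-1", 0), leaving only the connectedQuerierWorkers count assertions inline.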
+ + // if a querier-worker disconnects and another querier-worker connects before the first reconnects + // the second querier-worker will have taken the worker ID of the first querier-worker, + // and the first querier-worker will get issued a new worker ID + + // even though some operations are awaited + // and some are just submitted without waiting for completion, + // all querier-worker operations are processed in the order of the submit/await calls. + queue.SubmitUnregisterQuerierWorkerConn(querier1Conn2) + // we cannot be sure the worker ID is unregistered yet, + // but once we await the next worker register call, we can be sure. + querier1Conn4 := NewUnregisteredQuerierWorkerConn(context.Background(), "querier-1") + require.NoError(t, queue.AwaitRegisterQuerierWorkerConn(querier1Conn4)) + require.False(t, querier1Conn2.IsRegistered()) + require.Equal(t, 1, querier1Conn4.WorkerID) + require.Equal(t, 6, int(queue.connectedQuerierWorkers.Load())) + // re-connect from the first querier-worker and get a completely new worker ID + require.NoError(t, queue.AwaitRegisterQuerierWorkerConn(querier1Conn2)) + require.Equal(t, 3, querier1Conn2.WorkerID) + require.Equal(t, 7, int(queue.connectedQuerierWorkers.Load())) } func TestRequestQueue_GetNextRequestForQuerier_ShouldGetRequestAfterReshardingBecauseQuerierHasBeenForgotten(t *testing.T) { const forgetDelay = 3 * time.Second const testTimeout = 10 * time.Second - treeTypes := buildTreeTestsStruct() - - for _, tt := range treeTypes { - t.Run(tt.name, func(t *testing.T) { - queue, err := NewRequestQueue( - log.NewNopLogger(), - 1, - tt.prioritizeQueryComponents, - forgetDelay, - promauto.With(nil).NewGaugeVec(prometheus.GaugeOpts{}, []string{"user"}), - promauto.With(nil).NewCounterVec(prometheus.CounterOpts{}, []string{"user"}), - promauto.With(nil).NewHistogram(prometheus.HistogramOpts{}), - promauto.With(nil).NewSummaryVec(prometheus.SummaryOpts{}, []string{"query_component"}), - ) - require.NoError(t, err) - - // Start the queue service. - ctx := context.Background() - require.NoError(t, services.StartAndAwaitRunning(ctx, queue)) - - // Two queriers connect. - querier1Conn := NewUnregisteredQuerierWorkerConn(context.Background(), "querier-1") - require.NoError(t, queue.AwaitRegisterQuerierWorkerConn(querier1Conn)) - querier2Conn := NewUnregisteredQuerierWorkerConn(context.Background(), "querier-2") - require.NoError(t, queue.AwaitRegisterQuerierWorkerConn(querier2Conn)) - - t.Cleanup(func() { - // if the test has failed and the queue does not get cleared, - // we must send a shutdown signal for the remaining connected querier - // or else StopAndAwaitTerminated will never complete. - queue.SubmitUnregisterQuerierWorkerConn(querier2Conn) - require.NoError(t, services.StopAndAwaitTerminated(ctx, queue)) - }) - - // Querier-2 waits for a new request. - querier2wg := sync.WaitGroup{} - querier2wg.Add(1) - go func() { - defer querier2wg.Done() - dequeueReq := NewQuerierWorkerDequeueRequest(querier2Conn, FirstTenant()) - _, _, err := queue.AwaitRequestForQuerier(dequeueReq) - require.NoError(t, err) - }() - - // Querier-1 crashes (no graceful shutdown notification). - queue.SubmitUnregisterQuerierWorkerConn(querier1Conn) - - // Enqueue a request from an user which would be assigned to querier-1. 
- // NOTE: "user-1" shuffle shard always chooses the first querier ("querier-1" in this case) - // when there are only one or two queriers in the sorted list of connected queriers - req := &SchedulerRequest{ - Ctx: context.Background(), - Request: &httpgrpc.HTTPRequest{Method: "GET", Url: "/hello"}, - AdditionalQueueDimensions: randAdditionalQueueDimension(""), - } - require.NoError(t, queue.SubmitRequestToEnqueue("user-1", req, 1, nil)) - - startTime := time.Now() - done := make(chan struct{}) - go func() { - querier2wg.Wait() - close(done) - }() - - select { - case <-done: - waitTime := time.Since(startTime) - // We expect that querier-2 got the request only after forget delay is passed. - assert.GreaterOrEqual(t, waitTime.Milliseconds(), forgetDelay.Milliseconds()) - case <-time.After(testTimeout): - t.Fatal("timeout: querier-2 did not receive the request expected to be resharded to querier-2") - } - - }) + queue, err := NewRequestQueue( + log.NewNopLogger(), + 1, + forgetDelay, + promauto.With(nil).NewGaugeVec(prometheus.GaugeOpts{}, []string{"user"}), + promauto.With(nil).NewCounterVec(prometheus.CounterOpts{}, []string{"user"}), + promauto.With(nil).NewHistogram(prometheus.HistogramOpts{}), + promauto.With(nil).NewSummaryVec(prometheus.SummaryOpts{}, []string{"query_component"}), + ) + require.NoError(t, err) + + // Start the queue service. + ctx := context.Background() + require.NoError(t, services.StartAndAwaitRunning(ctx, queue)) + + // Two queriers connect. + querier1Conn := NewUnregisteredQuerierWorkerConn(context.Background(), "querier-1") + require.NoError(t, queue.AwaitRegisterQuerierWorkerConn(querier1Conn)) + querier2Conn := NewUnregisteredQuerierWorkerConn(context.Background(), "querier-2") + require.NoError(t, queue.AwaitRegisterQuerierWorkerConn(querier2Conn)) + + t.Cleanup(func() { + // if the test has failed and the queue does not get cleared, + // we must send a shutdown signal for the remaining connected querier + // or else StopAndAwaitTerminated will never complete. + queue.SubmitUnregisterQuerierWorkerConn(querier2Conn) + require.NoError(t, services.StopAndAwaitTerminated(ctx, queue)) + }) + + // Querier-2 waits for a new request. + querier2wg := sync.WaitGroup{} + querier2wg.Add(1) + go func() { + defer querier2wg.Done() + dequeueReq := NewQuerierWorkerDequeueRequest(querier2Conn, FirstTenant()) + _, _, err := queue.AwaitRequestForQuerier(dequeueReq) + require.NoError(t, err) + }() + + // Querier-1 crashes (no graceful shutdown notification). + queue.SubmitUnregisterQuerierWorkerConn(querier1Conn) + + // Enqueue a request from an user which would be assigned to querier-1. + // NOTE: "user-1" shuffle shard always chooses the first querier ("querier-1" in this case) + // when there are only one or two queriers in the sorted list of connected queriers + req := &SchedulerRequest{ + Ctx: context.Background(), + Request: &httpgrpc.HTTPRequest{Method: "GET", Url: "/hello"}, + AdditionalQueueDimensions: randAdditionalQueueDimension(""), + } + require.NoError(t, queue.SubmitRequestToEnqueue("user-1", req, 1, nil)) + + startTime := time.Now() + done := make(chan struct{}) + go func() { + querier2wg.Wait() + close(done) + }() + + select { + case <-done: + waitTime := time.Since(startTime) + // We expect that querier-2 got the request only after forget delay is passed. 
+ assert.GreaterOrEqual(t, waitTime.Milliseconds(), forgetDelay.Milliseconds()) + case <-time.After(testTimeout): + t.Fatal("timeout: querier-2 did not receive the request expected to be resharded to querier-2") } } func TestRequestQueue_GetNextRequestForQuerier_ReshardNotifiedCorrectlyForMultipleQuerierForget(t *testing.T) { const forgetDelay = 3 * time.Second const testTimeout = 10 * time.Second - treeTypes := buildTreeTestsStruct() - - for _, tt := range treeTypes { - t.Run(tt.name, func(t *testing.T) { - queue, err := NewRequestQueue( - log.NewNopLogger(), - 1, - tt.prioritizeQueryComponents, - forgetDelay, - promauto.With(nil).NewGaugeVec(prometheus.GaugeOpts{}, []string{"user"}), - promauto.With(nil).NewCounterVec(prometheus.CounterOpts{}, []string{"user"}), - promauto.With(nil).NewHistogram(prometheus.HistogramOpts{}), - promauto.With(nil).NewSummaryVec(prometheus.SummaryOpts{}, []string{"query_component"}), - ) - require.NoError(t, err) - - // Start the queue service. - ctx := context.Background() - require.NoError(t, services.StartAndAwaitRunning(ctx, queue)) - - // Three queriers connect. - // We will submit the enqueue request with maxQueriers: 2. - // - // Whenever forgetDisconnectedQueriers runs, all queriers which reached zero connections since the last - // run of forgetDisconnectedQueriers will all be removed in from the shuffle shard in the same run. - // - // In this case two queriers are forgotten in the same run, but only the first forgotten querier triggers a reshard. - // In the first reshard, the tenant goes from a shuffled subset of queriers to a state of - // "tenant can use all queriers", as connected queriers is now <= tenant.maxQueriers. - // The second forgotten querier won't trigger a reshard, as connected queriers is already <= tenant.maxQueriers. - // - // We are testing that the occurrence of a reshard is reported correctly - // when not all querier forget operations in a single run of forgetDisconnectedQueriers caused a reshard. - // Two queriers connect. - querier1Conn := NewUnregisteredQuerierWorkerConn(context.Background(), "querier-1") - require.NoError(t, queue.AwaitRegisterQuerierWorkerConn(querier1Conn)) - querier2Conn := NewUnregisteredQuerierWorkerConn(context.Background(), "querier-2") - require.NoError(t, queue.AwaitRegisterQuerierWorkerConn(querier2Conn)) - querier3Conn := NewUnregisteredQuerierWorkerConn(context.Background(), "querier-3") - require.NoError(t, queue.AwaitRegisterQuerierWorkerConn(querier3Conn)) - - t.Cleanup(func() { - // if the test has failed and the queue does not get cleared, - // we must send a shutdown signal for the remaining connected querier - // or else StopAndAwaitTerminated will never complete. - queue.SubmitUnregisterQuerierWorkerConn(querier2Conn) - require.NoError(t, services.StopAndAwaitTerminated(ctx, queue)) - }) - - // querier-2 waits for a new request. - querier2wg := sync.WaitGroup{} - querier2wg.Add(1) - go func() { - defer querier2wg.Done() - dequeueReq := NewQuerierWorkerDequeueRequest(querier2Conn, FirstTenant()) - _, _, err := queue.AwaitRequestForQuerier(dequeueReq) - require.NoError(t, err) - }() - - // querier-1 and querier-3 crash (no graceful shutdown notification). - queue.SubmitUnregisterQuerierWorkerConn(querier1Conn) - queue.SubmitUnregisterQuerierWorkerConn(querier3Conn) - - // Enqueue a request from a tenant which would be assigned to querier-1. 
- // NOTE: "user-1" shuffle shard always chooses the first querier ("querier-1" in this case) - // when there are only one or two queriers in the sorted list of connected queriers - req := &SchedulerRequest{ - Ctx: context.Background(), - Request: &httpgrpc.HTTPRequest{Method: "GET", Url: "/hello"}, - AdditionalQueueDimensions: randAdditionalQueueDimension(""), - } - require.NoError(t, queue.SubmitRequestToEnqueue("user-1", req, 2, nil)) - - startTime := time.Now() - done := make(chan struct{}) - go func() { - querier2wg.Wait() - close(done) - }() - select { - case <-done: - waitTime := time.Since(startTime) - // We expect that querier-2 got the request only after forget delay is passed. - assert.GreaterOrEqual(t, waitTime.Milliseconds(), forgetDelay.Milliseconds()) - case <-time.After(testTimeout): - t.Fatal("timeout: querier-2 did not receive the request expected to be resharded to querier-2") - } - - }) + queue, err := NewRequestQueue( + log.NewNopLogger(), + 1, + forgetDelay, + promauto.With(nil).NewGaugeVec(prometheus.GaugeOpts{}, []string{"user"}), + promauto.With(nil).NewCounterVec(prometheus.CounterOpts{}, []string{"user"}), + promauto.With(nil).NewHistogram(prometheus.HistogramOpts{}), + promauto.With(nil).NewSummaryVec(prometheus.SummaryOpts{}, []string{"query_component"}), + ) + require.NoError(t, err) + + // Start the queue service. + ctx := context.Background() + require.NoError(t, services.StartAndAwaitRunning(ctx, queue)) + + // Three queriers connect. + // We will submit the enqueue request with maxQueriers: 2. + // + // Whenever forgetDisconnectedQueriers runs, all queriers which reached zero connections since the last + // run of forgetDisconnectedQueriers will all be removed in from the shuffle shard in the same run. + // + // In this case two queriers are forgotten in the same run, but only the first forgotten querier triggers a reshard. + // In the first reshard, the tenant goes from a shuffled subset of queriers to a state of + // "tenant can use all queriers", as connected queriers is now <= tenant.maxQueriers. + // The second forgotten querier won't trigger a reshard, as connected queriers is already <= tenant.maxQueriers. + // + // We are testing that the occurrence of a reshard is reported correctly + // when not all querier forget operations in a single run of forgetDisconnectedQueriers caused a reshard. + // Two queriers connect. + querier1Conn := NewUnregisteredQuerierWorkerConn(context.Background(), "querier-1") + require.NoError(t, queue.AwaitRegisterQuerierWorkerConn(querier1Conn)) + querier2Conn := NewUnregisteredQuerierWorkerConn(context.Background(), "querier-2") + require.NoError(t, queue.AwaitRegisterQuerierWorkerConn(querier2Conn)) + querier3Conn := NewUnregisteredQuerierWorkerConn(context.Background(), "querier-3") + require.NoError(t, queue.AwaitRegisterQuerierWorkerConn(querier3Conn)) + + t.Cleanup(func() { + // if the test has failed and the queue does not get cleared, + // we must send a shutdown signal for the remaining connected querier + // or else StopAndAwaitTerminated will never complete. + queue.SubmitUnregisterQuerierWorkerConn(querier2Conn) + require.NoError(t, services.StopAndAwaitTerminated(ctx, queue)) + }) + + // querier-2 waits for a new request. 
+ querier2wg := sync.WaitGroup{} + querier2wg.Add(1) + go func() { + defer querier2wg.Done() + dequeueReq := NewQuerierWorkerDequeueRequest(querier2Conn, FirstTenant()) + _, _, err := queue.AwaitRequestForQuerier(dequeueReq) + require.NoError(t, err) + }() + + // querier-1 and querier-3 crash (no graceful shutdown notification). + queue.SubmitUnregisterQuerierWorkerConn(querier1Conn) + queue.SubmitUnregisterQuerierWorkerConn(querier3Conn) + + // Enqueue a request from a tenant which would be assigned to querier-1. + // NOTE: "user-1" shuffle shard always chooses the first querier ("querier-1" in this case) + // when there are only one or two queriers in the sorted list of connected queriers + req := &SchedulerRequest{ + Ctx: context.Background(), + Request: &httpgrpc.HTTPRequest{Method: "GET", Url: "/hello"}, + AdditionalQueueDimensions: randAdditionalQueueDimension(""), + } + require.NoError(t, queue.SubmitRequestToEnqueue("user-1", req, 2, nil)) + + startTime := time.Now() + done := make(chan struct{}) + go func() { + querier2wg.Wait() + close(done) + }() + + select { + case <-done: + waitTime := time.Since(startTime) + // We expect that querier-2 got the request only after forget delay is passed. + assert.GreaterOrEqual(t, waitTime.Milliseconds(), forgetDelay.Milliseconds()) + case <-time.After(testTimeout): + t.Fatal("timeout: querier-2 did not receive the request expected to be resharded to querier-2") } } func TestRequestQueue_GetNextRequestForQuerier_ShouldReturnAfterContextCancelled(t *testing.T) { const forgetDelay = 3 * time.Second const querierID = "querier-1" - treeTypes := buildTreeTestsStruct() - - for _, tt := range treeTypes { - t.Run(tt.name, func(t *testing.T) { - queue, err := NewRequestQueue( - log.NewNopLogger(), - 1, - tt.prioritizeQueryComponents, - forgetDelay, - promauto.With(nil).NewGaugeVec(prometheus.GaugeOpts{}, []string{"user"}), - promauto.With(nil).NewCounterVec(prometheus.CounterOpts{}, []string{"user"}), - promauto.With(nil).NewHistogram(prometheus.HistogramOpts{}), - promauto.With(nil).NewSummaryVec(prometheus.SummaryOpts{}, []string{"query_component"}), - ) - require.NoError(t, err) - - require.NoError(t, services.StartAndAwaitRunning(context.Background(), queue)) - t.Cleanup(func() { - require.NoError(t, services.StopAndAwaitTerminated(context.Background(), queue)) - }) - querier1Conn := NewUnregisteredQuerierWorkerConn(context.Background(), querierID) - require.NoError(t, queue.AwaitRegisterQuerierWorkerConn(querier1Conn)) - - // Calling AwaitRequestForQuerier with a context that is already cancelled should fail immediately. - deadCtx, cancel := context.WithCancel(context.Background()) - cancel() - querier1Conn.ctx = deadCtx - - dequeueReq := NewQuerierWorkerDequeueRequest(querier1Conn, FirstTenant()) - r, tenant, err := queue.AwaitRequestForQuerier(dequeueReq) - assert.Nil(t, r) - assert.Equal(t, FirstTenant(), tenant) - assert.ErrorIs(t, err, context.Canceled) - - // Further, a context canceled after AwaitRequestForQuerier publishes a request should also fail. - errChan := make(chan error) - ctx, cancel := context.WithCancel(context.Background()) - querier1Conn.ctx = ctx - - go func() { - dequeueReq := NewQuerierWorkerDequeueRequest(querier1Conn, FirstTenant()) - _, _, err := queue.AwaitRequestForQuerier(dequeueReq) - errChan <- err - }() - - time.Sleep(20 * time.Millisecond) // Wait for AwaitRequestForQuerier to be waiting for a query. 
- cancel() - - select { - case err := <-errChan: - require.Equal(t, context.Canceled, err) - case <-time.After(time.Second): - require.Fail(t, "gave up waiting for GetNextRequestForQuerierToReturn") - } - - }) + queue, err := NewRequestQueue( + log.NewNopLogger(), + 1, + forgetDelay, + promauto.With(nil).NewGaugeVec(prometheus.GaugeOpts{}, []string{"user"}), + promauto.With(nil).NewCounterVec(prometheus.CounterOpts{}, []string{"user"}), + promauto.With(nil).NewHistogram(prometheus.HistogramOpts{}), + promauto.With(nil).NewSummaryVec(prometheus.SummaryOpts{}, []string{"query_component"}), + ) + require.NoError(t, err) + + require.NoError(t, services.StartAndAwaitRunning(context.Background(), queue)) + t.Cleanup(func() { + require.NoError(t, services.StopAndAwaitTerminated(context.Background(), queue)) + }) + + querier1Conn := NewUnregisteredQuerierWorkerConn(context.Background(), querierID) + require.NoError(t, queue.AwaitRegisterQuerierWorkerConn(querier1Conn)) + + // Calling AwaitRequestForQuerier with a context that is already cancelled should fail immediately. + deadCtx, cancel := context.WithCancel(context.Background()) + cancel() + querier1Conn.ctx = deadCtx + + dequeueReq := NewQuerierWorkerDequeueRequest(querier1Conn, FirstTenant()) + r, tenant, err := queue.AwaitRequestForQuerier(dequeueReq) + assert.Nil(t, r) + assert.Equal(t, FirstTenant(), tenant) + assert.ErrorIs(t, err, context.Canceled) + + // Further, a context canceled after AwaitRequestForQuerier publishes a request should also fail. + errChan := make(chan error) + ctx, cancel := context.WithCancel(context.Background()) + querier1Conn.ctx = ctx + + go func() { + dequeueReq := NewQuerierWorkerDequeueRequest(querier1Conn, FirstTenant()) + _, _, err := queue.AwaitRequestForQuerier(dequeueReq) + errChan <- err + }() + + time.Sleep(20 * time.Millisecond) // Wait for AwaitRequestForQuerier to be waiting for a query. 
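Both cancellation cases in this test rely on AwaitRequestForQuerier unblocking with context.Canceled once the worker connection's context is done, whether it was cancelled before the call or while the call was blocked. A rough sketch of the consumer-loop shape that behaviour supports follows; it is illustrative only and not part of the patch: workerDequeueLoop is a made-up name and feeding the returned tenant index back into the next dequeue request is an assumption, while QuerierWorkerConn, RequestQueue, NewQuerierWorkerDequeueRequest, AwaitRequestForQuerier and FirstTenant come from the surrounding code.

func workerDequeueLoop(conn *QuerierWorkerConn, q *RequestQueue) error {
	lastTenant := FirstTenant()
	for {
		// Ask the queue for the next request on behalf of this querier-worker connection.
		dequeueReq := NewQuerierWorkerDequeueRequest(conn, lastTenant)
		req, tenant, err := q.AwaitRequestForQuerier(dequeueReq)
		if err != nil {
			// context.Canceled is returned once the worker's context is done,
			// which is how the loop terminates cleanly.
			return err
		}
		lastTenant = tenant
		_ = req // hand the request to the querier for execution (omitted in this sketch)
	}
}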
+ cancel() + + select { + case err := <-errChan: + require.Equal(t, context.Canceled, err) + case <-time.After(time.Second): + require.Fail(t, "gave up waiting for GetNextRequestForQuerierToReturn") } - } func TestRequestQueue_GetNextRequestForQuerier_ShouldReturnImmediatelyIfQuerierIsAlreadyShuttingDown(t *testing.T) { const forgetDelay = 3 * time.Second const querierID = "querier-1" - treeTypes := buildTreeTestsStruct() - - for _, tt := range treeTypes { - t.Run(tt.name, func(t *testing.T) { - queue, err := NewRequestQueue( - log.NewNopLogger(), - 1, - tt.prioritizeQueryComponents, - forgetDelay, - promauto.With(nil).NewGaugeVec(prometheus.GaugeOpts{}, []string{"user"}), - promauto.With(nil).NewCounterVec(prometheus.CounterOpts{}, []string{"user"}), - promauto.With(nil).NewHistogram(prometheus.HistogramOpts{}), - promauto.With(nil).NewSummaryVec(prometheus.SummaryOpts{}, []string{"query_component"}), - ) - require.NoError(t, err) - - ctx := context.Background() - require.NoError(t, services.StartAndAwaitRunning(ctx, queue)) - t.Cleanup(func() { - require.NoError(t, services.StopAndAwaitTerminated(ctx, queue)) - }) + queue, err := NewRequestQueue( + log.NewNopLogger(), + 1, + forgetDelay, + promauto.With(nil).NewGaugeVec(prometheus.GaugeOpts{}, []string{"user"}), + promauto.With(nil).NewCounterVec(prometheus.CounterOpts{}, []string{"user"}), + promauto.With(nil).NewHistogram(prometheus.HistogramOpts{}), + promauto.With(nil).NewSummaryVec(prometheus.SummaryOpts{}, []string{"query_component"}), + ) + require.NoError(t, err) - querierConn := NewUnregisteredQuerierWorkerConn(context.Background(), querierID) - require.NoError(t, queue.AwaitRegisterQuerierWorkerConn(querierConn)) + ctx := context.Background() + require.NoError(t, services.StartAndAwaitRunning(ctx, queue)) + t.Cleanup(func() { + require.NoError(t, services.StopAndAwaitTerminated(ctx, queue)) + }) - queue.SubmitNotifyQuerierShutdown(ctx, querierID) + querierConn := NewUnregisteredQuerierWorkerConn(context.Background(), querierID) + require.NoError(t, queue.AwaitRegisterQuerierWorkerConn(querierConn)) - dequeueReq := NewQuerierWorkerDequeueRequest(querierConn, FirstTenant()) - _, _, err = queue.AwaitRequestForQuerier(dequeueReq) - require.EqualError(t, err, "querier has informed the scheduler it is shutting down") - }) - } + queue.SubmitNotifyQuerierShutdown(ctx, querierID) + dequeueReq := NewQuerierWorkerDequeueRequest(querierConn, FirstTenant()) + _, _, err = queue.AwaitRequestForQuerier(dequeueReq) + require.EqualError(t, err, "querier has informed the scheduler it is shutting down") } func TestRequestQueue_tryDispatchRequestToQuerier_ShouldReEnqueueAfterFailedSendToQuerier(t *testing.T) { const forgetDelay = 3 * time.Second const querierID = "querier-1" - treeTypes := buildTreeTestsStruct() - for _, tt := range treeTypes { - t.Run(tt.name, func(t *testing.T) { - queue, err := NewRequestQueue( - log.NewNopLogger(), - 1, - tt.prioritizeQueryComponents, - forgetDelay, - promauto.With(nil).NewGaugeVec(prometheus.GaugeOpts{}, []string{"user"}), - promauto.With(nil).NewCounterVec(prometheus.CounterOpts{}, []string{"user"}), - promauto.With(nil).NewHistogram(prometheus.HistogramOpts{}), - promauto.With(nil).NewSummaryVec(prometheus.SummaryOpts{}, []string{"query_component"}), - ) - require.NoError(t, err) - - // bypassing queue dispatcher loop for direct usage of the queueBroker and - // passing a QuerierWorkerDequeueRequest for a canceled querier connection - qb := newQueueBroker(queue.maxOutstandingPerTenant, 
tt.prioritizeQueryComponents, queue.forgetDelay) - qb.addQuerierWorkerConn(NewUnregisteredQuerierWorkerConn(context.Background(), querierID)) - - tenantMaxQueriers := 0 // no sharding - queueDim := randAdditionalQueueDimension("") - req := &SchedulerRequest{ - Ctx: context.Background(), - Request: &httpgrpc.HTTPRequest{Method: "GET", Url: "/hello"}, - AdditionalQueueDimensions: queueDim, - } - tr := tenantRequest{ - tenantID: "tenant-1", - req: req, - } - - var multiAlgorithmTreeQueuePath tree.QueuePath - if queueDim == nil { - queueDim = []string{unknownQueueDimension} - } - if qb.prioritizeQueryComponents { - multiAlgorithmTreeQueuePath = append(append(multiAlgorithmTreeQueuePath, queueDim...), "tenant-1") - } else { - multiAlgorithmTreeQueuePath = append([]string{"tenant-1"}, queueDim...) - } - - require.Nil(t, qb.tree.GetNode(multiAlgorithmTreeQueuePath)) - require.NoError(t, qb.enqueueRequestBack(&tr, tenantMaxQueriers)) - require.False(t, qb.tree.GetNode(multiAlgorithmTreeQueuePath).IsEmpty()) - - ctx, cancel := context.WithCancel(context.Background()) - call := &QuerierWorkerDequeueRequest{ - QuerierWorkerConn: &QuerierWorkerConn{ - ctx: ctx, - QuerierID: querierID, - WorkerID: 0, - }, - lastTenantIndex: FirstTenant(), - recvChan: make(chan querierWorkerDequeueResponse), - } - cancel() // ensure querier context done before send is attempted - - // send to querier will fail but method returns true, - // indicating not to re-submit a request for QuerierWorkerDequeueRequest for the querier - require.True(t, queue.trySendNextRequestForQuerier(call)) - // assert request was re-enqueued for tenant after failed send - require.False(t, qb.tree.GetNode(multiAlgorithmTreeQueuePath).IsEmpty()) + queue, err := NewRequestQueue( + log.NewNopLogger(), + 1, + forgetDelay, + promauto.With(nil).NewGaugeVec(prometheus.GaugeOpts{}, []string{"user"}), + promauto.With(nil).NewCounterVec(prometheus.CounterOpts{}, []string{"user"}), + promauto.With(nil).NewHistogram(prometheus.HistogramOpts{}), + promauto.With(nil).NewSummaryVec(prometheus.SummaryOpts{}, []string{"query_component"}), + ) + require.NoError(t, err) + + // bypassing queue dispatcher loop for direct usage of the queueBroker and + // passing a QuerierWorkerDequeueRequest for a canceled querier connection + qb := newQueueBroker(queue.maxOutstandingPerTenant, queue.forgetDelay) + qb.addQuerierWorkerConn(NewUnregisteredQuerierWorkerConn(context.Background(), querierID)) + + tenantMaxQueriers := 0 // no sharding + queueDim := randAdditionalQueueDimension("") + req := &SchedulerRequest{ + Ctx: context.Background(), + Request: &httpgrpc.HTTPRequest{Method: "GET", Url: "/hello"}, + AdditionalQueueDimensions: queueDim, + } + tr := tenantRequest{ + tenantID: "tenant-1", + req: req, + } - }) + var multiAlgorithmTreeQueuePath tree.QueuePath + if queueDim == nil { + queueDim = []string{unknownQueueDimension} + } + multiAlgorithmTreeQueuePath = append(append(multiAlgorithmTreeQueuePath, queueDim...), "tenant-1") + + require.Nil(t, qb.tree.GetNode(multiAlgorithmTreeQueuePath)) + require.NoError(t, qb.enqueueRequestBack(&tr, tenantMaxQueriers)) + require.False(t, qb.tree.GetNode(multiAlgorithmTreeQueuePath).IsEmpty()) + + ctx, cancel := context.WithCancel(context.Background()) + call := &QuerierWorkerDequeueRequest{ + QuerierWorkerConn: &QuerierWorkerConn{ + ctx: ctx, + QuerierID: querierID, + WorkerID: 0, + }, + lastTenantIndex: FirstTenant(), + recvChan: make(chan querierWorkerDequeueResponse), } + cancel() // ensure querier context done before send is 
attempted + // send to querier will fail but method returns true, + // indicating not to re-submit a request for QuerierWorkerDequeueRequest for the querier + require.True(t, queue.trySendNextRequestForQuerier(call)) + // assert request was re-enqueued for tenant after failed send + require.False(t, qb.tree.GetNode(multiAlgorithmTreeQueuePath).IsEmpty()) } diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go index 47a1969b8b9..2aed911103f 100644 --- a/pkg/scheduler/scheduler.go +++ b/pkg/scheduler/scheduler.go @@ -97,9 +97,8 @@ type connectedFrontend struct { } type Config struct { - MaxOutstandingPerTenant int `yaml:"max_outstanding_requests_per_tenant"` - PrioritizeQueryComponents bool `yaml:"prioritize_query_components" category:"experimental"` - QuerierForgetDelay time.Duration `yaml:"querier_forget_delay" category:"experimental"` + MaxOutstandingPerTenant int `yaml:"max_outstanding_requests_per_tenant"` + QuerierForgetDelay time.Duration `yaml:"querier_forget_delay" category:"experimental"` GRPCClientConfig grpcclient.Config `yaml:"grpc_client_config" doc:"description=This configures the gRPC client used to report errors back to the query-frontend."` ServiceDiscovery schedulerdiscovery.Config `yaml:",inline"` @@ -107,7 +106,6 @@ type Config struct { func (cfg *Config) RegisterFlags(f *flag.FlagSet, logger log.Logger) { f.IntVar(&cfg.MaxOutstandingPerTenant, "query-scheduler.max-outstanding-requests-per-tenant", 100, "Maximum number of outstanding requests per tenant per query-scheduler. In-flight requests above this limit will fail with HTTP response status code 429.") - f.BoolVar(&cfg.PrioritizeQueryComponents, "query-scheduler.prioritize-query-components", false, "When enabled, the query scheduler primarily prioritizes dequeuing fairly from queue components and secondarily prioritizes dequeuing fairly across tenants. When disabled, the query scheduler primarily prioritizes tenant fairness.") f.DurationVar(&cfg.QuerierForgetDelay, "query-scheduler.querier-forget-delay", 0, "If a querier disconnects without sending notification about graceful shutdown, the query-scheduler will keep the querier in the tenant's shard until the forget delay has passed. This feature is useful to reduce the blast radius when shuffle-sharding is enabled.") cfg.GRPCClientConfig.CustomCompressors = []string{s2.Name} @@ -164,7 +162,6 @@ func NewScheduler(cfg Config, limits Limits, log log.Logger, registerer promethe s.requestQueue, err = queue.NewRequestQueue( s.log, cfg.MaxOutstandingPerTenant, - cfg.PrioritizeQueryComponents, cfg.QuerierForgetDelay, s.queueLength, s.discardedRequests,