Skip to content

Commit

Permalink
Mimir query engine: fix multiple org IDs present error when MQE is …
Browse files Browse the repository at this point in the history
…used to evaluate a cross-tenant query (#9120)

* Fix `multiple org IDs present` error when MQE is used to evaluate a cross-tenant query

* Add changelog entry
  • Loading branch information
charleskorn authored Aug 28, 2024
1 parent ce3b88a commit 9bad7a2
Show file tree
Hide file tree
Showing 4 changed files with 97 additions and 4 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
* [CHANGE] Distributor: Replace `-distributor.retry-after-header.max-backoff-exponent` and `-distributor.retry-after-header.base-seconds` with `-distributor.retry-after-header.min-backoff` and `-distributor.retry-after-header.max-backoff` for easier configuration. #8694
* [CHANGE] Ingester: increase the default inactivity timeout of active series (`-ingester.active-series-metrics-idle-timeout`) from `10m` to `20m`. #8975
* [CHANGE] Distributor: Remove `-distributor.enable-otlp-metadata-storage` flag, which was deprecated in version 2.12. #9069
* [FEATURE] Querier: add experimental streaming PromQL engine, enabled with `-querier.query-engine=mimir`. #8422 #8430 #8454 #8455 #8360 #8490 #8508 #8577 #8660 #8671 #8677 #8747 #8850 #8872 #8838 #8911 #8909 #8923 #8924 #8925 #8932 #8933 #8934 #8962 #8986 #8993 #8995 #9017 #9018 #9008
* [FEATURE] Querier: add experimental streaming PromQL engine, enabled with `-querier.query-engine=mimir`. #8422 #8430 #8454 #8455 #8360 #8490 #8508 #8577 #8660 #8671 #8677 #8747 #8850 #8872 #8838 #8911 #8909 #8923 #8924 #8925 #8932 #8933 #8934 #8962 #8986 #8993 #8995 #9017 #9018 #9008 #9120
* [FEATURE] Experimental Kafka-based ingest storage. #6888 #6894 #6929 #6940 #6951 #6974 #6982 #7029 #7030 #7091 #7142 #7147 #7148 #7153 #7160 #7193 #7349 #7376 #7388 #7391 #7393 #7394 #7402 #7404 #7423 #7424 #7437 #7486 #7503 #7508 #7540 #7621 #7682 #7685 #7694 #7695 #7696 #7697 #7701 #7733 #7734 #7741 #7752 #7838 #7851 #7871 #7877 #7880 #7882 #7887 #7891 #7925 #7955 #7967 #8031 #8063 #8077 #8088 #8135 #8176 #8184 #8194 #8216 #8217 #8222 #8233 #8503 #8542 #8579 #8657 #8686 #8688 #8703 #8706 #8708 #8738 #8750 #8778 #8808 #8809 #8841 #8842 #8845 #8853 #8886 #8988
* What it is:
* When the new ingest storage architecture is enabled, distributors write incoming write requests to a Kafka-compatible backend, and the ingesters asynchronously replay ingested data from Kafka. In this architecture, the write and read path are de-coupled through a Kafka-compatible backend. The write path and Kafka load is a function of the incoming write traffic, the read path load is a function of received queries. Whatever the load on the read path, it doesn't affect the write path.
Expand Down
19 changes: 17 additions & 2 deletions pkg/querier/querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -628,10 +628,25 @@ type tenantQueryLimitsProvider struct {
}

func (p *tenantQueryLimitsProvider) GetMaxEstimatedMemoryConsumptionPerQuery(ctx context.Context) (uint64, error) {
tenantID, err := tenant.TenantID(ctx)
tenantIDs, err := tenant.TenantIDs(ctx)
if err != nil {
return 0, err
}

return p.limits.MaxEstimatedMemoryConsumptionPerQuery(tenantID), nil
totalLimit := uint64(0)

for _, tenantID := range tenantIDs {
tenantLimit := p.limits.MaxEstimatedMemoryConsumptionPerQuery(tenantID)

if tenantLimit == 0 {
// If any tenant is unlimited, then treat whole query as unlimited.
return 0, nil
}

// Given we'll enforce limits like the max chunks limit on a per-tenant basis (and therefore effectively allow the
// query to consume the sum of all tenants' limits), emulate equivalent behaviour with the memory consumption limit.
totalLimit += tenantLimit
}

return totalLimit, nil
}
78 changes: 78 additions & 0 deletions pkg/querier/querier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1527,3 +1527,81 @@ func (m *mockBlocksStorageQuerier) LabelNames(ctx context.Context, hints *storag
func (m *mockBlocksStorageQuerier) Close() error {
return nil
}

func TestTenantQueryLimitsProvider(t *testing.T) {
tenantLimits := &staticTenantLimits{
limits: map[string]*validation.Limits{
"user-1": {
MaxEstimatedMemoryConsumptionPerQuery: 1000,
},
"user-2": {
MaxEstimatedMemoryConsumptionPerQuery: 10,
},
"user-3": {
MaxEstimatedMemoryConsumptionPerQuery: 3000,
},
"unlimited-user": {
MaxEstimatedMemoryConsumptionPerQuery: 0,
},
},
}

overrides, err := validation.NewOverrides(defaultLimitsConfig(), tenantLimits)
require.NoError(t, err)

provider := &tenantQueryLimitsProvider{
limits: overrides,
}

testCases := map[string]struct {
ctx context.Context
expectedLimit uint64
expectedError error
}{
"no tenant ID provided": {
ctx: context.Background(),
expectedError: user.ErrNoOrgID,
},
"single tenant ID provided, has limit": {
ctx: user.InjectOrgID(context.Background(), "user-1"),
expectedLimit: 1000,
},
"single tenant ID provided, unlimited": {
ctx: user.InjectOrgID(context.Background(), "unlimited-user"),
expectedLimit: 0,
},
"multiple tenant IDs provided, all have limits": {
ctx: user.InjectOrgID(context.Background(), "user-1|user-2|user-3"),
expectedLimit: 4010,
},
"multiple tenant IDs provided, one unlimited": {
ctx: user.InjectOrgID(context.Background(), "user-1|unlimited-user|user-3"),
expectedLimit: 0,
},
}

for name, testCase := range testCases {
t.Run(name, func(t *testing.T) {
actualLimit, actualErr := provider.GetMaxEstimatedMemoryConsumptionPerQuery(testCase.ctx)

if testCase.expectedError == nil {
require.NoError(t, actualErr)
require.Equal(t, testCase.expectedLimit, actualLimit)
} else {
require.ErrorIs(t, actualErr, testCase.expectedError)
}
})
}
}

type staticTenantLimits struct {
limits map[string]*validation.Limits
}

func (s *staticTenantLimits) ByUserID(userID string) *validation.Limits {
return s.limits[userID]
}

func (s *staticTenantLimits) AllByUserID() map[string]*validation.Limits {
return s.limits
}
2 changes: 1 addition & 1 deletion pkg/streamingpromql/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func newQuery(ctx context.Context, queryable storage.Queryable, opts promql.Quer

maxEstimatedMemoryConsumptionPerQuery, err := engine.limitsProvider.GetMaxEstimatedMemoryConsumptionPerQuery(ctx)
if err != nil {
return nil, err
return nil, fmt.Errorf("could not get memory consumption limit for query: %w", err)
}

expr, err := parser.ParseExpr(qs)
Expand Down

0 comments on commit 9bad7a2

Please sign in to comment.