-
Notifications
You must be signed in to change notification settings - Fork 569
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
store-gateway: add timeout to query gate wait #7777
Changes from all commits
67026f0
87abd77
63adee8
52d2596
608b5a9
7a0d772
573ec0d
68a0024
ed7012c
f09508c
523f26a
28b00ff
375a044
413a415
df43e5f
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -26,6 +26,7 @@ import ( | |
"github.com/thanos-io/objstore" | ||
"google.golang.org/grpc/metadata" | ||
|
||
"github.com/grafana/mimir/pkg/mimirpb" | ||
"github.com/grafana/mimir/pkg/storage/bucket" | ||
"github.com/grafana/mimir/pkg/storage/tsdb" | ||
"github.com/grafana/mimir/pkg/storage/tsdb/block" | ||
|
@@ -106,6 +107,7 @@ func NewBucketStores(cfg tsdb.BlocksStorageConfig, shardingStrategy ShardingStra | |
queryGateReg := prometheus.WrapRegistererWith(prometheus.Labels{"gate": "query"}, gateReg) | ||
queryGate := gate.NewBlocking(cfg.BucketStore.MaxConcurrent) | ||
queryGate = gate.NewInstrumented(queryGateReg, cfg.BucketStore.MaxConcurrent, queryGate) | ||
queryGate = timeoutGate{delegate: queryGate, timeout: cfg.BucketStore.MaxConcurrentQueueTimeout} | ||
|
||
// The number of concurrent index header loads from storegateway are limited. | ||
lazyLoadingGateReg := prometheus.WrapRegistererWith(prometheus.Labels{"gate": "index_header"}, gateReg) | ||
|
@@ -420,6 +422,40 @@ func (u *BucketStores) syncDirForUser(userID string) string { | |
return filepath.Join(u.cfg.BucketStore.SyncDir, userID) | ||
} | ||
|
||
// timeoutGate returns errGateTimeout when the timeout is reached while still waiting for the delegate gate. | ||
// timeoutGate would fit better in dskit. However, at the time of writing, dskit supports go 1.20. | ||
// go 1.20 doesn't have context.WithTimeoutCause yet, | ||
// so we chose to implement timeoutGate here instead of implementing context.WithTimeoutCause ourselves in dskit. | ||
// It also allows us to keep the span logger in timeoutGate rather than in the bucket store. | ||
type timeoutGate struct { | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. [nit] Looks like something we could have done in dskit. Non-blocking. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I also realized this after submitting the PR. But dskit doesn't have |
||
delegate gate.Gate | ||
timeout time.Duration | ||
} | ||
|
||
// errGateTimeout is the error timeoutGate.Start returns when its timeout elapses
// while still waiting for the delegate gate. It is also installed as the
// cancellation cause of the context used for that wait, which is how Start
// recognizes its own timeout. It carries the mimirpb.INSTANCE_LIMIT cause.
var errGateTimeout = staticError{cause: mimirpb.INSTANCE_LIMIT, msg: "timeout waiting for concurrency gate"} | ||
|
||
func (t timeoutGate) Start(ctx context.Context) error { | ||
if t.timeout == 0 { | ||
return t.delegate.Start(ctx) | ||
} | ||
|
||
// Inject our own error so that we can differentiate between a timeout caused by this gate | ||
// or a timeout in the original request timeout. | ||
ctx, cancel := context.WithTimeoutCause(ctx, t.timeout, errGateTimeout) | ||
defer cancel() | ||
|
||
err := t.delegate.Start(ctx) | ||
if errors.Is(context.Cause(ctx), errGateTimeout) { | ||
_ = spanlogger.FromContext(ctx, log.NewNopLogger()).Error(err) | ||
err = errGateTimeout | ||
} | ||
return err | ||
} | ||
|
||
// Done forwards to the delegate gate's Done; timeoutGate adds no behavior of its own here.
func (t timeoutGate) Done() { | ||
t.delegate.Done() | ||
} | ||
|
||
func (u *BucketStores) getOrCreateStore(userID string) (*BucketStore, error) { | ||
// Check if the store already exists. | ||
bs := u.getStore(userID) | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I didn't change the implementation, because that was the default behaviour anyways. But decided to add tests because the current behaviour is implicit rather than explicit.