Handle rate limiting errors for GitHub API #2271
Conversation
op := func() error {
	// Check if the token is currently rate limited, block the goroutine if it is
	err := c.waitIfRateLimited(ctx)
	if err != nil {
		return err
	}

	_, resp, err := c.client.Issues.CreateComment(ctx, owner, repo, number, &github.IssueComment{
		Body: &comment,
	})

	rateLimCheckErr := c.checkRateLimitResponse(ctx, resp.Response)
	if rateLimCheckErr != nil {
		return rateLimCheckErr
	}

	// If err is of type RateLimitError or AbuseRateLimitError, don't do permanent backoff
	if isRateLimitError(err) {
		return err
	}

	return backoffv4.Permanent(err)
}
return performWithRetry(ctx, op)
The CreateComment endpoint is modified as an example. If we agree on the implementation, other endpoints will be modified as well.
I like this pattern, but I wonder if we can make this more generic to avoid repeating the closure around all GH calls. Taking your CreateComment example into account, could we do something like:
func (c *RestClient) CreateComment(ctx context.Context, owner, repo string, number int, comment string) error {
    createCommentWrapper := func(ctx context.Context) (*github.Response, error) {
        _, resp, err := c.client.Issues.CreateComment(ctx, owner, repo, number, &github.IssueComment{
            Body: &comment,
        })
        return resp, err
    }
return c.rateLimitHandler(ctx, createCommentWrapper)
}
and then extract the closure into a non-exported function:
type ghClientWrapper func(ctx context.Context) (*github.Response, error)
func (c *RestClient) rateLimitHandler(ctx context.Context, wrapper ghClientWrapper) error {
op := func() error {
// Check if the token is currently rate limited, block the goroutine if it is
err := c.waitIfRateLimited(ctx)
if err != nil {
return err
}
        resp, err := wrapper(ctx)
if err != nil {
rateLimCheckErr := c.checkRateLimitResponse(ctx, resp.Response)
if rateLimCheckErr != nil {
return rateLimCheckErr
}
// If err is of type RateLimitError or AbuseRateLimitError, don't do permanent backoff
if isRateLimitError(err) {
return err
}
return backoffv4.Permanent(err)
}
return nil
}
return performWithRetry(ctx, op)
}
Force-pushed from b433909 to a0e7322
Force-pushed from 5e8812d to 9d6b3bd
I started the review but haven't finished reading all of the PR, so I'm submitting the comments I had in the meantime and will resume the review later today.
Great work!
internal/tokenstatus/token_status.go (outdated)
// Store stores the rate limit status of a token
type Store struct {
	status map[key]chan struct{}
}
Have you considered using xsync.MapOf? We already use it in minder, so you wouldn't drag in a new dependency and you wouldn't have to take care of the mutex yourself. Also it provides methods like LoadAndDelete, which might be useful.
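For reference, a minimal sketch of what the store could look like on top of xsync (assuming the v3 generic API; the Store shape and method names here are illustrative, not the PR's code):

package tokenstatus

import "github.com/puzpuzpuz/xsync/v3"

// Store tracks rate-limited tokens; xsync.MapOf handles the
// synchronization internally, so no explicit mutex is needed.
type Store struct {
	status *xsync.MapOf[string, chan struct{}]
}

func NewStore() *Store {
	return &Store{status: xsync.NewMapOf[string, chan struct{}]()}
}

// GetBlockingChan returns the channel callers wait on while a token is blocked.
func (s *Store) GetBlockingChan(token string) (chan struct{}, bool) {
	return s.status.Load(token)
}

// Unblock removes the entry and closes the channel so all waiters resume.
func (s *Store) Unblock(token string) {
	if ch, ok := s.status.LoadAndDelete(token); ok {
		close(ch)
	}
}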
> Have you considered using xsync.MapOf? We already use it in minder, so you wouldn't drag in a new dependency and you wouldn't have to take care of the mutex yourself.

We use v1.5.2, which is not supported now. And even v3 had a bug fix a few months back:
"Fix too aggressive Map/MapOf shrinking on deletion leading to out of range panic"
First of all, I'd suggest that we update the version, as many bugs have been fixed since v1.5.2. Secondly, I'm open to using xsync.MapOf, but I'm not too sure about the stability (no offence) given that a recent version fixed an out of range panic. Wdyt?
I don't have a great answer; I'm even surprised we were using such an old version. I don't see an issue with your implementation, I think the way you used Lock/RLock etc. is already correct, and given that these are used in network traffic I'm not too concerned about lock congestion being a bottleneck.
So let's leave the code as-is and let me see about upgrading xsync.MapOf.
I wonder if dependabot catches updates when a newer major version is released under a new module path, i.e. yourcoolpackage/v3.
internal/providers/github/github.go (outdated)
case <-ctx.Done():
	return ctx.Err()
case <-time.After(MaxRateLimitWait):
	return fmt.Errorf("rate limit wait exceeded for %s", c.GetOwner())
Do you think it would be a good idea to create the timer manually and cancel it if the wait finishes sooner than the timer fires, so that it can be deallocated?
timer := time.NewTimer(MaxRateLimitWait)
defer func() {
if !timer.Stop() {
<-timer.C
}
}()
Yeah, we should stop the timer with defer for better and faster GC. How about:
func (c *RestClient) waitIfRateLimited(ctx context.Context) error {
blockedChan, ok := c.tokenStatStore.GetBlockingChan(c.token, db.ProviderTypeGithub)
if !ok {
return nil
}
timer := time.NewTimer(MaxRateLimitWait)
defer timer.Stop()
select {
case <-blockedChan:
return nil
case <-ctx.Done():
return ctx.Err()
case <-timer.C:
return fmt.Errorf("rate limit wait exceeded for %s", c.GetOwner())
}
}
This looks good!
internal/providers/github/github.go (outdated)
@@ -65,6 +74,7 @@ func NewRestClient(
 	ctx context.Context,
 	config *minderv1.GitHubProviderConfig,
 	metrics telemetry.HttpClientMetrics,
+	tokenStatStore *tokenstatus.Store,
Reading the rest of this module, it sounds like tokenstatus.Store is now required. Is that intended? If yes, should we error if the store is not provided?
Alternatively, I wonder if it would make sense to rather create a WithTokenStatus option and handle its absence in the REST code.
> Reading the rest of this module, it sounds like tokenstatus.Store is now required. Is that intended?

Yes, it is. I'll add a null check.

> Alternatively, I wonder if it would make sense to rather create a WithTokenStatus option and handle its absence in the REST code.

Why would it be absent? The only case where I think it may be absent is when we don't want to handle rate limit errors. Is this something we should support? For me, if we don't want to be blocked, we can pass a context with a short timeout (in case we are blocked by the rate limit).
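As an illustration of that short-timeout escape hatch (the timeout value and call site are illustrative, not from the PR):

// A caller that would rather fail fast than wait out a rate limit
// can bound the whole call with a context deadline.
ctx, cancel := context.WithTimeout(ctx, 30*time.Second)
defer cancel()
if err := client.CreateComment(ctx, owner, repo, number, comment); err != nil {
	// A context.DeadlineExceeded here means we gave up waiting.
	log.Printf("create comment: %v", err)
}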
To be honest, I was only wondering if we could decrease the number of places where we pass the tokenstatus.Store by using something like the WithTokenStatus option, and then just have a passthrough implementation as a default opt to avoid increasing the number of required parameters across the codebase. There's no functional reason.
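A hedged sketch of that functional-option shape (the option name follows the suggestion above; the passthrough default NewPassthroughStore is hypothetical):

// RestClientOption configures optional RestClient dependencies.
type RestClientOption func(*RestClient)

// WithTokenStatus wires in a token status store; without it the client
// would fall back to a passthrough store that never blocks.
func WithTokenStatus(store *tokenstatus.Store) RestClientOption {
	return func(c *RestClient) { c.tokenStatStore = store }
}

func NewRestClient(ctx context.Context, config *minderv1.GitHubProviderConfig,
	metrics telemetry.HttpClientMetrics, opts ...RestClientOption) (*RestClient, error) {
	c := &RestClient{
		tokenStatStore: tokenstatus.NewPassthroughStore(), // hypothetical no-op default
	}
	for _, opt := range opts {
		opt(c)
	}
	// ... the rest of the existing construction ...
	return c, nil
}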
Thank you! I went through the rest of the code and left some comments. Mostly they are nits, and I think this is great work.
var abuseRateLimitError *github.AbuseRateLimitError
isAbuseRateLimitErr := errors.As(err, &abuseRateLimitError)

return isRateLimitErr || isAbuseRateLimitErr
can we just do:
func isRateLimitError(err error) bool {
var rateLimitError *github.RateLimitError
var abuseRateLimitError *github.AbuseRateLimitError
return errors.As(err, &rateLimitError) || errors.As(err, &abuseRateLimitError)
}
Although on second thought, the extra boolean flags might be useful for extra debugging (I'll add another comment with better locality about that).
internal/providers/github/github.go (outdated)
// Check if rate limit headers are present
remaining, _ := strconv.Atoi(resp.Header.Get("x-ratelimit-remaining"))
resetTimeUnix, _ := strconv.ParseInt(resp.Header.Get("x-ratelimit-reset"), 10, 64)
retryAfter, _ := strconv.Atoi(resp.Header.Get("retry-after"))
Do you think it might be useful to log these values at a Debug level?
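For instance, a minimal zerolog sketch (the field names are illustrative):

// Surface the parsed rate limit headers at Debug level.
zerolog.Ctx(ctx).Debug().
	Int("remaining", remaining).
	Int64("reset", resetTimeUnix).
	Int("retry_after", retryAfter).
	Msg("rate limit headers")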
internal/providers/github/github.go (outdated)
	return c.blockAndWaitWithTimeout(ctx, waitTime, MaxRateLimitWait)
}

logger.Warn().Msgf("Rate limit exceeded by %s. Waiting for 1 minute before retrying", c.GetOwner())
Could we extract 1*time.Minute to a var or a const, to make sure we don't forget to update the log message if we update the wait time?
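Something like the following, where the constant name is illustrative:

// rateLimitRetryWait keeps the wait duration and the log message in sync.
const rateLimitRetryWait = 1 * time.Minute

logger.Warn().Msgf("Rate limit exceeded by %s. Waiting for %s before retrying",
	c.GetOwner(), rateLimitRetryWait)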
internal/providers/github/github.go (outdated)
	return nil
}

logger := zerolog.Ctx(ctx)
Could we extract the URL we tried to talk to, to get a sense of what call was rate-limited, and add that as a Str for the logger?
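I.e., something along these lines (the nil guards are defensive assumptions; a similar path extraction shows up later in the PR):

// Record which request was rate limited, guarding against nil fields.
path := ""
if resp != nil && resp.Request != nil && resp.Request.URL != nil {
	path = resp.Request.URL.Path
}
logger := zerolog.Ctx(ctx).With().Str("path", path).Logger()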
internal/providers/github/github.go (outdated)
}

func (c *RestClient) checkRateLimitResponse(ctx context.Context, resp *http.Response) error {
	if resp.StatusCode != http.StatusForbidden && resp.StatusCode != http.StatusTooManyRequests {
Are there cases where resp might be nil on a fatal error, in which case we'd have to rely on the error rather than the reply? (I noticed that there's no err check between calling c.client.Issues.CreateComment and dereferencing resp.)
_, resp, err := c.client.Issues.CreateComment(ctx, owner, repo, number, &github.IssueComment{
Body: &comment,
})
rateLimCheckErr := c.checkRateLimitResponse(ctx, resp.Response)
if rateLimCheckErr != nil {
return rateLimCheckErr
}
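For reference, a defensive sketch of the guard being asked about (whether go-github can actually return a nil response alongside an error here is the open question):

_, resp, err := c.client.Issues.CreateComment(ctx, owner, repo, number, &github.IssueComment{
	Body: &comment,
})
if resp == nil {
	// No HTTP response to inspect (e.g. a transport-level failure):
	// fall back to the error alone.
	return backoffv4.Permanent(err)
}
if rateLimCheckErr := c.checkRateLimitResponse(ctx, resp.Response); rateLimCheckErr != nil {
	return rateLimCheckErr
}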
I just noticed that the go-github library internally also does something when it encounters rate-limiting errors (it starts sending mock responses stating you've been rate-limited). Let me check that out, and after that I'll address the review items.

// BareDo sends an API request and lets you handle the api response. If an error
// or API Error occurs, the error will contain more information. Otherwise you
// are supposed to read and close the response's Body. If rate limit is exceeded
// and reset time is in the future, BareDo returns *RateLimitError immediately
// without making a network API call.
@jhrozek So apparently the library tracks rate limits per endpoint category. E.g., if a request to create a comment hits the rate limit, it adds that rate limit to that particular category's store. https://github.com/google/go-github/blob/975b2e55cec1dd1a438ed15ec605cc46113506ab/github/github.go#L1323-L1324

Here is some code from the lib to make more sense (https://github.com/google/go-github/blob/975b2e55cec1dd1a438ed15ec605cc46113506ab/github/github.go#L936-L990):

// checkRateLimitBeforeDo does not make any network calls, but uses existing knowledge from
// current client state in order to quickly check if *RateLimitError can be immediately returned
// from Client.Do, and if so, returns it so that Client.Do can skip making a network API call unnecessarily.
// Otherwise it returns nil, and Client.Do should proceed normally.
func (c *Client) checkRateLimitBeforeDo(req *http.Request, rateLimitCategory rateLimitCategory) *RateLimitError {
	c.rateMu.Lock()
	rate := c.rateLimits[rateLimitCategory]
	c.rateMu.Unlock()
	if !rate.Reset.Time.IsZero() && rate.Remaining == 0 && time.Now().Before(rate.Reset.Time) {
		// Create a fake response.
		resp := &http.Response{
			Status:     http.StatusText(http.StatusForbidden),
			StatusCode: http.StatusForbidden,
			Request:    req,
			Header:     make(http.Header),
			Body:       io.NopCloser(strings.NewReader("")),
		}
		return &RateLimitError{
			Rate:     rate,
			Response: resp,
			Message:  fmt.Sprintf("API rate limit of %v still exceeded until %v, not making remote request.", rate.Limit, rate.Reset.Time),
		}
	}
	return nil
}

// checkSecondaryRateLimitBeforeDo does not make any network calls, but uses existing knowledge from
// current client state in order to quickly check if *AbuseRateLimitError can be immediately returned
// from Client.Do, and if so, returns it so that Client.Do can skip making a network API call unnecessarily.
// Otherwise it returns nil, and Client.Do should proceed normally.
func (c *Client) checkSecondaryRateLimitBeforeDo(req *http.Request) *AbuseRateLimitError {
	c.rateMu.Lock()
	secondary := c.secondaryRateLimitReset
	c.rateMu.Unlock()
	if !secondary.IsZero() && time.Now().Before(secondary) {
		// Create a fake response.
		resp := &http.Response{
			Status:     http.StatusText(http.StatusForbidden),
			StatusCode: http.StatusForbidden,
			Request:    req,
			Header:     make(http.Header),
			Body:       io.NopCloser(strings.NewReader("")),
		}
		retryAfter := time.Until(secondary)
		return &AbuseRateLimitError{
			Response:   resp,
			Message:    fmt.Sprintf("API secondary rate limit exceeded until %v, not making remote request.", secondary),
			RetryAfter: &retryAfter,
		}
	}
	return nil
}

It internally maintains when the client will be ready to make new requests. These were the features. Now the problem is that we create a new client for every request, making all that code useless. The approach we should really be following is caching the client and checking the cache for a client if it already exists. Wdyt about this approach?
Also, if we add the client to the cache, we may want to use something more sophisticated than a map with a Mutex. We cannot have a constantly growing cache; we would have to remove no-longer-valid tokens to avoid memory going boom. (Maybe a periodic job within the server to invalidate the cache.)
We pass in context whenever we initialize the GitHub client, so I don't think caching the client with context is possible. Instead, we can follow the approach of not passing the context when constructing the underlying OAuth2 HTTP client:

func ContextClient(ctx context.Context) *http.Client {
	if ctx != nil {
		if hc, ok := ctx.Value(HTTPClient).(*http.Client); ok {
			return hc
		}
	}
	if appengineClientHook != nil {
		return appengineClientHook(ctx)
	}
	return http.DefaultClient
}

If we don't use Google App Engine, we get http.DefaultClient. So the changed implementation would look like:

-	tc := oauth2.NewClient(ctx, ts)
+	tc := &http.Client{
+		Transport: &oauth2.Transport{
+			Base:   http.DefaultClient.Transport,
+			Source: oauth2.ReuseTokenSource(nil, ts),
+		},
+	}
Other than that, I don't see any problem caching a client.
After doing some research, I think we can do this. Istio also caches the HTTP client to reuse the connection in the future: https://github.com/istio/istio/blob/7bd27e8a432bd700f754e54593dc584d4973d376/pilot/cmd/pilot-agent/status/server.go#L266-L266
Thank you for doing all this research @Vyom-Yadav. Where/how were you thinking about caching the client, in the tokenStatusStore? One thing that sounds concerning to me is that if we were to cache the http clients globally, as opposed to e.g. for a single evaluation of an entity, the number of open connections would grow in a situation like stacklok's SaaS, where a single minder instance can serve many users, and we could exhaust the number of fds available or just grow the memory a lot.
@jhrozek Yes, I was thinking of storing these clients in memory only. A bit different from tokenStatusStore, but yes, the same concept.
That's a good concern, but we can do many things to get around it.
All these optimizations should prevent us from exhausting server resources. Sounds good?
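To make the caching discussion concrete, a rough sketch of a per-token cache with expiry (the name restClientCache matches what later lands in the PR, but the fields and methods here are assumptions):

type cacheEntry struct {
	client    *github.Client
	expiresAt time.Time
}

// restClientCache keeps one client per token so go-github's internal
// rate limit bookkeeping survives across requests.
type restClientCache struct {
	mu           sync.Mutex
	entries      map[string]cacheEntry
	evictionTime time.Duration
}

func (r *restClientCache) Get(token string) (*github.Client, bool) {
	r.mu.Lock()
	defer r.mu.Unlock()
	e, ok := r.entries[token]
	if !ok || time.Now().After(e.expiresAt) {
		return nil, false
	}
	return e.client, true
}

func (r *restClientCache) Set(token string, client *github.Client) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.entries[token] = cacheEntry{client: client, expiresAt: time.Now().Add(r.evictionTime)}
}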
Force-pushed from ebea814 to 2fe94d6
op := func() (any, error) {
	_, _, err := c.client.Issues.CreateComment(ctx, owner, repo, number, &github.IssueComment{
		Body: &comment,
	})

	if isRateLimitError(err) {
		waitWrr := c.waitForRateLimitReset(ctx, err)
		if waitWrr == nil {
			return nil, err
		}
		return nil, backoffv4.Permanent(err)
	}

	return nil, backoffv4.Permanent(err)
}
_, err := performWithRetry(ctx, op)
We can discuss a prettier approach in a separate PR. I won't modify all endpoints in this PR. The two modifications are just examples.
that works for me.
Force-pushed from 2fe94d6 to 4fe8c69
@jhrozek I implemented a pattern I think should work without hitting any bottleneck; please review it once. Thanks!
Apologies for the delay. Last week, the whole Stacklok company was meeting in-person and the bandwidth for development was limited. I am looking at the code now.
this is really nice code, great work.
I left a couple of comments, feel free to make another iteration or a follow up PR or reply if you disagree, but none of my comments are really blocking.
internal/providers/github/github.go (outdated)
	path = resp.Request.URL.Path
}

logger.Debug().
Would it be nicer to create a zerolog.Event first and then add the path and method to the event record if they exist?
Yes, that would be better. Done.
r.evictExpiredEntries()

// Reset the timer for the next expiration
timer.Reset(r.evictionTime)
I wonder if ticking once per eviction time is not too seldom. If an entry is to be evicted just before a goroutine tick, this might mean that the entry will stay in the cache for 30 more minutes. Do you think it would be useful to have the goroutine act more frequently, maybe twice per eviction time?
Yes, it would be better to run the expired check using half the interval. Done.
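Against the snippet above, the change is essentially a one-liner:

// Re-arm at half the eviction interval so an expired entry sits in the
// cache for at most evictionTime/2 before the next sweep removes it.
timer.Reset(r.evictionTime / 2)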
internal/providers/github/github.go (outdated)
Str("method", method). | ||
Str("path", path). | ||
Str("wait_time", waitTime.String()). | ||
Msg("rate limit exceeded") |
Do you think it might make sense to also log the two different rate limits (abuse vs primary) using a keyword (e.g. pass another parameter like "type" and set it to one of two constant values)?
Yes, that would be better. Done.
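For illustration, roughly like this (the constant values "primary"/"secondary" are an assumption, not the PR's exact strings):

// Distinguish primary vs. secondary (abuse) rate limits in the log record.
limitType := "primary"
if isAbuseRateLimitErr {
	limitType = "secondary"
}
logger.Debug().
	Str("type", limitType).
	Str("method", method).
	Str("path", path).
	Str("wait_time", waitTime.String()).
	Msg("rate limit exceeded")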
* We employ an optimistic concurrency control model to minimize goroutine blocking and reduce rate limit errors, thus preventing integration bans
* Each server replica maintains its own set of rate-limited clients, eliminating the need for synchronization across replicas. If a rate limit error occurs, the client self-blocks and is added to a cache for future requests
* Requests that return rate limit errors are retried following a backoff period
* The current watermill pub-sub implementation being used, i.e. SQL and go channels, are both a single channel with head-of-line blocking, which means that blocking in middleware/handler blocks all processing. So, currently, if a client is blocked, the entire processing will be blocked. This will change when we shift to other pub-sub implementations and have multiple workers

Signed-off-by: Vyom-Yadav <[email protected]>
Force-pushed from 4fe8c69 to 489cadc
@jhrozek When we used
LGTM!
// Wait for the client cache background routine to finish
restClientCache.Wait()
@jhrozek Question: According to this logic, if the goroutines above finish their execution (somehow, though it shouldn't happen for a server), that would mean the context has not been cancelled, and this would lead to an infinite wait, as the ticker only stops via defer and context cancellation.
On the other hand, the goroutines started above should keep running, as it's a server binary. In case of panic, the whole process would be stopped. Just double-checking whether this is correct or not :)
Edit:
Hmm, this looks wrong; the server would stop on fatal errors (non-panic) and the restClientCache eviction goroutine would prevent the main goroutine from exiting.
@JAORMX I was addressing #2271 (comment) :)

func (r *restClientCache) Close() {
	r.evictionTicker.Stop()
}
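One way to address the infinite-wait concern would be to tie the eviction goroutine's lifetime to the server context and track it with a sync.WaitGroup. A hedged sketch, assuming the struct carries a wg field and an evictionTime interval (both assumptions, not the PR's exact code):

func (r *restClientCache) evictLoop(ctx context.Context) {
	defer r.wg.Done()
	// Tick at half the eviction interval, per the earlier review thread.
	ticker := time.NewTicker(r.evictionTime / 2)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			// Context cancellation ends the loop, so Wait() below cannot
			// hang even when the server exits on a non-panic fatal path.
			return
		case <-ticker.C:
			r.evictExpiredEntries()
		}
	}
}

// Wait blocks until the eviction goroutine has exited.
func (r *restClientCache) Wait() {
	r.wg.Wait()
}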
@Vyom-Yadav let's do it in a subsequent PR. Thanks for all this work!
Issue #2261
* We employ an optimistic concurrency control model to minimize goroutine blocking and reduce rate limit errors, thus preventing integration bans
* Each server replica maintains its own set of rate-limited clients, eliminating the need for synchronization across replicas. If a rate limit error occurs, the client self-blocks and is added to a cache for future requests
* Requests that return rate limit errors are retried following a backoff period

Note: The current watermill pub-sub implementation being used, i.e. SQL and go channels, are both a single channel with head-of-line blocking, which means that blocking in middleware/handler blocks all processing. So, currently, if a token is blocked, the entire processing will be blocked. This will change when we shift to other pub-sub implementations and have multiple workers.