Improve Feed PubSub: execute subscribers' blocking operations in separate goroutines #2208
Conversation
Reviewing only the Go logic for processing events in a separate goroutine. My understanding is that it works because we never stop receiving from the src channel, so it cannot fill up and block the sender. That looks good to me; just make sure that the order of the events really does not matter, as mutexes are not FIFO.
It would be good to avoid holding so many (locked) goroutines in the background. While you could use a channel for the locking instead, which would also let you provide rate limiting and timeouts, that could be a premature optimization that we don't really need right now. So if we do implement timeouts for all the internal operations of those routines (which I understood is the next step here), we should be good already.
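For illustration only (not code from this PR; the package and function names are made up), the channel-as-lock idea with a timeout could look roughly like this:

package feed

import (
    "context"
    "time"
)

// sem acts as the "lock": a buffered channel of capacity 1.
// Acquire by sending into it, release by receiving from it.
var sem = make(chan struct{}, 1)

// handleWithTimeout tries to take the lock with a deadline instead of
// parking a goroutine on a mutex indefinitely.
func handleWithTimeout(ctx context.Context, ev interface{}, process func(interface{})) {
    ctx, cancel := context.WithTimeout(ctx, 30*time.Second)
    defer cancel()
    select {
    case sem <- struct{}{}: // acquired
        defer func() { <-sem }()
        process(ev) // the blocking operation
    case <-ctx.Done():
        // could not acquire in time: give up (or log and drop the event)
    }
}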
Thinking further about this, WDYT instead of providing a helper function somewhat like this? It basically works as a channel with a dynamically sized, growing buffer.

func pipeBuffered(ctx context.Context, in <-chan interface{}) <-chan interface{} {
    out := make(chan interface{}, 1)
    go func() {
        defer close(out)
        buf := []interface{}{}
    pipe:
        for {
            if len(buf) == 0 {
                // Nothing buffered yet: just wait for the next value (or for cancellation).
                select {
                case <-ctx.Done():
                    return
                case val, ok := <-in:
                    if !ok {
                        break pipe
                    }
                    buf = append(buf, val)
                }
                continue
            }
            // Either buffer another incoming value or send the oldest buffered one out.
            select {
            case <-ctx.Done():
                return
            case val, ok := <-in:
                if !ok {
                    break pipe
                }
                buf = append(buf, val)
            case out <- buf[0]:
                buf = buf[1:]
            }
        }
        // The input channel was closed: drain whatever is still buffered.
        for _, val := range buf {
            select {
            case <-ctx.Done():
                return
            case out <- val:
            }
        }
    }()
    return out
}

With it, you would pass the subscription channel and it would return a channel which only has a "direct" buffer of 1, but actually has a resizable buffer kept as an off-channel slice. It would keep the event ordering while still giving us more control over the parallelism, and simpler usage in the end (just the way it was before). For example, later we could define a hard limit for the slice buffer and have events be dropped if we reach it, or add a log in case the buffer gets too big. The implementation can be optimized in how it grows/shrinks the slice, for example with a circular buffer, but I guess it is enough for now. Performance-wise, I think it would even be a little cheaper than maintaining N locked goroutines, given that each goroutine would also need to hold at least 1 event on its stack, which is everything that we hold in the slice buffer.
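Usage would then look roughly the way it did before the PR. A hypothetical sketch, where the events channel and handleEvent stand in for the real watcher API:

// watch consumes events through the buffering helper: handleEvent may block,
// and new events simply grow the helper's slice buffer instead of blocking
// the publisher. Names here are illustrative, not the actual watcher code.
func watch(ctx context.Context, events <-chan interface{}, handleEvent func(interface{})) {
    for ev := range pipeBuffered(ctx, events) {
        handleEvent(ev)
    }
}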
This looks very nice at first, but... Golang (at least the version we use in the project) does not have generics, so it won't fit nicely into our watchers. We would need to create a similar function for each chan element type that we use (which is probably overkill). At first, I thought we could change all the Subscribe functions to use
Yes. I think that the order of events does not matter. Also, these locks are actually a kind of optimization to avoid executing the same transaction at the same time. It's enough to execute it once, so trying to send it twice at the same time may result in some gas being burnt unnecessarily.
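For illustration, the lock-as-dedup pattern being described is roughly this (hypothetical names, not the actual watcher code):

package feed

import "sync"

// initializeMu serializes attempts to send the same on-chain transaction,
// e.g. initializing the current round.
var initializeMu sync.Mutex

// maybeInitializeRound re-checks the condition after taking the lock, so a
// concurrent event that already sent the transaction doesn't make us send
// (and pay gas for) a second, redundant one.
func maybeInitializeRound(roundInitialized func() bool, sendInitializeTx func() error) error {
    initializeMu.Lock()
    defer initializeMu.Unlock()
    if roundInitialized() {
        return nil
    }
    return sendInitializeTx()
}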
Yes, I think it's technically possible to convert it into a channel-based solution, I just find it more difficult. Check my comment on your solution with
Yeah, makes sense! Can't wait for generics to reach a stable version 🤩
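(For what it's worth, with Go 1.18+ type parameters the helper would only need to be written once. A rough sketch under that assumption, not code from this PR:)

package feed

import "context"

// pipeBuffered written once for any element type (requires Go 1.18+).
// Same idea as the interface{} version: a 1-slot channel backed by a
// growable slice, preserving event order.
func pipeBuffered[T any](ctx context.Context, in <-chan T) <-chan T {
    out := make(chan T, 1)
    go func() {
        defer close(out)
        var buf []T
        for in != nil || len(buf) > 0 {
            // Only offer a value on out when something is buffered;
            // a nil channel case is simply never selected.
            var send chan<- T
            var next T
            if len(buf) > 0 {
                send, next = out, buf[0]
            }
            select {
            case <-ctx.Done():
                return
            case val, ok := <-in:
                if !ok {
                    in = nil // input closed: flush the remaining buffer
                    continue
                }
                buf = append(buf, val)
            case send <- next:
                buf = buf[1:]
            }
        }
    }()
    return out
}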
LGTM!
@yondonfu I addressed your comments, PTAL
LGTM! Looks like the commit structure could use a bit of clean up before merging.
What does this pull request do? Explain your changes. (required)
Avoid blocking the Feed mechanism by executing every blocking operation in a separate goroutine. As a result, if any operation blocks, we end up with a blocked/leaked goroutine instead of stalling the whole livepeer node.
See #2207 for a detailed description.
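At a high level, the change boils down to the following pattern (a simplified sketch with illustrative names, not the exact code):

package feed

import "sync"

// dispatch runs each subscriber's blocking operation in its own goroutine.
// The mutex keeps at most one operation of a given kind in flight, so a
// stuck operation leaks one goroutine instead of stalling the event loop.
func dispatch(events <-chan interface{}, mu *sync.Mutex, handle func(interface{})) {
    for ev := range events {
        ev := ev // capture the loop variable for the goroutine (pre-Go 1.22)
        go func() {
            mu.Lock()
            defer mu.Unlock()
            handle(ev) // may block; only this goroutine is affected
        }()
    }
}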
Specific updates (required)
How did you test each of these updates (required)
Checked the same scenario using the code from master and the code from this PR.
- Added time.Sleep(1 * time.Hour) into roundinitializer.go#TryInitialize() to simulate that this operation is blocked
- Initialize Round and Reward

In the case of master, the orchestrator was blocked and stopped transcoding, and the broadcaster failed transcoding with the following logs. In the case of the code from this PR, everything worked, even though the round initializer was blocked.
Does this pull request close any open issues?
fix #2168
Checklist:
- make runs successfully
- Tests in ./test.sh pass