Add WaitGroup synchronization primitive #14167
Conversation
This is more efficient than creating a `Channel(Nil)` and looping to receive N messages: we don't need a queue, only a counter, and we can avoid spurious wakeups of the main fiber and resume it only once. See the documentation for examples and more details.

Note to self: this (or rather its MT equivalent) is also known as a latch in C++ and Java.
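As a rough sketch of the two patterns (assuming the `#add`/`#done`/`#wait` API from this PR; `do_work` is a placeholder):

```crystal
require "wait_group" # may not be needed if WaitGroup ends up in the prelude

def do_work
  # placeholder for the actual workload
end

# Channel-based pattern: a queue slot and one wakeup per message.
done = Channel(Nil).new
10.times do
  spawn do
    do_work
    done.send(nil)
  end
end
10.times { done.receive }

# WaitGroup pattern: a single counter; the waiter is resumed exactly once.
wg = WaitGroup.new(10)
10.times do
  spawn do
    do_work
    wg.done
  end
end
wg.wait
```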
I think we'll need a bunch more tests for this. E.g. for multiple `#wait` calls, fibers adding more fibers, `#add` with a negative delta, or `#done` called before `#wait`. The latter two could both result in `@counter < 0`, which is a relevant invariant to verify.
The Go implementation has some test cases that we could take inspiration from: https://cs.opensource.google/go/go/+/refs/tags/go1.21.5:src/sync/waitgroup_test.go
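For instance, a spec along these lines could cover the multiple-`#wait` case (a sketch; it assumes `WaitGroup.new` accepts an initial counter, as this PR's API suggests):

```crystal
# inside a spec file (require "spec")
it "resumes every waiting fiber" do
  wg = WaitGroup.new(1)
  resumed = Channel(Nil).new

  2.times do
    spawn do
      wg.wait
      resumed.send(nil)
    end
  end

  wg.done
  2.times { resumed.receive } # both waiters must be woken by the single #done
end
```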
As food for thought: https://cs.opensource.google/go/go/+/refs/tags/go1.21.5:src/sync/waitgroup.go

Go merges the counter (i32) and the number of waiters (u32) into a single atomic (u64), which is a neat idea, then uses a semaphore to suspend the goroutines. What I'm curious about is whether we could follow the same approach in Crystal.

I'm afraid following Go won't be possible because their implementation takes advantage of the atomics returning the new value (e.g. add to counter => returns new counter + waiters), but Crystal relies on LLVM atomics that always return the old value, which... is often pointless 😞 For example, to support …
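To make the "old value" semantics concrete (a minimal sketch using Crystal's `Atomic(T)`; variable names are illustrative):

```crystal
counter = Atomic(Int32).new(0)

# Crystal's Atomic#add wraps LLVM's atomicrmw and returns the value *before*
# the addition; the new value has to be recomputed by hand:
new_value = counter.add(5) + 5

# Go's sync/atomic Add functions instead return the value *after* the
# addition, which is what Go's WaitGroup builds on.
```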
# the fiber will be resumed.
#
# Can be called from different fibers.
def wait : Nil
I like this addition very much! It will be very useful in many cases.

One proposal that could probably be done later as a separate improvement: make the `wait` method compatible with `select` to support the following snippet:

select
when wg.wait
  puts "All fibers done"
when timeout(X.seconds)
  puts "Some fiber stuck"
end
Or maybe just have `wg.wait(timeout: Time::Span | Nil)`?
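Until either of those exists, a user-land bridge is already possible with the current API (a sketch; `wait_with_timeout` is a hypothetical helper, not part of this PR):

```crystal
def wait_with_timeout(wg : WaitGroup, span : Time::Span) : Bool
  done = Channel(Nil).new(1) # buffered: the helper fiber never blocks on send

  spawn do
    wg.wait
    done.send(nil)
  end

  completed = false
  select
  when done.receive
    completed = true
  when timeout(span)
    # some fiber is stuck; the helper fiber will eventually finish and send
    # into the buffered channel without blocking
  end
  completed
end
```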
@bararchy We're missing a generic mechanism for timeouts... but we could abstract how it's implemented for `select`, so that could be doable.

That doesn't mean we can't also integrate with `select`: we could wait on channel(s) + waitgroup(s) + timeout. Now, I'm not sure how to do that.
@ysbaddaden I think @alexkutsan's idea is better, because then we don't need to handle a timeout exception when the timeout happens; instead we handle it in the `select` context, which seems cleaner, like how `Channel` works when calling `receive`, etc.

So I think my idea is less clean tbh 😅
I have a commit to support WaitGroup in `select` expressions.

The integration wasn't complex once I understood how `SelectAction` and `SelectContext` work, but the current implementation is very isolated to `Channel` (on purpose). Maybe the integration is not a good idea, but if it proves to be a good one, we might want to extract the `select` logic from `Channel` to the `Crystal` namespace.

I'll open a pull request after this one is merged, so we can have a proper discussion.
@ysbaddaden now that it's in and merged, are you planning to make the follow-up PR? 👁️
Co-authored-by: Johannes Müller <[email protected]>
Co-authored-by: Jason Frey <[email protected]>
Disables the stress test when interpreted, as it takes forever to complete.
This failure consistently appears in #14122 as well.

And in #14257.
I can't reproduce the "can't resume running fiber" error anymore when I disable the thread specs. I think we likely need actual support from the interpreter to start threads in interpreted code.
Co-authored-by: Sijawusz Pur Rahnama <[email protected]>
We could saturate the counter at zero, at the cost of using a CAS loop. The performance implications depend on whether in practice waitgroups will be protecting "large" operations (i.e. the time each thread spends maintaining the waitgroup is not dominant, and each CAS op is very likely to succeed) or "small" operations (i.e. the thread spends most of its time maintaining the waitgroup, and each CAS op is likely to fail). My intuition is that it's the former, and that the performance implications of using a CAS loop would be tiny.

Apologies for bringing this up at a "late" stage in the PR; this didn't occur to me until now.
@RX14 Interesting. I'm not concerned about performance (it's still lock-free, so it's fine to me), but:

Or am I missing something?
Thanks @RX14. I'm noticing more edge cases. For example, we could reach a negative number (raises), then a concurrent fiber could increment back to a positive number and fail to raise because the new counter is positive, also impacting the resumed waiting fibers, which may continue despite the waitgroup being left in an invalid state. I'll likely add a CAS loop, just not saturating at zero: I'd keep the negative number to detect the invalid state.
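Something along these lines, for illustration (a sketch of the idea, not the merged code; `wake_waiters` is a hypothetical helper):

```crystal
private def decrement : Nil
  loop do
    current = @counter.get
    new_value = current - 1

    if @counter.compare_and_set(current, new_value)
      # no saturation at zero: the counter may go negative, and the negative
      # value itself marks the waitgroup as invalid
      wake_waiters if new_value <= 0 # hypothetical helper; resumes waiters once
      raise RuntimeError.new("Negative WaitGroup counter") if new_value < 0
      return
    end
    # CAS failed: another fiber raced us; retry with a fresh value
  end
end
```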
It's now impossible for `#add` to increment a negative counter back into a positive one. `#wait` now checks for a negative counter, in addition to a zero counter, right after grabbing the lock.
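The `#add` side of that invariant could look roughly like this (illustrative, not the exact merged code):

```crystal
def add(n : Int32 = 1) : Nil
  loop do
    current = @counter.get
    new_value = current + n

    # once negative, the waitgroup is permanently invalid: never let the
    # counter climb back above zero, and reject negative deltas that would
    # take it below zero
    if current < 0 || new_value < 0
      raise RuntimeError.new("Negative WaitGroup counter")
    end

    return if @counter.compare_and_set(current, new_value)
  end
end
```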
I smoothed out some corner cases:
I'm a bit torn about the last one: the situation can happen when the counter reached zero, enqueued the waiters, and continued to increment, which is invalid. Yet there is a race condition when reusing a WaitGroup with at least 2 waiters: fiber A reuses the WaitGroup (i.e. increments) before fiber B resumes (positive counter -> raise). Oh, and the race condition would also trigger with a negative counter (lower probability, but it could happen), so the problem is reusing the object before all waiters are properly resumed. Ah, the joys of writing a synchronization primitive.
My highest concern is deadlocks: any condition where the counter remains at zero but fails to resume a waiter. There are race conditions when the waitgroup is used weirdly, but they do not cause a wrong count or a deadlock, so they can fail in a better way (raising).
AFAIK it should now be impossible to fail to wake the waiting fibers or to leave the waitgroup in a confusing state: the counter saturates to a negative number and can't return to a positive number anymore; waiting fibers are always resumed (once) when the counter reaches zero or below. I can't think of any scenario where we'd end up with a deadlock. I can still think of race conditions, though:

Depending on when the fibers are resumed, some may return successfully (zero counter) while some may raise (negative counter), yet at least one fiber will raise (the one decrementing the counter below zero), so the error shouldn't go unnoticed.
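A concrete misuse that exercises this race (illustrative):

```crystal
wg = WaitGroup.new(2)

3.times do
  spawn { wg.done } # one extra #done: some call drives the counter to -1 and raises
end

# depending on the interleaving, the waiter may return normally (it was
# resumed at zero) or raise (it observed the negative counter)
wg.wait
```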
This looks great to me, and I think the remaining race condition is defined well enough not to have a negative impact, especially as it doesn't appear in correct usage.

I cannot wait to use this over some Channels that I have.

Same! Very excited for this one 🎉