Fix groupBy initial request off by one #2450
Conversation
Force-pushed from f211225 to c40e359 (Compare)
Codecov Report
@@             Coverage Diff              @@
##            master     #2450      +/-   ##
============================================
- Coverage      83.75%    83.67%    -0.08%
+ Complexity      4716      4698       -18
============================================
  Files            391       389        -2
  Lines          32292     32177      -115
  Branches        6207      6190       -17
============================================
- Hits           27046     26924      -122
- Misses          3532      3540        +8
+ Partials        1714      1713        -1

Continue to review full report at Codecov.
Can you add tests, at least one that validates that it fixes #2352?
Actually, it fixes the buffer overflow, but it does not fix possible drops of elements in case cancel and onNext race. I'm not sure it is possible to guarantee that there will not be any drops on such a race. But yeah, I added some tests, I just forgot to push them.
Are there plans to merge it any time soon?
Thanks for the ping. I need to review this again with the added tests; it fell through the cracks. In the meantime, if you think you were affected (from the description in this PR), it would be highly valuable for us to get your early feedback (by checking out the branch and building it locally to see if it improves things for you). Same for @Sage-Pierce; also, @OlegDokuka already stated it didn't really fix the exact issue described in #2352.
Thanks @simonbasle! The issue we are having is similar to #2138. We are using a reactive-kafka consumer and processing the Kafka event stream using a combination of groupBy/take and other operators. At some arbitrary moments the stream hangs in production, and based on logs it seems to be related to groupBy. We are trying to reproduce the issue in our end-to-end tests right now; once we can reproduce it in a consistent manner I'll try the fix and provide feedback.
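For context, a minimal sketch of the kind of pipeline described above, with the reactive-kafka consumer replaced by a synthetic Flux. This is not code from the project being discussed; the class name, key function, and limits are illustrative assumptions, and it only mirrors the shape of the reported setup (groupBy plus take per group, bounded by a timeout so a hang would surface).

import java.time.Duration;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;

public class GroupByTakePipelineSketch {
    public static void main(String[] args) {
        Flux.range(1, 50_000)                       // stands in for the Kafka record stream (hypothetical source)
            .hide()                                 // avoid operator fusion, like a real async source
            .publishOn(Schedulers.boundedElastic()) // process off the polling thread
            .groupBy(i -> i % 8)                    // e.g. group by a key derived from the record
            .flatMap(group -> group
                    .take(100)                      // bounded work per group; cancels the group early
                    .map(i -> "processed " + i),
                8)
            .blockLast(Duration.ofSeconds(30));     // a hang here would match the symptom described
        System.out.println("pipeline completed");
    }
}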
Hi! I've pulled the changes and applied them to v3.4.1 of reactor-core. Unfortunately it doesn't fix the issue described in #2138. I've also tried to run the twoGroupsLongAsyncMergeHidden2 test @OlegDokuka added, and it never fails (on my env it passed both with the fix and without).
Signed-off-by: Oleh Dokuka <[email protected]>
Signed-off-by: Oleh Dokuka <[email protected]>
Force-pushed from 65d2ebd to 5ba8625 (Compare)
@pavelkuchin thanks for trying it out nonetheless.
@simonbasle this PR seems to have been merged on a maintenance branch; please ensure the change is merge-forwarded to intermediate maintenance branches and up to
This fixes the following misalignment:
- s.request(e) when e groups are produced to the downstream. Each group at that moment has at least 1 element enqueued into it.
- main.s.request(e) where e covers the first element.
The fix adds an isFirstRequest check which allows removing that redundant demand for the first element on the first requestN.
Signed-off-by: Oleh Dokuka [email protected]
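The demand bookkeeping described above is easier to reason about with a way to watch the requests groupBy sends to its source. Below is a small, standalone probe, not code from this PR, that simply logs every upstream request; the class name, the small prefetch value, and the key function are arbitrary choices for illustration, and the exact numbers printed depend on the reactor-core version and prefetch settings.

import reactor.core.publisher.Flux;

public class GroupByDemandProbe {
    public static void main(String[] args) {
        Flux.range(1, 20)
            .doOnRequest(n -> System.out.println("upstream requested: " + n)) // log every request reaching the source
            .groupBy(i -> i % 2, 4)                                           // small prefetch so replenishment requests are visible
            .flatMap(group -> group.map(i -> "group " + (i % 2) + " -> " + i), 2)
            .doOnNext(System.out::println)                                    // show which group each element lands in
            .blockLast();
    }
}

Running it on a recent reactor-core version shows the initial prefetch request followed by the replenishment requests issued as group elements are consumed, which is the pacing the off-by-one in this PR concerns.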