groupBy followed by flatMap is starting to drop messages under load #1762
Comments
Hi @DareUrDream, a few things here:
If you use ...
@bsideup Let's say I have twice the memory needed to hold Integer.MAX_VALUE records. What would happen when that much is in memory and, say, another 100K records come in?
@bsideup Response to the observations below:
--> This was the test that helped me figure out that groupBy was becoming a bottleneck for me. It is not there in my production code.
--> Just a help :-)
--> This is interesting... I have a situation where I have only 2 vCPUs available, so I am trying to consume on a single scheduler, and my consumer is pretty fast (see the sketch after this list).
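Not the commenter's actual test (which did not survive the extraction), but a minimal sketch of the kind of repro being described, assuming synthetic integer keys: more simultaneously open groups than flatMap subscribes to, all funnelled through a single scheduler.

```java
import java.time.Duration;

import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;

public class GroupByBottleneck {
    public static void main(String[] args) {
        Flux.range(1, 1_000_000)
            // 1_000 distinct keys; no group completes until the source does
            .groupBy(i -> i % 1_000)
            .flatMap(group -> group
                    // every group drains through the same single scheduler
                    .publishOn(Schedulers.single())
                    .map(i -> i * 2),
                // concurrency far below group cardinality: only 2 groups are
                // subscribed, the rest back up inside groupBy until it stalls
                2)
            .blockLast(Duration.ofSeconds(10)); // times out once the pipeline hangs
    }
}
```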
This is a very important question for us, as the application will keep running for days; eventually, in a few months, it will stop working if groupBy starts dropping messages... Do we have an alternative?
@smaldini Can we have the ability to create an unbounded group supplier without setting prefetch to Integer.MAX_VALUE?
@smaldini

```java
public static <T> Supplier<Queue<T>> unbounded(int linkSize) {
    if (linkSize == XS_BUFFER_SIZE) {
        return XS_UNBOUNDED;
    }
    else if (linkSize == Integer.MAX_VALUE || linkSize == SMALL_BUFFER_SIZE) {
        return unbounded();
    }
    return () -> new SpscLinkedArrayQueue<>(linkSize);
}
```

Passing Integer.MAX_VALUE still falls back to the SMALL_BUFFER_SIZE queue supplier...
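A small illustration of that fall-through (my own snippet, not from the thread): requesting an Integer.MAX_VALUE link size yields the same kind of supplier as the plain unbounded() default, so the hint is effectively ignored.

```java
import java.util.Queue;
import java.util.function.Supplier;

import reactor.util.concurrent.Queues;

public class UnboundedSupplierCheck {
    public static void main(String[] args) {
        // Both calls hit the `unbounded()` branch of the method above, so both
        // produce queues whose internal links are SMALL_BUFFER_SIZE long.
        Supplier<Queue<Object>> viaMax = Queues.unbounded(Integer.MAX_VALUE);
        Supplier<Queue<Object>> viaDefault = Queues.unbounded(Queues.SMALL_BUFFER_SIZE);
        System.out.println(viaMax.get().getClass() == viaDefault.get().getClass()); // true
    }
}
```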
Without a mechanism that will "clean up" groups over time, creating ...
@bsideup I guess it would be helpful to mention in ...
In our case we do not have unlimited groups; we have a group count somewhere between 10 and 10_000, and it is fixed for the lifetime of the application (so no memory leak here). But we can't use groupBy (it took me hours to figure that out...) because of this very issue.
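For concreteness, a sketch (mine, with placeholder names) of the bounded-cardinality shape being described: flatMap's concurrency is sized to cover every possible group, which is the usual groupBy guidance, and yet this is the arrangement the commenter reports still running into this issue.

```java
import reactor.core.publisher.Flux;

public class BoundedCardinality {
    // Assumes the upper bound on distinct keys (10_000 here) is known up front.
    static Flux<String> route(Flux<String> source) {
        final int maxGroups = 10_000;
        return source
            .groupBy(msg -> Math.floorMod(msg.hashCode(), maxGroups))
            // concurrency >= group count, so every open group has a subscriber
            .flatMap(group -> group.map(String::toUpperCase), maxGroups);
    }
}
```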
Closing in favor of #2352, which seems to occur with low cardinality (when immediately cancelling groups).
Re-post of a question on Stack Overflow -- https://stackoverflow.com/questions/56726323/reactor-groupby-is-starting-to-drop-messages
Below is my data pipeline definition, but it starts to fall over under load when I am creating, say, hundreds or thousands of groups per minute (a group stays in memory from 15 minutes to 8 hours).
Any way to solve this?
I did refer to #1544, #726, #596, and #931, but my question is: even if I use Integer.MAX_VALUE as the prefetch for groupBy, and a substantially large number or even Integer.MAX_VALUE for maxConcurrency on the flatMap, would it not exhaust those numbers in the future and start dropping messages? (Pardon my ignorance of Reactor, I am a newbie trying to learn while working.)
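The pipeline code itself did not survive the re-post; as a stand-in, here is a sketch of the shape the question describes (Event and process() are hypothetical), with the Integer.MAX_VALUE prefetch and maxConcurrency workaround from the linked issues applied:

```java
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class DataPipe {
    record Event(String key, String payload) {}

    // Placeholder for the real per-event work.
    static Mono<String> process(Event e) {
        return Mono.just(e.payload());
    }

    static Flux<String> build(Flux<Event> events) {
        return events
            // prefetch raised to Integer.MAX_VALUE, per #1544 / #726 / #596 / #931
            .groupBy(Event::key, Integer.MAX_VALUE)
            // maxConcurrency likewise maxed so every group stays subscribed
            .flatMap(group -> group.flatMap(DataPipe::process), Integer.MAX_VALUE);
    }
}
```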