groupBy followed by flatMap is starting to drop messages under load #1762

Closed
DareUrDream opened this issue Jun 24, 2019 · 11 comments
Labels: status/need-investigation (This needs more in-depth investigation)

@DareUrDream

Re-post of a question on Stack Overflow: https://stackoverflow.com/questions/56726323/reactor-groupby-is-starting-to-drop-messages

Below is my data pipeline definition. It starts to drop messages under load when I am creating, say, hundreds or thousands of groups per minute (a group stays in memory from 15 minutes to 8 hours).
Is there any way to solve this?

I did refer to #1544, #726, #596, and #931, but my question is: even if I pass Integer.MAX_VALUE as the prefetch for groupBy and a substantially large number (or Integer.MAX_VALUE) as maxConcurrency on the flatMap, would it not exhaust those limits eventually and start dropping messages? (Pardon my ignorance of Reactor; I am a newbie trying to learn while working.) The overloads I mean are sketched after the pipeline below.

ds.getPublisher()
        .onBackpressureBuffer(15000)
        .onBackpressureDrop(record -> LOGGER.error("Backpressure applied-1. Dropped records: {}", record))
        .groupBy(record -> record.getGroupId())
        .flatMap(group -> group
                .timeout(Duration.ofSeconds(60))
                .bufferUntil(record -> isGroupComplete(record))
                .bufferTimeout(100, Duration.ofSeconds(5))
                .map(listOflistOfRecords -> listOflistOfRecords.stream().flatMap(List::stream).collect(Collectors.toList()))
                .onErrorContinue((th, records) -> {
                    LOGGER.error("Timedout records: {}", records);
                    // TAKE ACTION ON THE RECORDS
                }))
        .filter(records -> {
            return (publishController.shouldIPublish()) ? true 
                    : records.get(0).getCreatedTimestamp() <= (publishController.stopRequestTimestamp() - 5);
        })
        .doOnDiscard(List.class, records -> {
            if(! records.isEmpty()) {
                LOGGER.error("Discarded: {}", records);
                discardedRecords.put(records, new Object());
            } else {
                LOGGER.error("Empty record received. This should never happen.");
            }
        })
        .map(record -> Collections.unmodifiableList(Enricher.enrich(record)))
        .map(dbRecords -> RecordTransformer.transform(dbRecords))
        .retryBackoff(MAX_RETRY, Duration.ofSeconds(FIRST_BACKOFF_IN_SECONDS), Duration.ofSeconds(MAX_BACKOFF_IN_SECONDS))
        .publishOn(Schedulers.single());
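
For reference, these are the overloads I mean; a minimal sketch (the values here are only illustrative, not a recommendation):

ds.getPublisher()
        .groupBy(record -> record.getGroupId(), 256)    // groupBy(keyMapper, prefetch)
        .flatMap(group -> group.collectList(),
                256,                                    // maxConcurrency
                32);                                    // prefetch per inner group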
@DareUrDream changed the title from "groupBy is starting to drop messages under load" to "groupBy followed by flatMap is starting to drop messages under load" Jun 24, 2019
@bsideup (Contributor) commented Jun 24, 2019

Hi @DareUrDream,

A few things here:

  1. onBackpressureDrop after onBackpressureBuffer - you already applied a backpressure strategy, why twice?
  2. Anything interesting in the logs?
  3. Why do you publish on a single scheduler at the end?

@bsideup (Contributor) commented Jun 24, 2019

even if I include Integer.MAX_VALUE as prefetch for groupBy and say a substantially large number or say Integer.MAX_VALUE for maxConcurrency on the flatMap, would it not run out of those numbers in future and start dropping messages

If you use Integer.MAX_VALUE as the value, it will create an unbounded queue and keep queueing for as long as your app has memory.
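
To illustrate (a rough sketch; Queues is reactor.util.concurrent.Queues):

import java.util.Queue;
import reactor.util.concurrent.Queues;

// A MAX_VALUE "size" selects the unbounded queue supplier, so this queue
// keeps growing until the heap is exhausted.
Queue<Object> queue = Queues.unbounded(Integer.MAX_VALUE).get();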

@DareUrDream (Author) commented Jun 24, 2019

@bsideup Let's say I have twice the memory needed to hold Integer.MAX_VALUE records. What would happen when that much is in memory and, say, another 100K records come in?

@DareUrDream (Author) commented Jun 24, 2019

@bsideup Responses to your observations below:

onBackpressureDrop after onBackpressureBuffer - you already applied a backpressure strategy, why twice?

--> This was the test that helped me figure out that groupBy was becoming a bottleneck for me. It is not there in my production code.

Anything interesting in the logs?

--> Just a help :-)

Why do you publish on a single scheduler at the end?

--> This is interesting... I have a situation where only 2 vCPUs are available, so I am trying to consume on a single scheduler, and my consumer is pretty fast.

@smaldini added the status/need-investigation label Jun 24, 2019
@DareUrDream (Author)

@bsideup Let's say I have twice the memory needed to hold Integer.MAX_VALUE records. What would happen when that much is in memory and, say, another 100K records come in?

This is a very important question for us, as the application keeps running for days, and eventually, in a few months, it will stop working if groupBy starts dropping messages... Do we have an alternative?

@Kindrat commented Oct 24, 2019

@smaldini
There is a default queue size reused from the prefetch arg (Queues.SMALL_BUFFER_SIZE == 256), so it's not possible to create more than 256 groups with the default groupBy Flux factory method.

Can we have the ability to create an unbounded group supplier without setting the prefetch to Integer.MAX_VALUE?
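
A minimal repro of that limit (a hedged sketch; it assumes the default prefetch of Queues.SMALL_BUFFER_SIZE == 256 and consumes groups one at a time):

import reactor.core.publisher.Flux;

public class GroupByLimitRepro {
    public static void main(String[] args) {
        // 500 distinct keys, but groupBy only prefetches 256 elements by default.
        // concatMap subscribes to one group at a time, so once the prefetched
        // elements all belong to not-yet-consumed groups, nothing progresses.
        Flux.range(0, 1_000)
                .groupBy(i -> i % 500)
                .concatMap(group -> group.collectList())
                .doOnNext(batch -> System.out.println(batch))
                .blockLast(); // hangs instead of completing
    }
}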

@Kindrat commented Oct 24, 2019

@smaldini
Also, in Queues:

public static <T> Supplier<Queue<T>> unbounded(int linkSize) {
    if (linkSize == XS_BUFFER_SIZE) {
        return XS_UNBOUNDED;
    }
    else if (linkSize == Integer.MAX_VALUE || linkSize == SMALL_BUFFER_SIZE) {
        return unbounded();
    }
    return () -> new SpscLinkedArrayQueue<>(linkSize);
}

So Integer.MAX_VALUE still falls back to the SMALL_BUFFER_SIZE-linked queue supplier...

@bsideup (Contributor) commented Oct 24, 2019

@Kindrat

Without a mechanism that will "clean up" groups over time, creating Integer.MAX_VALUE groups will lead to a memory leak and may cause OOMs.

@Kindrat commented Oct 25, 2019

@bsideup
Yeah, I'm cleaning up groups manually with an external signal from another publisher and takeUntilOther (see the sketch below). I don't think there could be any default generic mechanism for unbounded dynamic groups.

I guess it would be helpful to mention in the flatMap docs that maxConcurrency could be a problem for continuous groups.
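
For reference, a minimal sketch of that manual cleanup (the keys, timings, and close-signal source are illustrative, not from real production code):

import java.time.Duration;
import java.util.List;
import reactor.core.publisher.Flux;

public class GroupCleanupSketch {
    public static void main(String[] args) {
        // Three long-lived groups keyed by i % 3 (illustrative).
        Flux<Integer> source = Flux.interval(Duration.ofMillis(10))
                .map(Long::intValue)
                .take(300);

        // External close signal: another publisher that names the group to close.
        // concatWith(Flux.never()) keeps the signal open so unrelated groups are
        // not cut short when it completes.
        Flux<Integer> closeSignals = Flux.just(0)
                .delayElements(Duration.ofSeconds(1))
                .concatWith(Flux.never());

        List<List<Integer>> batches = source
                .groupBy(i -> i % 3)
                .flatMap(group -> group
                        // Complete this group when the external signal names its
                        // key, so it stops occupying a groupBy/flatMap slot.
                        .takeUntilOther(closeSignals.filter(k -> k.equals(group.key())))
                        .collectList())
                .collectList()
                .block();

        System.out.println(batches);
    }
}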

@swimmesberger

In our case we do not have unlimited groups; we have a group count somewhere between 10 and 10_000, and it is fixed for the lifetime of the application (so no memory leak here). But we can't use groupBy because of this issue (it took me hours to figure that out...).
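
A hedged sketch of the usual workaround for a bounded group count: size the prefetch and maxConcurrency to at least the maximum cardinality (10_000 here, matching the upper bound above; the pipeline shape is borrowed from the original snippet):

ds.getPublisher()
        .groupBy(record -> record.getGroupId(), 10_000)     // prefetch >= max group count
        .flatMap(group -> group.collectList(), 10_000);     // maxConcurrency >= max group count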

@simonbasle (Contributor)

Closing in favor of #2352, which seems to occur with low cardinality (when immediately cancelling groups).
