FluxGroupBy silently drops onNext signals #2352
Comments
I think I've isolated this particular unexpected behavior to a race condition in FluxGroupBy. I've also figured out a workaround that both mitigates my issue and supports the race-condition theory: if I isolate both the publish and subscribe operations on FluxGroupBy onto the same Worker (Thread) that's responsible for the time-based group cancellation, the items are no longer dropped and the test below passes:
@Test
public void test() throws Exception {
// Hooks.onNextDroppedFail(); // This actually has no effect; Was thinking it would cause failure with CancelException
AtomicLong upstream = new AtomicLong(0L);
AtomicLong downstream = new AtomicLong(0L);
CountDownLatch latch = new CountDownLatch(1);
Scheduler scheduler = Schedulers.single(Schedulers.elastic());
Flux.fromStream(Stream.iterate(0L, (last) -> (long) (Math.random() * 4096)))
.flatMap(number -> Flux.concat(
Mono.just(number),
Mono.just(number).delayElement(Duration.ofMillis((int) (Math.random() * 2000)))),
4096)
.take(Duration.ofSeconds(30L))
.doOnNext(next -> upstream.incrementAndGet())
.publishOn(scheduler)
.groupBy(Function.identity())
.flatMap(groupFlux -> groupFlux.take(Duration.ofSeconds(1L), scheduler)
.take(2)
.collectList(), 16384)
.subscribeOn(scheduler, true)
.doOnNext(batch -> {
try {
Thread.sleep(1L);
} catch (Exception e) {
}
})
.map(Collection::size)
.subscribe(downstream::addAndGet, System.err::println, latch::countDown);
latch.await();
assertEquals(upstream.get(), downstream.get());
}
I suppose my question to the maintainers is: "what should be the general expected behavior in the absence of this Scheduler isolation?" At the very least, silently dropping onNext items seems like a bug. Whether item dropping should even be possible in this situation may be debatable, but if it is, the behavior will surprise clients, especially those without a decent understanding of how the threading of the publish and subscribe operations can cause this condition.
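To make the workaround concrete, here is a distilled sketch of the Scheduler isolation described above (the usual Reactor imports are assumed; source stands in for any Flux<Long>, and the operators and prefetch simply mirror the test):
// Sketch of the workaround, not a prescribed fix: pin the signals entering groupBy and the
// time-based cancellation of each group onto one Worker, so the two can no longer race.
Scheduler isolated = Schedulers.single(Schedulers.elastic());

Flux<List<Long>> batches = source                         // "source" is a placeholder Flux<Long>
        .publishOn(isolated)                              // onNext signals reach groupBy on the single Worker
        .groupBy(Function.identity())
        .flatMap(group -> group
                .take(Duration.ofSeconds(1L), isolated)   // group cancellation scheduled on the same Worker
                .take(2)
                .collectList(), 16384);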
@Sage-Pierce you seem to be blocking the non-blocking threads. Maybe you wanted
when I remove
For deduplicating an infinite stream, I'd recommend something like this (in case groupBy is not a hard requirement):
Flux.<Integer>generate(sink -> sink.next(ThreadLocalRandom.current().nextInt(0, 100)))
.log("producer")
.transformDeferred(flux -> {
Set<Integer> items = new HashSet<>();
return flux.filter(items::add);
})
.log("next")
.take(100)
.blockLast();
Note that the Set in this example grows without bound; for a real use case you would want to bound it, e.g. with a cache that has TTL-based eviction (Caffeine or Guava).
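A minimal sketch of one way to bound that state, assuming a fixed-size, access-ordered LRU "seen" set is acceptable (at the cost of possibly re-emitting a value once it has been evicted); the usual java.util and Reactor imports are assumed:
// Bound the "seen" state with a fixed-size LRU set. Not thread-safe, which is fine here
// because generate + filter run on a single thread.
int maxTracked = 10_000;
Set<Integer> seen = Collections.newSetFromMap(new LinkedHashMap<Integer, Boolean>(16, 0.75f, true) {
    @Override
    protected boolean removeEldestEntry(Map.Entry<Integer, Boolean> eldest) {
        return size() > maxTracked;
    }
});

Flux.<Integer>generate(sink -> sink.next(ThreadLocalRandom.current().nextInt(0, 100)))
        .filter(seen::add)   // lets a value through only the first time it is seen while still tracked
        .take(100)
        .blockLast();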
Thank you for the comments and suggestions, @bsideup!
I'll update the failure scenario code (below) so that the blocking happens on threads that are allowed to block.
Although this is indeed blocking, the blocking is only there to mimic real-world, computation-bound processing overhead for each downstream item. Here's an updated example (busy-spin instead of Thread.sleep) that still fails without the single-Worker isolation:
@Test
public void test() throws Exception {
// Hooks.onNextDroppedFail(); // This actually has no effect; Was thinking it would cause failure with CancelException
AtomicLong upstream = new AtomicLong(0L);
AtomicLong downstream = new AtomicLong(0L);
CountDownLatch latch = new CountDownLatch(1);
Scheduler scheduler = Schedulers.elastic(); // Fails
// Scheduler scheduler = Schedulers.single(Schedulers.elastic()); // Succeeds
Flux.fromStream(Stream.iterate(0L, (last) -> (long) (Math.random() * 4096)))
.flatMap(number -> Flux.concat(
Mono.just(number),
Mono.just(number).delayElement(Duration.ofMillis((int) (Math.random() * 2000)))),
4096)
.take(Duration.ofSeconds(30L))
.doOnNext(next -> upstream.incrementAndGet())
.publishOn(scheduler)
.groupBy(Function.identity())
.flatMap(groupFlux -> groupFlux.take(Duration.ofSeconds(1L), scheduler)
.take(2)
.collectList(), 16384)
.subscribeOn(scheduler, true)
.doOnNext(batch -> {
// Mimic real-world computationally-bound processing overhead
long startNano = System.nanoTime();
while (System.nanoTime() - startNano < 1_000_000) ;
})
.map(Collection::size)
.subscribe(downstream::addAndGet, System.err::println, latch::countDown);
latch.await();
assertEquals(upstream.get(), downstream.get());
}
Using caches with TTLs and eviction is an interesting path to consider. However, there are a few problems with substituting it for the deduplication I'm accomplishing with Reactor. For one, it is less memory-efficient, because items are kept in the cache longer than they actually need to be. For another, the behavior is slightly different: in my case, when there are items/entities with duplicate IDs in the same window, I want to emit the latest item once the window expires, whereas the provided Set-based substitute would emit the first/earliest item. There might be a way to implement emission of the latest items if Caffeine or Guava provides something like eviction/removal listeners.
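Staying within Reactor, a rough sketch of the "latest item per key per window" semantics is to keep only the last element of each time-bounded group; here Item and its getId() accessor are placeholders, and the concurrency value is arbitrary:
// Each group lives for at most one second; only the most recent element seen in that
// window is emitted when the group completes, instead of a List of up to two items.
Flux<Item> latestPerWindow = source                       // "source" is a placeholder Flux<Item>
        .groupBy(Item::getId)                             // hypothetical key accessor
        .flatMap(group -> group
                .take(Duration.ofSeconds(1L))             // time-bound the group
                .takeLast(1),                             // keep only the latest item in the window
                1024);                                    // placeholder concurrency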
@Sage-Pierce I still see blocking in there (the busy-wait in doOnNext).
Yes, the busy-wait is still blocking. I could certainly be wrong, but I don't think the latest code should be blocking on the parallel Scheduler anymore, due to explicitly specifying a non-parallel Scheduler 🤔
FWIW, the code also fails without the subscribeOn:
@Test
public void test() throws Exception {
// Hooks.onNextDroppedFail(); // This actually has no effect; Was thinking it would cause failure with CancelException
AtomicLong upstream = new AtomicLong(0L);
AtomicLong downstream = new AtomicLong(0L);
CountDownLatch latch = new CountDownLatch(1);
Scheduler scheduler = Schedulers.elastic(); // Fails
//Scheduler scheduler = Schedulers.single(Schedulers.elastic()); // Succeeds
Flux.fromStream(Stream.iterate(0L, (last) -> (long) (Math.random() * 4096)))
.flatMap(number -> Flux.concat(
Mono.just(number),
Mono.just(number).delayElement(Duration.ofMillis((int) (Math.random() * 2000)))),
4096)
.take(Duration.ofSeconds(30L))
.doOnNext(next -> upstream.incrementAndGet())
.publishOn(scheduler)
.groupBy(Function.identity())
.flatMap(groupFlux -> groupFlux.take(Duration.ofSeconds(1L), scheduler)
.take(2)
.collectList(), 16384)
.doOnNext(batch -> {
// Mimic real-world computationally-bound processing overhead
long startNano = System.nanoTime();
while (System.nanoTime() - startNano < 1_000_000) ;
})
.map(Collection::size)
.subscribe(downstream::addAndGet, System.err::println, latch::countDown);
latch.await();
assertEquals(upstream.get(), downstream.get());
}
@Sage-Pierce this code still blocks the thread (by running the heavy computations). Consider using publishOn to move that work onto a Scheduler that can tolerate it.
@bsideup I'll go ahead and add the publishOn. In any case, with the publishOn, I get more interesting behavior. The test either hangs or succeeds with the following code:
@Test
public void test() throws Exception {
// Hooks.onNextDroppedFail(); // This actually has no effect; Was thinking it would cause failure with CancelException
AtomicLong upstream = new AtomicLong(0L);
AtomicLong downstream = new AtomicLong(0L);
CountDownLatch latch = new CountDownLatch(1);
Scheduler scheduler = Schedulers.elastic(); // Hangs
// Scheduler scheduler = Schedulers.single(Schedulers.elastic()); // Succeeds
Flux.fromStream(Stream.iterate(0L, (last) -> (long) (Math.random() * 4096)))
.flatMap(number -> Flux.concat(
Mono.just(number),
Mono.just(number).delayElement(Duration.ofMillis((int) (Math.random() * 2000)))),
4096)
.take(Duration.ofSeconds(30L))
.doOnNext(next -> upstream.incrementAndGet())
.publishOn(scheduler)
.groupBy(Function.identity())
.flatMap(groupFlux -> groupFlux.take(Duration.ofSeconds(1L), scheduler)
.take(2)
.collectList(), 16384)
.publishOn(scheduler)
.doOnNext(batch -> {
// Mimic real-world computationally-bound processing overhead
long startNano = System.nanoTime();
while (System.nanoTime() - startNano < 1_000_000) ;
})
.map(Collection::size)
.subscribe(downstream::addAndGet, System.err::println, latch::countDown);
latch.await();
assertEquals(upstream.get(), downstream.get());
}
And it either hangs or flakily hang-fails with the following code (note the difference in the downstream publishOn):
@Test
public void test() throws Exception {
// Hooks.onNextDroppedFail(); // This actually has no effect; Was thinking it would cause failure with CancelException
AtomicLong upstream = new AtomicLong(0L);
AtomicLong downstream = new AtomicLong(0L);
CountDownLatch latch = new CountDownLatch(1);
Scheduler scheduler = Schedulers.elastic(); // Hangs
// Scheduler scheduler = Schedulers.single(Schedulers.elastic()); // Flaky hang-fails
Flux.fromStream(Stream.iterate(0L, (last) -> (long) (Math.random() * 4096)))
.flatMap(number -> Flux.concat(
Mono.just(number),
Mono.just(number).delayElement(Duration.ofMillis((int) (Math.random() * 2000)))),
4096)
.take(Duration.ofSeconds(30L))
.doOnNext(next -> upstream.incrementAndGet())
.publishOn(scheduler)
.groupBy(Function.identity())
.flatMap(groupFlux -> groupFlux.take(Duration.ofSeconds(1L), scheduler)
.take(2)
.collectList(), 16384)
.publishOn(Schedulers.newParallel("test"))
.doOnNext(batch -> {
// Mimic real-world computationally-bound processing overhead
long startNano = System.nanoTime();
while (System.nanoTime() - startNano < 1_000_000) ;
})
.map(Collection::size)
.subscribe(downstream::addAndGet, System.err::println, latch::countDown);
latch.await();
assertEquals(upstream.get(), downstream.get());
}
Interestingly, the hanging behavior seems similar to the issue discussed in #2138.
Although I haven't fixed the issue yet, I think I've got a much simpler reproducer:
final int total = 100;
Long count = Flux.range(0, total)
.groupBy(i -> (i / 2) * 2, 42)
.flatMap(it -> it.take(1), 2)
.publishOn(Schedulers.parallel(), 2)
.count()
.block(Duration.ofSeconds(60));
assertThat(total - count).as("count").isZero();
fails with:
Expected :0
Actual :44
@Sage-Pierce since I see that you've reworked your code, and since we've made other changes to groupBy in the past year, can this issue be closed?
@simonbasle I'm not sure I can say this isn't still an issue. Even with a recent Reactor version, the following test still reproduces the dropped items:
@Test
public void upstreamEmissionsShouldMatchDownstream() throws Exception {
Hooks.onNextDroppedFail();
int numGroups = 8;
AtomicLong upstream = new AtomicLong(0L);
AtomicLong downstream = new AtomicLong(0L);
CompletableFuture<Void> future = new CompletableFuture<>();
Flux.generate(sink -> sink.next(UUID.randomUUID()))
.take(Duration.ofSeconds(30L))
.doOnNext(next -> upstream.incrementAndGet())
.groupBy(uuid -> Math.abs(uuid.hashCode() % numGroups))
.flatMap(groupFlux -> groupFlux.take(Duration.ofSeconds(1L)), numGroups)
.subscribe(next -> downstream.incrementAndGet(), future::completeExceptionally, () -> future.complete(null));
future.get();
System.out.println("Emitted: " + upstream.get());
assertEquals(upstream.get(), downstream.get());
}
I believe the problem is the time-based take on the groups cancelling/terminating them concurrently with the upstream's emissions into groupBy. To your point, though, I have found a workaround for this by ensuring that upstream publishing and group cancellation happen on the same thread, using a single-Worker Scheduler. The following modified example causes the test to pass:
@Test
public void upstreamEmissionsShouldMatchDownstream() throws Exception {
Hooks.onNextDroppedFail();
int numGroups = 8;
Scheduler scheduler = Schedulers.single(Schedulers.boundedElastic());
AtomicLong upstream = new AtomicLong(0L);
AtomicLong downstream = new AtomicLong(0L);
CompletableFuture<Void> future = new CompletableFuture<>();
Flux.generate(sink -> sink.next(UUID.randomUUID()))
.take(Duration.ofSeconds(30L))
.publishOn(scheduler)
.doOnNext(next -> upstream.incrementAndGet())
.groupBy(uuid -> Math.abs(uuid.hashCode() % numGroups))
.flatMap(groupFlux -> groupFlux.take(Duration.ofSeconds(1L), scheduler), numGroups)
.subscribe(next -> downstream.incrementAndGet(), future::completeExceptionally, () -> future.complete(null));
future.get();
System.out.println("Emitted: " + upstream.get());
assertEquals(upstream.get(), downstream.get());
}
I still suspect users would be scratching their heads over the cases where items are dropped. At the very least, I would expect the dropped items to be surfaced through the onNextDropped hook (the tests register Hooks.onNextDroppedFail() for exactly that reason) rather than disappearing silently.
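For completeness, this is the surface I would expect such losses to show up on; per the comments above, the global drop hook does not currently fire for this groupBy path, so treat this purely as a sketch of the expectation rather than a working detection:
// Global hook: report anything an operator signals as dropped (currently not triggered
// by these groupBy losses, which is exactly the complaint).
Hooks.onNextDropped(dropped -> System.err.println("dropped: " + dropped));

// Per-pipeline discard callback: report elements discarded when an operator clears its
// internal queue.
Flux<Object> observed = pipeline                          // "pipeline" is a placeholder Flux<Object>
        .doOnDiscard(Object.class, discarded -> System.err.println("discarded: " + discarded));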
This issue has some apparent history and various references. Is it worth looking into in 2024, @Sage-Pierce? There is a current attempt (#3872) to document some halting issues with groupBy.
@Sage-Pierce please reopen if required. I'm closing this as it's quite dated.
Heya @chemicL, sorry, I meant to get back to you on this last week. IMO, this is still a demonstrable bug in 2024. It would be great if a solution could be found for it, but it remains beyond me to figure out what that solution is. Given that there is a workaround, and given the lack of progress on this, it's clearly not an urgent bug. However, in case anyone else comes along and wants to take a crack at it, I wonder if the issue should remain open for them.
Thanks. Reopening.
I have a use case where I'm using Flux::groupBy with a relatively high cardinality of grouping (1024-4096 concurrent groups at any given time). FWIW, the use case itself is similar to a sort of deduplication functionality on an infinite stream.
Each "group" is limited in time (.take(Duration)) and size (.take(Long)) and collected as a List. Under high load, it appears some items are being dropped.
Thus far, I think I can point to where the relevant pieces of code are:
FluxGroupBy::onNext may be retrieving a UnicastGroupedFlux racing to termination: https://github.com/reactor/reactor-core/blob/master/reactor-core/src/main/java/reactor/core/publisher/FluxGroupBy.java#L194-L212
UnicastGroupedFlux::drainRegular may be short-circuit returning after emitting a non-zero number of elements: https://github.com/reactor/reactor-core/blob/master/reactor-core/src/main/java/reactor/core/publisher/FluxGroupBy.java#L545-L554
UnicastGroupedFlux::checkTerminated silently clears its queue when checking termination: https://github.com/reactor/reactor-core/blob/master/reactor-core/src/main/java/reactor/core/publisher/FluxGroupBy.java#L642-L647
Expected Behavior
Items are not dropped by the groupBy operator.
Actual Behavior
Items emitted from upstream of the groupBy operator are not emitted downstream, and are silently being dropped.
Steps to Reproduce
I wrote the following test to show a simplified version of what's going on with my use case:
Interestingly, the test sometimes passes without the doOnNext with Thread.sleep(1L). I had to add that to consistently get it to fail (and to mimic the small real-world processing overhead of each downstream item).
Possible Solution
I'm not sure. Still trying to figure out exactly what the problem is in groupBy.
Your Environment
OS and version (uname -a): Mac OS Catalina 10.15.6 (Darwin Kernel Version 19.6.0 x86_64)