-
Notifications
You must be signed in to change notification settings - Fork 7.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
2.x Observable.combineLatestDelayError delivers Error after completion #4986
Comments
Hi.
|
Thanks for clarification. Mixed Still I find behaviour of |
Yes, I'm working on the fix for 3). |
Just was experimenting with @Test
public void testCombine2Flowable2Errors() throws Exception {
TestSubscriber<Integer> testObserver = TestSubscriber.create();
TestScheduler testScheduler = new TestScheduler();
Flowable<Integer> emptyFlowable = Flowable.timer(10, TimeUnit.MILLISECONDS, testScheduler)
.flatMap(aLong -> Flowable.error(new Exception()));
Flowable<Object> errorFlowable = Flowable.timer(100, TimeUnit.MILLISECONDS, testScheduler).map(aLong -> {
throw new Exception();
});
Flowable.combineLatestDelayError(
Arrays.asList(
emptyFlowable
.doOnEach(integerNotification -> System.out.println("emptyFlowable: " + integerNotification))
.doFinally(() -> System.out.println("emptyFlowable: doFinally")),
errorFlowable
.doOnEach(integerNotification -> System.out.println("errorFlowable: " + integerNotification))
.doFinally(() -> System.out.println("errorFlowable: doFinally"))),
objects -> 0
)
.doOnEach(integerNotification -> System.out.println("combineLatestDelayError: " + integerNotification))
.doFinally(() -> System.out.println("combineLatestDelayError: doFinally"))
.subscribe(testObserver);
testScheduler.advanceTimeBy(100, TimeUnit.MILLISECONDS);
testObserver.awaitTerminalEvent();
} Output
Without timeouts it works as expected |
Yes, it looks like the other source is not cancelled in time. I've posted the fix PR as #4987 that resolves 3) and this latest case. |
Closing via #4987 |
Observable.combineLatestDelayError sends error event after complete event happened and treated as unhandled.
Checked on
rx.Observable
,io.reactivex.Observable
andFlowable
Output:
testCombine
testCombine2
testCombine2Flowable
If error emitter goes first or add some timer instead of empty, then everything is ok.
Also noticed difference in events order between 1.x and 2.x. Is it correct?
The text was updated successfully, but these errors were encountered: