2.x Observable.combineLatestDelayError delivers Error after completion #4986

Closed
StanislavChumarin opened this issue Jan 12, 2017 · 6 comments


@StanislavChumarin

StanislavChumarin commented Jan 12, 2017

Observable.combineLatestDelayError delivers an error event after the complete event has already been emitted, and that error is then treated as unhandled.
Checked with rx.Observable, io.reactivex.Observable and io.reactivex.Flowable.

    @Test
    public void testCombine() {
        rx.observers.TestSubscriber<Integer> testSubscriber = rx.observers.TestSubscriber.create();

        rx.Observable<Long> emptyObservable = rx.Observable.empty();
        rx.Observable<Object> errorObservable = rx.Observable.error(new Exception());

        rx.Observable.combineLatestDelayError(
                Arrays.asList(
                        emptyObservable
                                .doOnEach(integerNotification -> System.out.println("emptyObservable: " + integerNotification))
                                .doOnTerminate(() -> System.out.println("emptyObservable: doFinally")),
                        errorObservable
                                .doOnEach(integerNotification -> System.out.println("errorObservable: " + integerNotification))
                                .doOnTerminate(() -> System.out.println("errorObservable: doFinally"))),
                objects -> 0
        )
                .doOnEach(integerNotification -> System.out.println("combineLatestDelayError: " + integerNotification))
                .doOnTerminate(() -> System.out.println("combineLatestDelayError: doFinally"))
                .subscribe(testSubscriber);

        testSubscriber.awaitTerminalEvent();
    }

    @Test
    public void testCombine2() {
        TestObserver<Integer> testObserver = TestObserver.create();

        Observable<Long> emptyObservable = Observable.empty();
        Observable<Object> errorObservable = Observable.error(new Exception());

        Observable.combineLatestDelayError(
                Arrays.asList(
                        emptyObservable
                                .doOnEach(integerNotification -> System.out.println("emptyObservable: " + integerNotification))
                                .doFinally(() -> System.out.println("emptyObservable: doFinally")),
                        errorObservable
                                .doOnEach(integerNotification -> System.out.println("errorObservable: " + integerNotification))
                                .doFinally(() -> System.out.println("errorObservable: doFinally"))),
                objects -> 0
        )
                .doOnEach(integerNotification -> System.out.println("combineLatestDelayError: " + integerNotification))
                .doFinally(() -> System.out.println("combineLatestDelayError: doFinally"))
                .subscribe(testObserver);

        testObserver.awaitTerminalEvent();
    }

    @Test
    public void testCombine2Flowable() {
        TestSubscriber<Integer> testObserver = TestSubscriber.create();

        Flowable<Integer> emptyFlowable = Flowable.empty();
        Flowable<Object> errorFlowable = Flowable.error(new Exception());

        Flowable.combineLatestDelayError(
                Arrays.asList(
                        emptyFlowable
                                .doOnEach(integerNotification -> System.out.println("emptyFlowable: " + integerNotification))
                                .doFinally(() -> System.out.println("emptyFlowable: doFinally")),
                        errorFlowable
                                .doOnEach(integerNotification -> System.out.println("errorFlowable: " + integerNotification))
                                .doFinally(() -> System.out.println("errorFlowable: doFinally"))),
                objects -> 0
        )
                .doOnEach(integerNotification -> System.out.println("combineLatestDelayError: " + integerNotification))
                .doFinally(() -> System.out.println("combineLatestDelayError: doFinally"))
                .subscribe(testObserver);

        testObserver.awaitTerminalEvent();
    }

Output:
testCombine

emptyObservable: [rx.Notification@2b4a2ec7 OnCompleted]
emptyObservable: doFinally
combineLatestDelayError: [rx.Notification@2b4a2ec7 OnCompleted]
combineLatestDelayError: doFinally

testCombine2

emptyObservable: OnCompleteNotification
combineLatestDelayError: OnCompleteNotification
combineLatestDelayError: doFinally
emptyObservable: doFinally
errorObservable: OnErrorNotification[java.lang.Exception]
errorObservable: doFinally
java.lang.Exception
	at com.myproject.Test.testCombine2(Test.java:298)
	// not really important Stacktrace
Exception in thread "main" java.lang.Exception
	at com.myproject.Test.testCombine2(Test.java:298)
	// repeat of not important Stacktrace

testCombine2Flowable

emptyFlowable: OnCompleteNotification
combineLatestDelayError: OnCompleteNotification
combineLatestDelayError: doFinally
emptyFlowable: doFinally

If the error source comes first, or if a timer is used instead of empty(), everything works as expected.

I also noticed a difference in the order of events between 1.x and 2.x. Is that expected?

@akarnokd
Member

akarnokd commented Jan 12, 2017

Hi.

  1. doOnTerminate executes before the terminal event is emitted; it is not the same as doFinally, which executes after it.
  2. Flowable.combineLatest doesn't subscribe to the second source if the first terminates without emitting any onNext item.
  3. Observable.combineLatest is supposed to work like 2) but apparently doesn't, hence the extra error.
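
Points 2) and 3) can be illustrated with a small, self-contained model. This is only a sketch under simplifying assumptions: `CombineModel`, `Source`, `Listener` and `run` are hypothetical names rather than RxJava types, values and error delaying are left out, and only the subscribe loop and the terminal signals are modelled. It is not the library's implementation.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical model of a combineLatest-style subscribe loop (not RxJava code).
final class CombineModel {

    interface Listener {
        void onComplete();
        void onError(Throwable error);
    }

    /** A source that signals synchronously while it is being subscribed to. */
    interface Source {
        void subscribe(Listener listener);
    }

    /** Completes immediately without a value, like empty(). */
    static Source empty(String name, List<String> log) {
        return listener -> {
            log.add(name + ": subscribed");
            listener.onComplete();
        };
    }

    /** Fails immediately, like error(new Exception()). */
    static Source error(String name, List<String> log) {
        return listener -> {
            log.add(name + ": subscribed");
            listener.onError(new Exception(name + " failed"));
        };
    }

    /** Subscribes to an empty source, then an error source, and returns the event log. */
    static List<String> run(boolean stopWhenDone) {
        List<String> log = new ArrayList<>();
        List<Source> sources = Arrays.asList(empty("first", log), error("second", log));
        boolean[] done = {false};

        Listener coordinator = new Listener() {
            @Override
            public void onComplete() {
                // Assumption: the completing source never emitted, so the
                // combined sequence can never produce a value and ends here.
                if (!done[0]) {
                    done[0] = true;
                    log.add("downstream: onComplete");
                }
            }

            @Override
            public void onError(Throwable error) {
                if (done[0]) {
                    // The terminal event was already delivered downstream.
                    log.add("late error: " + error.getMessage());
                } else {
                    done[0] = true;
                    log.add("downstream: onError");
                }
            }
        };

        for (Source source : sources) {
            if (stopWhenDone && done[0]) {
                break; // do not subscribe to the remaining sources
            }
            source.subscribe(coordinator);
        }
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run(true));
        System.out.println(run(false));
    }
}
```

With `stopWhenDone` set to `true` the log ends at the downstream completion and the second source is never subscribed, which corresponds to the `testCombine2Flowable` output. With `false` the second source is still subscribed and its error arrives after the terminal event, which resembles the `testCombine2` output.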

@StanislavChumarin
Author

Thanks for the clarification. I mixed up the doOnTerminate and doAfterTerminate operators.

I still find the behaviour of Observable.combineLatest wrong; that was the main reason for this post.
Will it be fixed?
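
The difference in event order between the 1.x test (which uses doOnTerminate) and the 2.x tests (which use doFinally) follows from whether the hook runs before or after the terminal event is forwarded. The sketch below models two nested stages with plain `Runnable`s. `TerminalHookOrder`, `stage` and `run` are hypothetical names chosen for illustration, and the model only covers completion, not errors or cancellation.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for operators that forward a completion signal.
final class TerminalHookOrder {

    /**
     * Builds a stage that logs the completion, then runs its hook either
     * before forwarding downstream (doOnTerminate-style) or after
     * forwarding downstream (doFinally / doAfterTerminate-style).
     */
    static Runnable stage(String name, boolean hookBefore, List<String> log, Runnable downstream) {
        return () -> {
            log.add(name + ": onComplete");   // doOnEach-style logging
            if (hookBefore) {
                log.add(name + ": hook");     // runs before the event is forwarded
            }
            downstream.run();                 // forward the terminal event
            if (!hookBefore) {
                log.add(name + ": hook");     // runs after the event was forwarded
            }
        };
    }

    /** Chains "source" into "combined" and signals completion once. */
    static List<String> run(boolean hookBefore) {
        List<String> log = new ArrayList<>();
        Runnable subscriber = () -> { };      // final consumer, does nothing
        Runnable combined = stage("combined", hookBefore, log, subscriber);
        Runnable source = stage("source", hookBefore, log, combined);
        source.run();
        return log;
    }

    public static void main(String[] args) {
        System.out.println("hook before: " + run(true));
        System.out.println("hook after:  " + run(false));
    }
}
```

In this model, `run(true)` logs the source's hook before the combined stage sees the completion, as in the `testCombine` output, while `run(false)` logs the source's hook last, as in the `testCombine2` output.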

@akarnokd
Member

Yes, I'm working on the fix for 3).

@StanislavChumarin
Author

StanislavChumarin commented Jan 12, 2017

I was just experimenting with combineLatestDelayError.
It looks like this case also throws an extra exception.

    @Test
    public void testCombine2Flowable2Errors() throws Exception {
        TestSubscriber<Integer> testObserver = TestSubscriber.create();

        TestScheduler testScheduler = new TestScheduler();

        Flowable<Integer> emptyFlowable = Flowable.timer(10, TimeUnit.MILLISECONDS, testScheduler)
                .flatMap(aLong -> Flowable.error(new Exception()));
        Flowable<Object> errorFlowable = Flowable.timer(100, TimeUnit.MILLISECONDS, testScheduler).map(aLong -> {
            throw new Exception();
        });

        Flowable.combineLatestDelayError(
                Arrays.asList(
                        emptyFlowable
                                .doOnEach(integerNotification -> System.out.println("emptyFlowable: " + integerNotification))
                                .doFinally(() -> System.out.println("emptyFlowable: doFinally")),
                        errorFlowable
                                .doOnEach(integerNotification -> System.out.println("errorFlowable: " + integerNotification))
                                .doFinally(() -> System.out.println("errorFlowable: doFinally"))),
                objects -> 0
        )
                .doOnEach(integerNotification -> System.out.println("combineLatestDelayError: " + integerNotification))
                .doFinally(() -> System.out.println("combineLatestDelayError: doFinally"))
                .subscribe(testObserver);

        testScheduler.advanceTimeBy(100, TimeUnit.MILLISECONDS);

        testObserver.awaitTerminalEvent();
    }

Output:

emptyFlowable: OnErrorNotification[java.lang.Exception]
combineLatestDelayError: OnErrorNotification[java.lang.Exception]
combineLatestDelayError: doFinally
emptyFlowable: doFinally
errorFlowable: OnErrorNotification[java.lang.Exception]
errorFlowable: doFinally
java.lang.Exception
	at com.myproject.Test.testCombine2Flowable2Errors(Test.java:298)
        at io.reactivex.internal.operators.flowable.FlowableMap$MapSubscriber.onNext(FlowableMap.java:62)
	at io.reactivex.internal.operators.flowable.FlowableTimer$TimerSubscriber.run(FlowableTimer.java:76)
	at io.reactivex.Scheduler$1.run(Scheduler.java:134)
	at io.reactivex.schedulers.TestScheduler.triggerActions(TestScheduler.java:115)
	// not really important Stacktrace
Exception in thread "main" java.lang.Exception
	at com.myproject.Test.testCombine2Flowable2Errors(Test.java:298)
        at io.reactivex.internal.operators.flowable.FlowableMap$MapSubscriber.onNext(FlowableMap.java:62)
	at io.reactivex.internal.operators.flowable.FlowableTimer$TimerSubscriber.run(FlowableTimer.java:76)
	at io.reactivex.Scheduler$1.run(Scheduler.java:134)
	at io.reactivex.schedulers.TestScheduler.triggerActions(TestScheduler.java:115)
	// repeat of not important Stacktrace

Without the timer delays it works as expected.

@akarnokd
Member

Yes, it looks like the other source is not cancelled in time.

I've posted the fix PR as #4987 that resolves 3) and this latest case.
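
For context on why the extra error shows up as a stack trace: in RxJava 2, an error that can no longer be delivered because the sequence has already terminated is handed to the global `RxJavaPlugins.onError` handler, which by default prints the stack trace and calls the current thread's uncaught exception handler. The following is a minimal model of that routing, not the library code; `LateErrorModel`, `Guard` and the `fallback` consumer are hypothetical names introduced for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical model of "one terminal event only" delivery (not RxJava code).
final class LateErrorModel {

    /** Forwards the first terminal event and diverts any later error. */
    static final class Guard {
        private final List<String> log;
        private final Consumer<Throwable> fallback;
        private boolean done;

        Guard(List<String> log, Consumer<Throwable> fallback) {
            this.log = log;
            this.fallback = fallback;
        }

        void onComplete() {
            if (done) {
                return;                 // a second terminal event is dropped
            }
            done = true;
            log.add("delivered: onComplete");
        }

        void onError(Throwable error) {
            if (done) {
                fallback.accept(error); // cannot be delivered downstream anymore
                return;
            }
            done = true;
            log.add("delivered: onError");
        }
    }

    /** Completes first, then signals an error from a source that was not cancelled. */
    static List<String> run() {
        List<String> log = new ArrayList<>();
        Guard guard = new Guard(log, e -> log.add("fallback: " + e.getMessage()));
        guard.onComplete();
        guard.onError(new Exception("late error"));
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

In the model the late error ends up in the fallback handler rather than in the subscriber. Cancelling the other source as soon as the sequence terminates, as described above, avoids producing the late error in the first place.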

@akarnokd
Member

Closing via #4987
