diff --git a/rxjava-core/src/main/java/rx/subjects/BehaviorSubject.java b/rxjava-core/src/main/java/rx/subjects/BehaviorSubject.java index 4af0c54b2c..21f6252295 100644 --- a/rxjava-core/src/main/java/rx/subjects/BehaviorSubject.java +++ b/rxjava-core/src/main/java/rx/subjects/BehaviorSubject.java @@ -169,12 +169,13 @@ public void call(Collection> observers) { @Override public void onNext(T v) { - /** - * Store the latest value but do not send it. It only gets sent when 'onCompleted' occurs. - */ - lastNotification.set(new Notification(v)); - for (Observer o : subscriptionManager.rawSnapshot()) { - o.onNext(v); + // do not overwrite a terminal notification + // so new subscribers can get them + if (lastNotification.get().isOnNext()) { + lastNotification.set(new Notification(v)); + for (Observer o : subscriptionManager.rawSnapshot()) { + o.onNext(v); + } } } diff --git a/rxjava-core/src/main/java/rx/subjects/SubjectSubscriptionManager.java b/rxjava-core/src/main/java/rx/subjects/SubjectSubscriptionManager.java index ff7033c0c5..dfe6062a57 100644 --- a/rxjava-core/src/main/java/rx/subjects/SubjectSubscriptionManager.java +++ b/rxjava-core/src/main/java/rx/subjects/SubjectSubscriptionManager.java @@ -65,6 +65,7 @@ public Subscription onSubscribe(Observer actualObserver) { try { current.terminationLatch.await(); } catch (InterruptedException e) { + Thread.currentThread().interrupt(); throw new RuntimeException("Interrupted waiting for termination.", e); } break; diff --git a/rxjava-core/src/test/java/rx/subjects/BehaviorSubjectTest.java b/rxjava-core/src/test/java/rx/subjects/BehaviorSubjectTest.java index 19badb2d95..caca18204d 100644 --- a/rxjava-core/src/test/java/rx/subjects/BehaviorSubjectTest.java +++ b/rxjava-core/src/test/java/rx/subjects/BehaviorSubjectTest.java @@ -185,5 +185,55 @@ public void testCompletedAfterErrorIsNotSent() { verify(aObserver, never()).onNext("two"); verify(aObserver, never()).onCompleted(); } + @Test + public void testCompletedAfterErrorIsNotSent2() { + BehaviorSubject subject = BehaviorSubject.create("default"); + + @SuppressWarnings("unchecked") + Observer aObserver = mock(Observer.class); + subject.subscribe(aObserver); + + subject.onNext("one"); + subject.onError(testException); + subject.onNext("two"); + subject.onCompleted(); + + verify(aObserver, times(1)).onNext("default"); + verify(aObserver, times(1)).onNext("one"); + verify(aObserver, times(1)).onError(testException); + verify(aObserver, never()).onNext("two"); + verify(aObserver, never()).onCompleted(); + + Observer o2 = mock(Observer.class); + subject.subscribe(o2); + verify(o2, times(1)).onError(testException); + verify(o2, never()).onNext(any()); + verify(o2, never()).onCompleted(); + } + + @Test + public void testCompletedAfterErrorIsNotSent3() { + BehaviorSubject subject = BehaviorSubject.create("default"); + + @SuppressWarnings("unchecked") + Observer aObserver = mock(Observer.class); + subject.subscribe(aObserver); + + subject.onNext("one"); + subject.onCompleted(); + subject.onNext("two"); + subject.onCompleted(); + verify(aObserver, times(1)).onNext("default"); + verify(aObserver, times(1)).onNext("one"); + verify(aObserver, times(1)).onCompleted(); + verify(aObserver, never()).onError(any(Throwable.class)); + verify(aObserver, never()).onNext("two"); + + Observer o2 = mock(Observer.class); + subject.subscribe(o2); + verify(o2, times(1)).onCompleted(); + verify(o2, never()).onNext(any()); + verify(aObserver, never()).onError(any(Throwable.class)); + } }