From 955bc509d6ced282c249d5714673ec35f0061380 Mon Sep 17 00:00:00 2001 From: Matthias Kaeppler Date: Sun, 2 Mar 2014 12:40:50 +0100 Subject: [PATCH 1/6] First implementation of OperatorWeakBinding --- .../observables/AndroidObservable.java | 23 ++++++ .../OperatorObserveFromAndroidComponent.java | 1 + .../rx/operators/OperatorWeakBinding.java | 75 +++++++++++++++++++ 3 files changed, 99 insertions(+) create mode 100644 rxjava-contrib/rxjava-android/src/main/java/rx/operators/OperatorWeakBinding.java diff --git a/rxjava-contrib/rxjava-android/src/main/java/rx/android/observables/AndroidObservable.java b/rxjava-contrib/rxjava-android/src/main/java/rx/android/observables/AndroidObservable.java index d55617ddea..723ebf40b0 100644 --- a/rxjava-contrib/rxjava-android/src/main/java/rx/android/observables/AndroidObservable.java +++ b/rxjava-contrib/rxjava-android/src/main/java/rx/android/observables/AndroidObservable.java @@ -15,8 +15,12 @@ */ package rx.android.observables; +import static rx.android.schedulers.AndroidSchedulers.mainThread; + import rx.Observable; import rx.operators.OperatorObserveFromAndroidComponent; +import rx.operators.OperatorWeakBinding; + import android.app.Activity; import android.app.Fragment; import android.os.Build; @@ -58,6 +62,7 @@ private AndroidObservable() {} * @param * @return a new observable sequence that will emit notifications on the main UI thread */ + @Deprecated public static Observable fromActivity(Activity activity, Observable sourceObservable) { return OperatorObserveFromAndroidComponent.observeFromAndroidComponent(sourceObservable, activity); } @@ -86,6 +91,7 @@ public static Observable fromActivity(Activity activity, Observable so * @param * @return a new observable sequence that will emit notifications on the main UI thread */ + @Deprecated public static Observable fromFragment(Object fragment, Observable sourceObservable) { if (USES_SUPPORT_FRAGMENTS && fragment instanceof android.support.v4.app.Fragment) { return OperatorObserveFromAndroidComponent.observeFromAndroidComponent(sourceObservable, (android.support.v4.app.Fragment) fragment); @@ -95,4 +101,21 @@ public static Observable fromFragment(Object fragment, Observable sour throw new IllegalArgumentException("Target fragment is neither a native nor support library Fragment"); } } + + public static Observable bindActivity(Activity activity, Observable cachedSequence) { + return cachedSequence.observeOn(mainThread()).lift(new OperatorWeakBinding(activity)); + } + + public static Observable bindFragment(Object fragment, Observable cachedSequence) { + Observable source = cachedSequence.observeOn(mainThread()); + if (USES_SUPPORT_FRAGMENTS && fragment instanceof android.support.v4.app.Fragment) { + android.support.v4.app.Fragment f = (android.support.v4.app.Fragment) fragment; + return source.lift(new OperatorWeakBinding(f)); + } else if (Build.VERSION.SDK_INT >= Build.VERSION_CODES.HONEYCOMB && fragment instanceof Fragment) { + Fragment f = (Fragment) fragment; + return source.lift(new OperatorWeakBinding(f)); + } else { + throw new IllegalArgumentException("Target fragment is neither a native nor support library Fragment"); + } + } } diff --git a/rxjava-contrib/rxjava-android/src/main/java/rx/operators/OperatorObserveFromAndroidComponent.java b/rxjava-contrib/rxjava-android/src/main/java/rx/operators/OperatorObserveFromAndroidComponent.java index 3e656ccfbb..754ebb2e4e 100644 --- a/rxjava-contrib/rxjava-android/src/main/java/rx/operators/OperatorObserveFromAndroidComponent.java +++ b/rxjava-contrib/rxjava-android/src/main/java/rx/operators/OperatorObserveFromAndroidComponent.java @@ -25,6 +25,7 @@ import android.os.Looper; import android.util.Log; +@Deprecated public class OperatorObserveFromAndroidComponent { public static Observable observeFromAndroidComponent(Observable source, android.app.Fragment fragment) { diff --git a/rxjava-contrib/rxjava-android/src/main/java/rx/operators/OperatorWeakBinding.java b/rxjava-contrib/rxjava-android/src/main/java/rx/operators/OperatorWeakBinding.java new file mode 100644 index 0000000000..3392cc887a --- /dev/null +++ b/rxjava-contrib/rxjava-android/src/main/java/rx/operators/OperatorWeakBinding.java @@ -0,0 +1,75 @@ +package rx.operators; + +import rx.Observable; +import rx.Subscriber; + +import android.util.Log; + +import java.lang.ref.WeakReference; + +public final class OperatorWeakBinding implements Observable.Operator { + + private static final String LOG_TAG = "WeakBinding"; + + private final WeakReference boundRef; + + public OperatorWeakBinding(R bound) { + boundRef = new WeakReference(bound); + } + + @Override + public Subscriber call(final Subscriber child) { + return new WeakSubscriber(child, boundRef); + } + + private static final class WeakSubscriber extends Subscriber { + + private final WeakReference> subscriberRef; + private final WeakReference boundRef; + + private WeakSubscriber(Subscriber op, WeakReference boundRef) { + subscriberRef = new WeakReference>(op); + this.boundRef = boundRef; + } + + @Override + public void onCompleted() { + Subscriber sub = subscriberRef.get(); + if (sub != null && boundRef.get() != null) { + sub.onCompleted(); + } else { + handleLostBinding(sub, "onCompleted"); + } + } + + @Override + public void onError(Throwable e) { + Subscriber sub = subscriberRef.get(); + if (sub != null && boundRef.get() != null) { + sub.onError(e); + } else { + handleLostBinding(sub, "onError"); + } + } + + @Override + public void onNext(T t) { + Subscriber sub = subscriberRef.get(); + if (sub != null && boundRef.get() != null) { + sub.onNext(t); + } else { + handleLostBinding(sub, "onNext"); + } + } + + private void handleLostBinding(Subscriber sub, String context) { + if (sub == null) { + Log.d(LOG_TAG, "subscriber gone; skipping " + context); + } else { + Log.d(LOG_TAG, "bound component gone; skipping " + context); + } + unsubscribe(); + } + + } +} From d7037b5c5270c0f6af0381af03d60fc4f916141d Mon Sep 17 00:00:00 2001 From: Matthias Kaeppler Date: Wed, 12 Mar 2014 18:20:46 +0100 Subject: [PATCH 2/6] OperatorWeakBinding supports predicates now --- .../observables/AndroidObservable.java | 56 ++++++++++-- .../rx/operators/OperatorWeakBinding.java | 54 +++++++++--- .../rx/operators/OperatorWeakBindingTest.java | 86 +++++++++++++++++++ 3 files changed, 178 insertions(+), 18 deletions(-) create mode 100644 rxjava-contrib/rxjava-android/src/test/java/rx/operators/OperatorWeakBindingTest.java diff --git a/rxjava-contrib/rxjava-android/src/main/java/rx/android/observables/AndroidObservable.java b/rxjava-contrib/rxjava-android/src/main/java/rx/android/observables/AndroidObservable.java index 8da366265d..28d8d704aa 100644 --- a/rxjava-contrib/rxjava-android/src/main/java/rx/android/observables/AndroidObservable.java +++ b/rxjava-contrib/rxjava-android/src/main/java/rx/android/observables/AndroidObservable.java @@ -18,6 +18,7 @@ import static rx.android.schedulers.AndroidSchedulers.mainThread; import rx.Observable; +import rx.functions.Func1; import rx.operators.OperatorObserveFromAndroidComponent; import rx.operators.OperatorWeakBinding; @@ -40,7 +41,30 @@ public final class AndroidObservable { USES_SUPPORT_FRAGMENTS = supportFragmentsAvailable; } - private AndroidObservable() {} + private static final Func1 ACTIVITY_VALIDATOR = new Func1() { + @Override + public Boolean call(Activity activity) { + return !activity.isFinishing(); + } + }; + + private static final Func1 FRAGMENT_VALIDATOR = new Func1() { + @Override + public Boolean call(Fragment fragment) { + return fragment.isAdded(); + } + }; + + private static final Func1 FRAGMENTV4_VALIDATOR = + new Func1() { + @Override + public Boolean call(android.support.v4.app.Fragment fragment) { + return fragment.isAdded(); + } + }; + + private AndroidObservable() { + } /** * Transforms a source observable to be attached to the given Activity, in such a way that notifications will always @@ -61,6 +85,7 @@ private AndroidObservable() {} * @param sourceObservable the observable sequence to observe from the given Activity * @param * @return a new observable sequence that will emit notifications on the main UI thread + * @deprecated Use {@link #bindActivity(android.app.Activity, rx.Observable)} instead */ @Deprecated public static Observable fromActivity(Activity activity, Observable sourceObservable) { @@ -91,6 +116,7 @@ public static Observable fromActivity(Activity activity, Observable so * @param sourceObservable the observable sequence to observe from the given fragment * @param * @return a new observable sequence that will emit notifications on the main UI thread + * @deprecated Use {@link #bindFragment(Object, rx.Observable)} instead */ @Deprecated public static Observable fromFragment(Object fragment, Observable sourceObservable) { @@ -104,18 +130,38 @@ public static Observable fromFragment(Object fragment, Observable sour } } - public static Observable bindActivity(Activity activity, Observable cachedSequence) { - return cachedSequence.observeOn(mainThread()).lift(new OperatorWeakBinding(activity)); + /** + * Binds the given source sequence to the life-cycle of an activity. + *

+ * This helper will schedule the given sequence to be observed on the main UI thread and ensure + * that no notifications will be forwarded to the activity in case it gets destroyed by the Android runtime + * or garbage collected by the VM. + * + * @param activity the activity to bind the source sequence to + * @param source the source sequence + */ + public static Observable bindActivity(Activity activity, Observable source) { + return source.observeOn(mainThread()).lift(new OperatorWeakBinding(activity, ACTIVITY_VALIDATOR)); } + /** + * Binds the given source sequence to the life-cycle of a fragment (native or support-v4). + *

+ * This helper will schedule the given sequence to be observed on the main UI thread and ensure + * that no notifications will be forwarded to the fragment in case it gets detached from its + * activity or garbage collected by the VM. + * + * @param fragment the fragment to bind the source sequence to + * @param source the source sequence + */ public static Observable bindFragment(Object fragment, Observable cachedSequence) { Observable source = cachedSequence.observeOn(mainThread()); if (USES_SUPPORT_FRAGMENTS && fragment instanceof android.support.v4.app.Fragment) { android.support.v4.app.Fragment f = (android.support.v4.app.Fragment) fragment; - return source.lift(new OperatorWeakBinding(f)); + return source.lift(new OperatorWeakBinding(f, FRAGMENTV4_VALIDATOR)); } else if (Build.VERSION.SDK_INT >= Build.VERSION_CODES.HONEYCOMB && fragment instanceof Fragment) { Fragment f = (Fragment) fragment; - return source.lift(new OperatorWeakBinding(f)); + return source.lift(new OperatorWeakBinding(f, FRAGMENT_VALIDATOR)); } else { throw new IllegalArgumentException("Target fragment is neither a native nor support library Fragment"); } diff --git a/rxjava-contrib/rxjava-android/src/main/java/rx/operators/OperatorWeakBinding.java b/rxjava-contrib/rxjava-android/src/main/java/rx/operators/OperatorWeakBinding.java index 3392cc887a..43a501b489 100644 --- a/rxjava-contrib/rxjava-android/src/main/java/rx/operators/OperatorWeakBinding.java +++ b/rxjava-contrib/rxjava-android/src/main/java/rx/operators/OperatorWeakBinding.java @@ -2,40 +2,58 @@ import rx.Observable; import rx.Subscriber; +import rx.functions.Func1; +import rx.functions.Functions; import android.util.Log; import java.lang.ref.WeakReference; +/** + * Ties a source sequence to the life-cycle of the given target object, and/or the subscriber + * using weak references. When either object is gone, this operator automatically unsubscribes + * from the source sequence. + *

+ * You can also pass in an optional predicate function, which whenever it evaluates to false + * on the target object, will also result in the operator unsubscribing from the sequence. + * + * @param the type of the objects emitted to a subscriber + * @param the type of the target object to bind to + */ public final class OperatorWeakBinding implements Observable.Operator { private static final String LOG_TAG = "WeakBinding"; - private final WeakReference boundRef; + final WeakReference boundRef; + private final Func1 predicate; + + public OperatorWeakBinding(R bound, Func1 predicate) { + boundRef = new WeakReference(bound); + this.predicate = predicate; + } public OperatorWeakBinding(R bound) { boundRef = new WeakReference(bound); + this.predicate = Functions.alwaysTrue(); } @Override public Subscriber call(final Subscriber child) { - return new WeakSubscriber(child, boundRef); + return new WeakSubscriber(child); } - private static final class WeakSubscriber extends Subscriber { + final class WeakSubscriber extends Subscriber { - private final WeakReference> subscriberRef; - private final WeakReference boundRef; + final WeakReference> subscriberRef; - private WeakSubscriber(Subscriber op, WeakReference boundRef) { - subscriberRef = new WeakReference>(op); - this.boundRef = boundRef; + private WeakSubscriber(Subscriber source) { + subscriberRef = new WeakReference>(source); } @Override public void onCompleted() { Subscriber sub = subscriberRef.get(); - if (sub != null && boundRef.get() != null) { + if (shouldForwardNotification(sub)) { sub.onCompleted(); } else { handleLostBinding(sub, "onCompleted"); @@ -45,7 +63,7 @@ public void onCompleted() { @Override public void onError(Throwable e) { Subscriber sub = subscriberRef.get(); - if (sub != null && boundRef.get() != null) { + if (shouldForwardNotification(sub)) { sub.onError(e); } else { handleLostBinding(sub, "onError"); @@ -55,21 +73,31 @@ public void onError(Throwable e) { @Override public void onNext(T t) { Subscriber sub = subscriberRef.get(); - if (sub != null && boundRef.get() != null) { + if (shouldForwardNotification(sub)) { sub.onNext(t); } else { handleLostBinding(sub, "onNext"); } } + private boolean shouldForwardNotification(Subscriber sub) { + final R target = boundRef.get(); + return sub != null && target != null && predicate.call(target); + } + private void handleLostBinding(Subscriber sub, String context) { if (sub == null) { Log.d(LOG_TAG, "subscriber gone; skipping " + context); } else { - Log.d(LOG_TAG, "bound component gone; skipping " + context); + final R r = boundRef.get(); + if (r != null) { // the predicate failed to validate + Log.d(LOG_TAG, "bound component has become invalid; skipping " + context); + } else { + Log.d(LOG_TAG, "bound component gone; skipping " + context); + } } + Log.d(LOG_TAG, "unsubscribing..."); unsubscribe(); } - } } diff --git a/rxjava-contrib/rxjava-android/src/test/java/rx/operators/OperatorWeakBindingTest.java b/rxjava-contrib/rxjava-android/src/test/java/rx/operators/OperatorWeakBindingTest.java new file mode 100644 index 0000000000..e5445ce228 --- /dev/null +++ b/rxjava-contrib/rxjava-android/src/test/java/rx/operators/OperatorWeakBindingTest.java @@ -0,0 +1,86 @@ +package rx.operators; + +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; +import static org.mockito.Mockito.verifyZeroInteractions; + +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; +import org.robolectric.RobolectricTestRunner; +import rx.Subscriber; +import rx.functions.Functions; + +@RunWith(RobolectricTestRunner.class) +public class OperatorWeakBindingTest { + + @Mock + private Subscriber subscriber; + + @Before + public void setUp() throws Exception { + MockitoAnnotations.initMocks(this); + } + + @Test + public void shouldForwardAllNotificationsWhenSubscriberAndTargetAlive() { + OperatorWeakBinding op = new OperatorWeakBinding(new Object()); + OperatorWeakBinding.WeakSubscriber weakSub = (OperatorWeakBinding.WeakSubscriber) op.call(subscriber); + weakSub.onNext("one"); + weakSub.onNext("two"); + weakSub.onCompleted(); + weakSub.onError(new Exception()); + + verify(subscriber).onNext("one"); + verify(subscriber).onNext("two"); + verify(subscriber).onCompleted(); + verify(subscriber).onError(any(Exception.class)); + } + + @Test + public void shouldUnsubscribeFromSourceSequenceWhenSubscriberReleased() { + OperatorWeakBinding op = new OperatorWeakBinding(new Object()); + + OperatorWeakBinding.WeakSubscriber weakSub = (OperatorWeakBinding.WeakSubscriber) op.call(subscriber); + weakSub.onNext("one"); + weakSub.subscriberRef.clear(); + weakSub.onNext("two"); + weakSub.onCompleted(); + weakSub.onError(new Exception()); + + verify(subscriber).onNext("one"); + verifyNoMoreInteractions(subscriber); + } + + @Test + public void shouldUnsubscribeFromSourceSequenceWhenTargetObjectReleased() { + OperatorWeakBinding op = new OperatorWeakBinding(new Object()); + + OperatorWeakBinding.WeakSubscriber weakSub = (OperatorWeakBinding.WeakSubscriber) op.call(subscriber); + weakSub.onNext("one"); + op.boundRef.clear(); + weakSub.onNext("two"); + weakSub.onCompleted(); + weakSub.onError(new Exception()); + + verify(subscriber).onNext("one"); + verifyNoMoreInteractions(subscriber); + } + + @Test + public void shouldUnsubscribeFromSourceSequenceWhenPredicateFailsToPass() { + OperatorWeakBinding op = new OperatorWeakBinding( + new Object(), Functions.alwaysFalse()); + + OperatorWeakBinding.WeakSubscriber weakSub = (OperatorWeakBinding.WeakSubscriber) op.call(subscriber); + weakSub.onNext("one"); + weakSub.onNext("two"); + weakSub.onCompleted(); + weakSub.onError(new Exception()); + + verifyZeroInteractions(subscriber); + } +} From 135b6c13e03dbb519f8163f5ee5b2c8d7c87684c Mon Sep 17 00:00:00 2001 From: Matthias Kaeppler Date: Wed, 12 Mar 2014 18:29:41 +0100 Subject: [PATCH 3/6] Some cleaning up --- .../src/main/java/rx/operators/OperatorWeakBinding.java | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/rxjava-contrib/rxjava-android/src/main/java/rx/operators/OperatorWeakBinding.java b/rxjava-contrib/rxjava-android/src/main/java/rx/operators/OperatorWeakBinding.java index 43a501b489..b1adb3d34d 100644 --- a/rxjava-contrib/rxjava-android/src/main/java/rx/operators/OperatorWeakBinding.java +++ b/rxjava-contrib/rxjava-android/src/main/java/rx/operators/OperatorWeakBinding.java @@ -52,7 +52,7 @@ private WeakSubscriber(Subscriber source) { @Override public void onCompleted() { - Subscriber sub = subscriberRef.get(); + final Subscriber sub = subscriberRef.get(); if (shouldForwardNotification(sub)) { sub.onCompleted(); } else { @@ -62,7 +62,7 @@ public void onCompleted() { @Override public void onError(Throwable e) { - Subscriber sub = subscriberRef.get(); + final Subscriber sub = subscriberRef.get(); if (shouldForwardNotification(sub)) { sub.onError(e); } else { @@ -72,7 +72,7 @@ public void onError(Throwable e) { @Override public void onNext(T t) { - Subscriber sub = subscriberRef.get(); + final Subscriber sub = subscriberRef.get(); if (shouldForwardNotification(sub)) { sub.onNext(t); } else { @@ -90,7 +90,8 @@ private void handleLostBinding(Subscriber sub, String context) { Log.d(LOG_TAG, "subscriber gone; skipping " + context); } else { final R r = boundRef.get(); - if (r != null) { // the predicate failed to validate + if (r != null) { + // the predicate failed to validate Log.d(LOG_TAG, "bound component has become invalid; skipping " + context); } else { Log.d(LOG_TAG, "bound component gone; skipping " + context); From 2f0670ecc6fdae7ce8cf30848e79a773f520934e Mon Sep 17 00:00:00 2001 From: Matthias Kaeppler Date: Wed, 12 Mar 2014 18:31:12 +0100 Subject: [PATCH 4/6] Guard the log calls --- .../java/rx/operators/OperatorWeakBinding.java | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/rxjava-contrib/rxjava-android/src/main/java/rx/operators/OperatorWeakBinding.java b/rxjava-contrib/rxjava-android/src/main/java/rx/operators/OperatorWeakBinding.java index b1adb3d34d..986341396f 100644 --- a/rxjava-contrib/rxjava-android/src/main/java/rx/operators/OperatorWeakBinding.java +++ b/rxjava-contrib/rxjava-android/src/main/java/rx/operators/OperatorWeakBinding.java @@ -87,18 +87,24 @@ private boolean shouldForwardNotification(Subscriber sub) { private void handleLostBinding(Subscriber sub, String context) { if (sub == null) { - Log.d(LOG_TAG, "subscriber gone; skipping " + context); + log("subscriber gone; skipping " + context); } else { final R r = boundRef.get(); if (r != null) { // the predicate failed to validate - Log.d(LOG_TAG, "bound component has become invalid; skipping " + context); + log("bound component has become invalid; skipping " + context); } else { - Log.d(LOG_TAG, "bound component gone; skipping " + context); + log("bound component gone; skipping " + context); } } - Log.d(LOG_TAG, "unsubscribing..."); + log("unsubscribing..."); unsubscribe(); } + + private void log(String message) { + if (Log.isLoggable(LOG_TAG, Log.DEBUG)) { + Log.d(LOG_TAG, message); + } + } } } From 45afeb9fb5ce49a9f0e4d931057ebaf6990111ab Mon Sep 17 00:00:00 2001 From: Matthias Kaeppler Date: Thu, 13 Mar 2014 11:14:25 +0100 Subject: [PATCH 5/6] Forward subscription of wrapped subscriber --- .../rx/operators/OperatorWeakBinding.java | 1 + .../rx/operators/OperatorWeakBindingTest.java | 42 +++++++++++-------- 2 files changed, 26 insertions(+), 17 deletions(-) diff --git a/rxjava-contrib/rxjava-android/src/main/java/rx/operators/OperatorWeakBinding.java b/rxjava-contrib/rxjava-android/src/main/java/rx/operators/OperatorWeakBinding.java index 986341396f..3cc8258302 100644 --- a/rxjava-contrib/rxjava-android/src/main/java/rx/operators/OperatorWeakBinding.java +++ b/rxjava-contrib/rxjava-android/src/main/java/rx/operators/OperatorWeakBinding.java @@ -47,6 +47,7 @@ final class WeakSubscriber extends Subscriber { final WeakReference> subscriberRef; private WeakSubscriber(Subscriber source) { + super(source); subscriberRef = new WeakReference>(source); } diff --git a/rxjava-contrib/rxjava-android/src/test/java/rx/operators/OperatorWeakBindingTest.java b/rxjava-contrib/rxjava-android/src/test/java/rx/operators/OperatorWeakBindingTest.java index e5445ce228..6dd72c606d 100644 --- a/rxjava-contrib/rxjava-android/src/test/java/rx/operators/OperatorWeakBindingTest.java +++ b/rxjava-contrib/rxjava-android/src/test/java/rx/operators/OperatorWeakBindingTest.java @@ -1,24 +1,21 @@ package rx.operators; -import static org.mockito.Matchers.any; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.verifyNoMoreInteractions; -import static org.mockito.Mockito.verifyZeroInteractions; +import static org.junit.Assert.assertEquals; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; -import org.mockito.Mock; import org.mockito.MockitoAnnotations; import org.robolectric.RobolectricTestRunner; -import rx.Subscriber; import rx.functions.Functions; +import rx.observers.TestSubscriber; + +import java.util.Arrays; @RunWith(RobolectricTestRunner.class) public class OperatorWeakBindingTest { - @Mock - private Subscriber subscriber; + private TestSubscriber subscriber = new TestSubscriber(); @Before public void setUp() throws Exception { @@ -34,10 +31,9 @@ public void shouldForwardAllNotificationsWhenSubscriberAndTargetAlive() { weakSub.onCompleted(); weakSub.onError(new Exception()); - verify(subscriber).onNext("one"); - verify(subscriber).onNext("two"); - verify(subscriber).onCompleted(); - verify(subscriber).onError(any(Exception.class)); + subscriber.assertReceivedOnNext(Arrays.asList("one", "two")); + assertEquals(1, subscriber.getOnCompletedEvents().size()); + assertEquals(1, subscriber.getOnErrorEvents().size()); } @Test @@ -51,8 +47,9 @@ public void shouldUnsubscribeFromSourceSequenceWhenSubscriberReleased() { weakSub.onCompleted(); weakSub.onError(new Exception()); - verify(subscriber).onNext("one"); - verifyNoMoreInteractions(subscriber); + subscriber.assertReceivedOnNext(Arrays.asList("one")); + assertEquals(0, subscriber.getOnCompletedEvents().size()); + assertEquals(0, subscriber.getOnErrorEvents().size()); } @Test @@ -66,8 +63,9 @@ public void shouldUnsubscribeFromSourceSequenceWhenTargetObjectReleased() { weakSub.onCompleted(); weakSub.onError(new Exception()); - verify(subscriber).onNext("one"); - verifyNoMoreInteractions(subscriber); + subscriber.assertReceivedOnNext(Arrays.asList("one")); + assertEquals(0, subscriber.getOnCompletedEvents().size()); + assertEquals(0, subscriber.getOnErrorEvents().size()); } @Test @@ -81,6 +79,16 @@ public void shouldUnsubscribeFromSourceSequenceWhenPredicateFailsToPass() { weakSub.onCompleted(); weakSub.onError(new Exception()); - verifyZeroInteractions(subscriber); + assertEquals(0, subscriber.getOnNextEvents().size()); + assertEquals(0, subscriber.getOnCompletedEvents().size()); + assertEquals(0, subscriber.getOnErrorEvents().size()); + } + + @Test + public void unsubscribeWillUnsubscribeFromWrappedSubscriber() { + OperatorWeakBinding op = new OperatorWeakBinding(new Object()); + + op.call(subscriber).unsubscribe(); + subscriber.assertUnsubscribed(); } } From 7ba5be9797ad17bdcbc34307049fdfc1f33ac8f3 Mon Sep 17 00:00:00 2001 From: Matthias Kaeppler Date: Thu, 13 Mar 2014 11:18:01 +0100 Subject: [PATCH 6/6] Rewrite AndroidObservableTest to test the new operator --- .../java/rx/android/observables/AndroidObservable.java | 4 +++- .../rx/android/observables/AndroidObservableTest.java | 10 +++++----- 2 files changed, 8 insertions(+), 6 deletions(-) diff --git a/rxjava-contrib/rxjava-android/src/main/java/rx/android/observables/AndroidObservable.java b/rxjava-contrib/rxjava-android/src/main/java/rx/android/observables/AndroidObservable.java index 28d8d704aa..46d0e48285 100644 --- a/rxjava-contrib/rxjava-android/src/main/java/rx/android/observables/AndroidObservable.java +++ b/rxjava-contrib/rxjava-android/src/main/java/rx/android/observables/AndroidObservable.java @@ -141,6 +141,7 @@ public static Observable fromFragment(Object fragment, Observable sour * @param source the source sequence */ public static Observable bindActivity(Activity activity, Observable source) { + Assertions.assertUiThread(); return source.observeOn(mainThread()).lift(new OperatorWeakBinding(activity, ACTIVITY_VALIDATOR)); } @@ -155,7 +156,8 @@ public static Observable bindActivity(Activity activity, Observable so * @param source the source sequence */ public static Observable bindFragment(Object fragment, Observable cachedSequence) { - Observable source = cachedSequence.observeOn(mainThread()); + Assertions.assertUiThread(); + final Observable source = cachedSequence.observeOn(mainThread()); if (USES_SUPPORT_FRAGMENTS && fragment instanceof android.support.v4.app.Fragment) { android.support.v4.app.Fragment f = (android.support.v4.app.Fragment) fragment; return source.lift(new OperatorWeakBinding(f, FRAGMENTV4_VALIDATOR)); diff --git a/rxjava-contrib/rxjava-android/src/test/java/rx/android/observables/AndroidObservableTest.java b/rxjava-contrib/rxjava-android/src/test/java/rx/android/observables/AndroidObservableTest.java index 0591c6454e..fe7263da0f 100644 --- a/rxjava-contrib/rxjava-android/src/test/java/rx/android/observables/AndroidObservableTest.java +++ b/rxjava-contrib/rxjava-android/src/test/java/rx/android/observables/AndroidObservableTest.java @@ -69,21 +69,21 @@ public void setup() { @Test public void itSupportsFragmentsFromTheSupportV4Library() { - AndroidObservable.fromFragment(supportFragment, Observable.just("success")).subscribe(new TestObserver(observer)); + AndroidObservable.bindFragment(supportFragment, Observable.just("success")).subscribe(new TestObserver(observer)); verify(observer).onNext("success"); verify(observer).onCompleted(); } @Test public void itSupportsNativeFragments() { - AndroidObservable.fromFragment(fragment, Observable.just("success")).subscribe(new TestObserver(observer)); + AndroidObservable.bindFragment(fragment, Observable.just("success")).subscribe(new TestObserver(observer)); verify(observer).onNext("success"); verify(observer).onCompleted(); } @Test(expected = IllegalArgumentException.class) public void itThrowsIfObjectPassedIsNotAFragment() { - AndroidObservable.fromFragment("not a fragment", Observable.never()); + AndroidObservable.bindFragment("not a fragment", Observable.never()); } @Test(expected = IllegalStateException.class) @@ -91,7 +91,7 @@ public void itThrowsIfObserverCallsFromFragmentFromBackgroundThread() throws Thr final Future future = Executors.newSingleThreadExecutor().submit(new Callable() { @Override public Object call() throws Exception { - AndroidObservable.fromFragment(fragment, Observable.empty()); + AndroidObservable.bindFragment(fragment, Observable.empty()); return null; } }); @@ -107,7 +107,7 @@ public void itThrowsIfObserverCallsFromActivityFromBackgroundThread() throws Thr final Future future = Executors.newSingleThreadExecutor().submit(new Callable() { @Override public Object call() throws Exception { - AndroidObservable.fromActivity(activity, Observable.empty()); + AndroidObservable.bindActivity(activity, Observable.empty()); return null; } });