Skip to content

Commit

Permalink
Merge pull request #938 from soundcloud/operator-weak-binding
Browse files Browse the repository at this point in the history
OperatorWeakBinding (deprecates OperatorObserveFromAndroidComponent)
  • Loading branch information
benjchristensen committed Mar 13, 2014
2 parents 557e18a + 7ba5be9 commit 90d5978
Show file tree
Hide file tree
Showing 5 changed files with 283 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,13 @@
*/
package rx.android.observables;

import static rx.android.schedulers.AndroidSchedulers.mainThread;

import rx.Observable;
import rx.functions.Func1;
import rx.operators.OperatorObserveFromAndroidComponent;
import rx.operators.OperatorWeakBinding;

import android.app.Activity;
import android.app.Fragment;
import android.os.Build;
Expand All @@ -36,7 +41,30 @@ public final class AndroidObservable {
USES_SUPPORT_FRAGMENTS = supportFragmentsAvailable;
}

private AndroidObservable() {}
private static final Func1<Activity, Boolean> ACTIVITY_VALIDATOR = new Func1<Activity, Boolean>() {
@Override
public Boolean call(Activity activity) {
return !activity.isFinishing();
}
};

private static final Func1<Fragment, Boolean> FRAGMENT_VALIDATOR = new Func1<Fragment, Boolean>() {
@Override
public Boolean call(Fragment fragment) {
return fragment.isAdded();
}
};

private static final Func1<android.support.v4.app.Fragment, Boolean> FRAGMENTV4_VALIDATOR =
new Func1<android.support.v4.app.Fragment, Boolean>() {
@Override
public Boolean call(android.support.v4.app.Fragment fragment) {
return fragment.isAdded();
}
};

private AndroidObservable() {
}

/**
* Transforms a source observable to be attached to the given Activity, in such a way that notifications will always
Expand All @@ -57,7 +85,9 @@ private AndroidObservable() {}
* @param sourceObservable the observable sequence to observe from the given Activity
* @param <T>
* @return a new observable sequence that will emit notifications on the main UI thread
* @deprecated Use {@link #bindActivity(android.app.Activity, rx.Observable)} instead
*/
@Deprecated
public static <T> Observable<T> fromActivity(Activity activity, Observable<T> sourceObservable) {
Assertions.assertUiThread();
return OperatorObserveFromAndroidComponent.observeFromAndroidComponent(sourceObservable, activity);
Expand Down Expand Up @@ -86,7 +116,9 @@ public static <T> Observable<T> fromActivity(Activity activity, Observable<T> so
* @param sourceObservable the observable sequence to observe from the given fragment
* @param <T>
* @return a new observable sequence that will emit notifications on the main UI thread
* @deprecated Use {@link #bindFragment(Object, rx.Observable)} instead
*/
@Deprecated
public static <T> Observable<T> fromFragment(Object fragment, Observable<T> sourceObservable) {
Assertions.assertUiThread();
if (USES_SUPPORT_FRAGMENTS && fragment instanceof android.support.v4.app.Fragment) {
Expand All @@ -97,4 +129,43 @@ public static <T> Observable<T> fromFragment(Object fragment, Observable<T> sour
throw new IllegalArgumentException("Target fragment is neither a native nor support library Fragment");
}
}

/**
* Binds the given source sequence to the life-cycle of an activity.
* <p/>
* This helper will schedule the given sequence to be observed on the main UI thread and ensure
* that no notifications will be forwarded to the activity in case it gets destroyed by the Android runtime
* or garbage collected by the VM.
*
* @param activity the activity to bind the source sequence to
* @param source the source sequence
*/
public static <T> Observable<T> bindActivity(Activity activity, Observable<T> source) {
Assertions.assertUiThread();
return source.observeOn(mainThread()).lift(new OperatorWeakBinding<T, Activity>(activity, ACTIVITY_VALIDATOR));
}

/**
* Binds the given source sequence to the life-cycle of a fragment (native or support-v4).
* <p/>
* This helper will schedule the given sequence to be observed on the main UI thread and ensure
* that no notifications will be forwarded to the fragment in case it gets detached from its
* activity or garbage collected by the VM.
*
* @param fragment the fragment to bind the source sequence to
* @param source the source sequence
*/
public static <T> Observable<T> bindFragment(Object fragment, Observable<T> cachedSequence) {
Assertions.assertUiThread();
final Observable<T> source = cachedSequence.observeOn(mainThread());
if (USES_SUPPORT_FRAGMENTS && fragment instanceof android.support.v4.app.Fragment) {
android.support.v4.app.Fragment f = (android.support.v4.app.Fragment) fragment;
return source.lift(new OperatorWeakBinding<T, android.support.v4.app.Fragment>(f, FRAGMENTV4_VALIDATOR));
} else if (Build.VERSION.SDK_INT >= Build.VERSION_CODES.HONEYCOMB && fragment instanceof Fragment) {
Fragment f = (Fragment) fragment;
return source.lift(new OperatorWeakBinding<T, Fragment>(f, FRAGMENT_VALIDATOR));
} else {
throw new IllegalArgumentException("Target fragment is neither a native nor support library Fragment");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import android.app.Activity;
import android.util.Log;

@Deprecated
public class OperatorObserveFromAndroidComponent {

public static <T> Observable<T> observeFromAndroidComponent(Observable<T> source, android.app.Fragment fragment) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
package rx.operators;

import rx.Observable;
import rx.Subscriber;
import rx.functions.Func1;
import rx.functions.Functions;

import android.util.Log;

import java.lang.ref.WeakReference;

/**
* Ties a source sequence to the life-cycle of the given target object, and/or the subscriber
* using weak references. When either object is gone, this operator automatically unsubscribes
* from the source sequence.
* <p/>
* You can also pass in an optional predicate function, which whenever it evaluates to false
* on the target object, will also result in the operator unsubscribing from the sequence.
*
* @param <T> the type of the objects emitted to a subscriber
* @param <R> the type of the target object to bind to
*/
public final class OperatorWeakBinding<T, R> implements Observable.Operator<T, T> {

private static final String LOG_TAG = "WeakBinding";

final WeakReference<R> boundRef;
private final Func1<? super R, Boolean> predicate;

public OperatorWeakBinding(R bound, Func1<? super R, Boolean> predicate) {
boundRef = new WeakReference<R>(bound);
this.predicate = predicate;
}

public OperatorWeakBinding(R bound) {
boundRef = new WeakReference<R>(bound);
this.predicate = Functions.alwaysTrue();
}

@Override
public Subscriber<? super T> call(final Subscriber<? super T> child) {
return new WeakSubscriber(child);
}

final class WeakSubscriber extends Subscriber<T> {

final WeakReference<Subscriber<? super T>> subscriberRef;

private WeakSubscriber(Subscriber<? super T> source) {
super(source);
subscriberRef = new WeakReference<Subscriber<? super T>>(source);
}

@Override
public void onCompleted() {
final Subscriber<? super T> sub = subscriberRef.get();
if (shouldForwardNotification(sub)) {
sub.onCompleted();
} else {
handleLostBinding(sub, "onCompleted");
}
}

@Override
public void onError(Throwable e) {
final Subscriber<? super T> sub = subscriberRef.get();
if (shouldForwardNotification(sub)) {
sub.onError(e);
} else {
handleLostBinding(sub, "onError");
}
}

@Override
public void onNext(T t) {
final Subscriber<? super T> sub = subscriberRef.get();
if (shouldForwardNotification(sub)) {
sub.onNext(t);
} else {
handleLostBinding(sub, "onNext");
}
}

private boolean shouldForwardNotification(Subscriber<? super T> sub) {
final R target = boundRef.get();
return sub != null && target != null && predicate.call(target);
}

private void handleLostBinding(Subscriber<? super T> sub, String context) {
if (sub == null) {
log("subscriber gone; skipping " + context);
} else {
final R r = boundRef.get();
if (r != null) {
// the predicate failed to validate
log("bound component has become invalid; skipping " + context);
} else {
log("bound component gone; skipping " + context);
}
}
log("unsubscribing...");
unsubscribe();
}

private void log(String message) {
if (Log.isLoggable(LOG_TAG, Log.DEBUG)) {
Log.d(LOG_TAG, message);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -69,29 +69,29 @@ public void setup() {

@Test
public void itSupportsFragmentsFromTheSupportV4Library() {
AndroidObservable.fromFragment(supportFragment, Observable.just("success")).subscribe(new TestObserver<String>(observer));
AndroidObservable.bindFragment(supportFragment, Observable.just("success")).subscribe(new TestObserver<String>(observer));
verify(observer).onNext("success");
verify(observer).onCompleted();
}

@Test
public void itSupportsNativeFragments() {
AndroidObservable.fromFragment(fragment, Observable.just("success")).subscribe(new TestObserver<String>(observer));
AndroidObservable.bindFragment(fragment, Observable.just("success")).subscribe(new TestObserver<String>(observer));
verify(observer).onNext("success");
verify(observer).onCompleted();
}

@Test(expected = IllegalArgumentException.class)
public void itThrowsIfObjectPassedIsNotAFragment() {
AndroidObservable.fromFragment("not a fragment", Observable.never());
AndroidObservable.bindFragment("not a fragment", Observable.never());
}

@Test(expected = IllegalStateException.class)
public void itThrowsIfObserverCallsFromFragmentFromBackgroundThread() throws Throwable {
final Future<Object> future = Executors.newSingleThreadExecutor().submit(new Callable<Object>() {
@Override
public Object call() throws Exception {
AndroidObservable.fromFragment(fragment, Observable.empty());
AndroidObservable.bindFragment(fragment, Observable.empty());
return null;
}
});
Expand All @@ -107,7 +107,7 @@ public void itThrowsIfObserverCallsFromActivityFromBackgroundThread() throws Thr
final Future<Object> future = Executors.newSingleThreadExecutor().submit(new Callable<Object>() {
@Override
public Object call() throws Exception {
AndroidObservable.fromActivity(activity, Observable.empty());
AndroidObservable.bindActivity(activity, Observable.empty());
return null;
}
});
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
package rx.operators;

import static org.junit.Assert.assertEquals;

import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.MockitoAnnotations;
import org.robolectric.RobolectricTestRunner;
import rx.functions.Functions;
import rx.observers.TestSubscriber;

import java.util.Arrays;

@RunWith(RobolectricTestRunner.class)
public class OperatorWeakBindingTest {

private TestSubscriber<String> subscriber = new TestSubscriber<String>();

@Before
public void setUp() throws Exception {
MockitoAnnotations.initMocks(this);
}

@Test
public void shouldForwardAllNotificationsWhenSubscriberAndTargetAlive() {
OperatorWeakBinding<String, Object> op = new OperatorWeakBinding<String, Object>(new Object());
OperatorWeakBinding.WeakSubscriber weakSub = (OperatorWeakBinding.WeakSubscriber) op.call(subscriber);
weakSub.onNext("one");
weakSub.onNext("two");
weakSub.onCompleted();
weakSub.onError(new Exception());

subscriber.assertReceivedOnNext(Arrays.asList("one", "two"));
assertEquals(1, subscriber.getOnCompletedEvents().size());
assertEquals(1, subscriber.getOnErrorEvents().size());
}

@Test
public void shouldUnsubscribeFromSourceSequenceWhenSubscriberReleased() {
OperatorWeakBinding<String, Object> op = new OperatorWeakBinding<String, Object>(new Object());

OperatorWeakBinding.WeakSubscriber weakSub = (OperatorWeakBinding.WeakSubscriber) op.call(subscriber);
weakSub.onNext("one");
weakSub.subscriberRef.clear();
weakSub.onNext("two");
weakSub.onCompleted();
weakSub.onError(new Exception());

subscriber.assertReceivedOnNext(Arrays.asList("one"));
assertEquals(0, subscriber.getOnCompletedEvents().size());
assertEquals(0, subscriber.getOnErrorEvents().size());
}

@Test
public void shouldUnsubscribeFromSourceSequenceWhenTargetObjectReleased() {
OperatorWeakBinding<String, Object> op = new OperatorWeakBinding<String, Object>(new Object());

OperatorWeakBinding.WeakSubscriber weakSub = (OperatorWeakBinding.WeakSubscriber) op.call(subscriber);
weakSub.onNext("one");
op.boundRef.clear();
weakSub.onNext("two");
weakSub.onCompleted();
weakSub.onError(new Exception());

subscriber.assertReceivedOnNext(Arrays.asList("one"));
assertEquals(0, subscriber.getOnCompletedEvents().size());
assertEquals(0, subscriber.getOnErrorEvents().size());
}

@Test
public void shouldUnsubscribeFromSourceSequenceWhenPredicateFailsToPass() {
OperatorWeakBinding<String, Object> op = new OperatorWeakBinding<String, Object>(
new Object(), Functions.alwaysFalse());

OperatorWeakBinding.WeakSubscriber weakSub = (OperatorWeakBinding.WeakSubscriber) op.call(subscriber);
weakSub.onNext("one");
weakSub.onNext("two");
weakSub.onCompleted();
weakSub.onError(new Exception());

assertEquals(0, subscriber.getOnNextEvents().size());
assertEquals(0, subscriber.getOnCompletedEvents().size());
assertEquals(0, subscriber.getOnErrorEvents().size());
}

@Test
public void unsubscribeWillUnsubscribeFromWrappedSubscriber() {
OperatorWeakBinding<String, Object> op = new OperatorWeakBinding<String, Object>(new Object());

op.call(subscriber).unsubscribe();
subscriber.assertUnsubscribed();
}
}

0 comments on commit 90d5978

Please sign in to comment.