Skip to content

Commit

Permalink
Added Single versions of Transformers
Browse files Browse the repository at this point in the history
  • Loading branch information
dlew committed May 5, 2016
1 parent 1721d9a commit 2544929
Show file tree
Hide file tree
Showing 6 changed files with 299 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package com.trello.rxlifecycle;

import rx.Observable;
import rx.Single;
import rx.exceptions.Exceptions;
import rx.functions.Func1;
import rx.functions.Func2;

/**
* Continues a subscription until it sees a particular lifecycle event.
*
* That lifecycle event is determined based on what stage we're at in
* the current lifecycle.
*/
class UntilCorrespondingEventSingleTransformer<T, R> implements Single.Transformer<T, T> {

final Observable<R> sharedLifecycle;
final Func1<R, R> correspondingEvents;

public UntilCorrespondingEventSingleTransformer(Observable<R> lifecycle, Func1<R, R> correspondingEvents) {
this.sharedLifecycle = lifecycle.share(); // Share so that we always compare identical lifecycles
this.correspondingEvents = correspondingEvents;
}

@Override
public Single<T> call(Single<T> source) {
return source.takeUntil(
Observable.combineLatest(
this.sharedLifecycle.take(1).map(correspondingEvents),
this.sharedLifecycle.skip(1),
new Func2<R, R, Boolean>() {
@Override
public Boolean call(R bindUntilEvent, R lifecycleEvent) {
return lifecycleEvent.equals(bindUntilEvent);
}
})
.onErrorReturn(RESUME_FUNCTION)
.takeFirst(SHOULD_COMPLETE)
);
}

private static final Func1<Throwable, Boolean> RESUME_FUNCTION = new Func1<Throwable, Boolean>() {
@Override
public Boolean call(Throwable throwable) {
if (throwable instanceof OutsideLifecycleException) {
return true;
}

Exceptions.propagate(throwable);
return false;
}
};

private static final Func1<Boolean, Boolean> SHOULD_COMPLETE = new Func1<Boolean, Boolean>() {
@Override
public Boolean call(Boolean shouldComplete) {
return shouldComplete;
}
};
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package com.trello.rxlifecycle;

import rx.Observable;
import rx.Single;
import rx.functions.Func1;

/**
* Continues a subscription until it sees a particular lifecycle event.
*/
class UntilEventSingleTransformer<T, R> implements Single.Transformer<T, T> {

final Observable<R> lifecycle;
final R event;

public UntilEventSingleTransformer(Observable<R> lifecycle, R event) {
this.lifecycle = lifecycle;
this.event = event;
}

@Override
public Single<T> call(Single<T> source) {
return source.takeUntil(
lifecycle.takeFirst(new Func1<R, Boolean>() {
@Override
public Boolean call(R lifecycleEvent) {
return lifecycleEvent.equals(event);
}
})
);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package com.trello.rxlifecycle;

import rx.Observable;
import rx.Single;

/**
* Continues a subscription until it sees *any* lifecycle event.
*/
class UntilLifecycleSingleTransformer<T, R> implements Single.Transformer<T, T> {

final Observable<R> lifecycle;

public UntilLifecycleSingleTransformer(Observable<R> lifecycle) {
this.lifecycle = lifecycle;
}

@Override
public Single<T> call(Single<T> source) {
return source.takeUntil(lifecycle);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
package com.trello.rxlifecycle;

import org.junit.Before;
import org.junit.Test;
import rx.Single;
import rx.functions.Func1;
import rx.observers.TestSubscriber;
import rx.subjects.PublishSubject;

import java.util.concurrent.CancellationException;

public class UntilCorrespondingEventSingleTransformerTest {

PublishSubject<String> lifecycle;
TestSubscriber<String> testSubscriber;

@Before
public void setup() {
lifecycle = PublishSubject.create();
testSubscriber = new TestSubscriber<>(0);
}

@Test
public void noEvents() {
Single.just("1")
.compose(new UntilCorrespondingEventSingleTransformer<String, String>(lifecycle, CORRESPONDING_EVENTS))
.subscribe(testSubscriber);

testSubscriber.requestMore(1);
testSubscriber.assertValue("1");
testSubscriber.assertCompleted();
}

@Test
public void oneStartEvent() {
Single.just("1")
.compose(new UntilCorrespondingEventSingleTransformer<String, String>(lifecycle, CORRESPONDING_EVENTS))
.subscribe(testSubscriber);

lifecycle.onNext("create");
testSubscriber.requestMore(1);
testSubscriber.assertValue("1");
testSubscriber.assertCompleted();
}

@Test
public void twoOpenEvents() {
Single.just("1")
.compose(new UntilCorrespondingEventSingleTransformer<String, String>(lifecycle, CORRESPONDING_EVENTS))
.subscribe(testSubscriber);

lifecycle.onNext("create");
lifecycle.onNext("start");
testSubscriber.requestMore(1);
testSubscriber.assertValue("1");
testSubscriber.assertCompleted();
}

@Test
public void openAndCloseEvent() {
Single.just("1")
.compose(new UntilCorrespondingEventSingleTransformer<String, String>(lifecycle, CORRESPONDING_EVENTS))
.subscribe(testSubscriber);

lifecycle.onNext("create");
lifecycle.onNext("destroy");
testSubscriber.requestMore(1);
testSubscriber.assertNoValues();
testSubscriber.assertError(CancellationException.class);
}

private static final Func1<String, String> CORRESPONDING_EVENTS = new Func1<String, String>() {
@Override
public String call(String s) {
if (s.equals("create")) {
return "destroy";
}

throw new IllegalArgumentException("Cannot handle: " + s);
}
};
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package com.trello.rxlifecycle;

import org.junit.Before;
import org.junit.Test;
import rx.Single;
import rx.observers.TestSubscriber;
import rx.subjects.PublishSubject;

import java.util.concurrent.CancellationException;

public class UntilEventSingleTransformerTest {

PublishSubject<String> lifecycle;
TestSubscriber<String> testSubscriber;

@Before
public void setup() {
lifecycle = PublishSubject.create();
testSubscriber = new TestSubscriber<>(0);
}

@Test
public void noEvents() {
Single.just("1")
.compose(new UntilEventSingleTransformer<String, String>(lifecycle, "stop"))
.subscribe(testSubscriber);

testSubscriber.requestMore(1);
testSubscriber.assertValue("1");
testSubscriber.assertCompleted();
}

@Test
public void oneWrongEvent() {
Single.just("1")
.compose(new UntilEventSingleTransformer<String, String>(lifecycle, "stop"))
.subscribe(testSubscriber);

lifecycle.onNext("keep going");
testSubscriber.requestMore(1);

testSubscriber.assertValue("1");
testSubscriber.assertCompleted();
}

@Test
public void twoEvents() {
Single.just("1")
.compose(new UntilEventSingleTransformer<String, String>(lifecycle, "stop"))
.subscribe(testSubscriber);

lifecycle.onNext("keep going");
lifecycle.onNext("stop");
testSubscriber.requestMore(1);

testSubscriber.assertNoValues();
testSubscriber.assertError(CancellationException.class);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package com.trello.rxlifecycle;

import org.junit.Before;
import org.junit.Test;
import rx.Single;
import rx.observers.TestSubscriber;
import rx.subjects.PublishSubject;

import java.util.concurrent.CancellationException;

public class UntilLifecycleSingleTransformerTest {

PublishSubject<String> lifecycle;
TestSubscriber<String> testSubscriber;

@Before
public void setup() {
lifecycle = PublishSubject.create();
testSubscriber = new TestSubscriber<>(0);
}

@Test
public void noEvent() {
Single.just("1")
.compose(new UntilLifecycleSingleTransformer<String, String>(lifecycle))
.subscribe(testSubscriber);

testSubscriber.requestMore(1);
testSubscriber.assertValue("1");
testSubscriber.assertCompleted();
}

@Test
public void oneEvent() {
Single.just("1")
.compose(new UntilLifecycleSingleTransformer<String, String>(lifecycle))
.subscribe(testSubscriber);

lifecycle.onNext("stop");
testSubscriber.requestMore(1);

testSubscriber.assertNoValues();
testSubscriber.assertError(CancellationException.class);
}
}

0 comments on commit 2544929

Please sign in to comment.