Skip to content

Commit

Permalink
add subscribeOn and doOnSubscribe
Browse files Browse the repository at this point in the history
  • Loading branch information
leeowenowen committed Apr 19, 2016
1 parent df842fa commit e57a3cc
Show file tree
Hide file tree
Showing 6 changed files with 52 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,28 @@ public void call(Integer integer) {
});
}
});
registery.add(Constants.Utility.subscribeOn, new Runnable() {
@Override
public void run() {
Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
subscriber.onNext(0);
subscriber.onNext(1);
subscriber.onCompleted();
log("here in: " + Thread.currentThread().getName());
}
})
.subscribeOn(Schedulers.computation())
.observeOn(Schedulers.io())
.subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
log("" + integer + " on " + Thread.currentThread().getName());
}
});
}
});
registery.add(Constants.Utility.doOnEach, new Runnable() {
@Override
public void run() {
Expand Down Expand Up @@ -164,15 +186,26 @@ public void call(Integer integer) {
});
}
});
registery.add(Constants.Utility.doOnUnsubscribe, new Runnable() {
registery.add(Constants.Utility.doOnSubscribe, new Runnable() {
@Override
public void run() {
Subscription subscription = Observable.just(1, 2).doOnSubscribe(new Action0() {
Observable.just(1, 2).doOnSubscribe(new Action0() {
@Override
public void call() {
log("OnSubscribe");
}
}).doOnUnsubscribe(new Action0() {
}).subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
log(integer);
}
});
}
});
registery.add(Constants.Utility.doOnUnsubscribe, new Runnable() {
@Override
public void run() {
Subscription subscription = Observable.just(1, 2).doOnUnsubscribe(new Action0() {
@Override
public void call() {
log("OnUnSubscribe");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,12 @@ public static final class Utility {
public static final String serialize = "serialize";
public static final String cache = "cache";
public static final String observeOn = "observeOn";
public static final String subscribeOn = "subscribeOn";
public static final String doOnEach = "doOnEach";
public static final String doOnCompleted = "doOnCompleted";
public static final String doOnError = "doOnError";
public static final String doOnTerminate = "doOnTerminate";
public static final String doOnSubscribe = "doOnSubscribe";

public static final String doOnUnsubscribe = "doOnUnsubscribe";
public static final String finallyDo = "finallyDo";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,12 @@ private void load() {
add(Constants.Utility.serialize, R.string.desc_utility_serialize);
add(Constants.Utility.cache, R.string.desc_utility_cache);
add(Constants.Utility.observeOn, R.string.desc_utility_observeOn);
add(Constants.Utility.subscribeOn, R.string.desc_utility_subscribeOn);
add(Constants.Utility.doOnEach, R.string.desc_utility_doOnEach);
add(Constants.Utility.doOnCompleted, R.string.desc_utility_doOnCompleted);
add(Constants.Utility.doOnError, R.string.desc_utility_doOnError);
add(Constants.Utility.doOnTerminate, R.string.desc_utility_doOnTerminate);
add(Constants.Utility.doOnSubscribe, R.string.desc_utility_doOnSubscribe);
add(Constants.Utility.doOnUnsubscribe, R.string.desc_utility_doOnUnsubscribe);
add(Constants.Utility.finallyDo, R.string.desc_utility_finallyDo);
add(Constants.Utility.delay, R.string.desc_utility_delay);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,8 @@ private void load() {
"https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/cache.png");
add(Constants.Utility.observeOn,
"https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/observeOn.png");
add(Constants.Utility.subscribeOn,
"https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/subscribeOn.png");
add(Constants.Utility.doOnEach,
"https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/doOnEach.png");
add(Constants.Utility.doOnCompleted,
Expand All @@ -106,6 +108,8 @@ private void load() {
"https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/doOnError.png");
add(Constants.Utility.doOnTerminate,
"https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/doOnTerminate.png");
add(Constants.Utility.doOnSubscribe,
"https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/doOnSubscribe.png");
add(Constants.Utility.doOnUnsubscribe,
"https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/doOnUnsubscribe.png");
add(Constants.Utility.finallyDo,
Expand Down
12 changes: 6 additions & 6 deletions app/src/main/res/values-en/strings.xml
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
<string name="desc_utility_doOnUnsubscribe"> register an action to take when an observer subscribes to an Observable</string>
<string name="desc_utility_finallyDo"> register an action to take when an Observable completes</string>
<string name="desc_utility_delay"> shift the emissions from an Observable forward in time by a specified amount</string>
<string name="desc_utility_delaySubscription"> hold an Subscriber's subscription request for a specified amount of time before passing it on to the source Observable</string>
<string name="desc_utility_delaySubscription"> hold an Subscriber\'s subscription request for a specified amount of time before passing it on to the source Observable</string>
<string name="desc_utility_timeInterval"> emit the time lapsed between consecutive emissions of a source Observable</string>
<string name="desc_utility_using">create a disposable resource that has the same lifespan as an Observable</string>
<string name="desc_utility_single"> if the Observable completes after emitting a single item, return that item, otherwise throw an exception</string>
Expand Down Expand Up @@ -147,15 +147,15 @@

<string name="desc_condition_amb"> given two or more source Observables, emits all of the items from the first of these Observables to emit an item</string>
<string name="desc_condition_defaultIfEmpty">emit items from the source Observable, or emit a default item if the source Observable completes after emitting no items</string>
<string name="desc_condition_doWhile">emit the source Observable's sequence, and then repeat the sequence as long as a condition remains true</string>
<string name="desc_condition_ifThen">only emit the source Observable's sequence if a condition is true, otherwise emit an empty or default sequence</string>
<string name="desc_condition_skipUtil">discard items emitted by a source Observable until a second Observable emits an item, then emit the remainder of the source Observable's items</string>
<string name="desc_condition_doWhile">emit the source Observable\'s sequence, and then repeat the sequence as long as a condition remains true</string>
<string name="desc_condition_ifThen">only emit the source Observable\'s sequence if a condition is true, otherwise emit an empty or default sequence</string>
<string name="desc_condition_skipUtil">discard items emitted by a source Observable until a second Observable emits an item, then emit the remainder of the source Observable\'s items</string>
<string name="desc_condition_skipWhile"> discard items emitted by an Observable until a specified condition is false, then emit the remainder</string>
<string name="desc_condition_switchcase"> emit the sequence from a particular Observable based on the results of an evaluation</string>
<string name="desc_condition_takeUntil">emits the items from the source Observable until a second Observable emits an item or issues a notification</string>
<string name="desc_condition_takeWhile">emit items emitted by an Observable as long as a specified condition is true, then skip the remainder</string>
<string name="desc_condition_takeWhileWithIndex">emit items emitted by an Observable as long as a specified condition is true, then skip the remainder</string>
<string name="desc_condition_WhileDo">if a condition is true, emit the source Observable's sequence and then repeat the sequence as long as the condition remains true</string>
<string name="desc_condition_WhileDo">if a condition is true, emit the source Observable\'s sequence and then repeat the sequence as long as the condition remains true</string>
<string name="desc_combine_startWith">emit a specified sequence of items before beginning to emit the items from the Observable</string>
<string name="desc_combine_merge">combine multiple Observables into one</string>
<string name="desc_combine_mergeDelayError">combine multiple Observables into one, allowing error-free Observables to continue before propagating errors</string>
Expand Down Expand Up @@ -184,7 +184,7 @@

<string name="desc_async_start"> create an Observable that emits the return value of a function</string>
<string name="desc_async_toAsync">convert a function or Action into an Observable that executes the function and emits its return value</string>
<string name="desc_async_startFuture">convert a function that returns Future into an Observable that emits that Future's return value</string>
<string name="desc_async_startFuture">convert a function that returns Future into an Observable that emits that Future\'s return value</string>
<string name="desc_async_deferFuture">convert a Future that returns an Observable into an Observable, but do not attempt to get the Observable that the Future returns until a Subscriber subscribes</string>
<string name="desc_async_forEachFuture">pass Subscriber methods to an Observable but also have it behave like a Future that blocks until it completes</string>
<string name="desc_async_fromAction"> convert an Action into an Observable that invokes the action and emits its result when a Subscriber subscribes</string>
Expand Down
2 changes: 2 additions & 0 deletions app/src/main/res/values-zh/strings.xml
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,12 @@
<string name="desc_utility_serialize"></string>
<string name="desc_utility_cache"></string>
<string name="desc_utility_observeOn"></string>
<string name="desc_utility_subscribeOn"></string>
<string name="desc_utility_doOnEach"></string>
<string name="desc_utility_doOnCompleted"></string>
<string name="desc_utility_doOnError"></string>
<string name="desc_utility_doOnTerminate"></string>
<string name="desc_utility_doOnSubscribe"></string>
<string name="desc_utility_doOnUnsubscribe"></string>
<string name="desc_utility_finallyDo"></string>
<string name="desc_utility_delay"></string>
Expand Down

0 comments on commit e57a3cc

Please sign in to comment.