Skip to content

Commit

Permalink
Merge pull request #962 from benjchristensen/serialize-synchronize
Browse files Browse the repository at this point in the history
Migrate from SynchronizedObserver to SerializedObserver
  • Loading branch information
benjchristensen committed Mar 13, 2014
2 parents fc2e45f + 6926fa6 commit 4834d85
Show file tree
Hide file tree
Showing 22 changed files with 2,896 additions and 150 deletions.
19 changes: 13 additions & 6 deletions rxjava-core/src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@
import rx.operators.OnSubscribeFromIterable;
import rx.operators.OnSubscribeRange;
import rx.operators.OperationAll;
import rx.operators.OperatorAmb;
import rx.operators.OperationAny;
import rx.operators.OperationAsObservable;
import rx.operators.OperationAverage;
Expand Down Expand Up @@ -91,10 +90,8 @@
import rx.operators.OperationSkip;
import rx.operators.OperationSkipLast;
import rx.operators.OperationSkipUntil;
import rx.operators.OperatorSkipWhile;
import rx.operators.OperationSum;
import rx.operators.OperationSwitch;
import rx.operators.OperationSynchronize;
import rx.operators.OperationTakeLast;
import rx.operators.OperationTakeTimed;
import rx.operators.OperationTakeUntil;
Expand All @@ -107,6 +104,7 @@
import rx.operators.OperationToObservableFuture;
import rx.operators.OperationUsing;
import rx.operators.OperationWindow;
import rx.operators.OperatorAmb;
import rx.operators.OperatorCast;
import rx.operators.OperatorDoOnEach;
import rx.operators.OperatorFilter;
Expand All @@ -120,8 +118,11 @@
import rx.operators.OperatorRepeat;
import rx.operators.OperatorRetry;
import rx.operators.OperatorScan;
import rx.operators.OperatorSerialize;
import rx.operators.OperatorSkip;
import rx.operators.OperatorSkipWhile;
import rx.operators.OperatorSubscribeOn;
import rx.operators.OperatorSynchronize;
import rx.operators.OperatorTake;
import rx.operators.OperatorTimeout;
import rx.operators.OperatorTimeoutWithSelector;
Expand Down Expand Up @@ -2712,7 +2713,7 @@ public final static <T> Observable<T> switchOnNext(Observable<? extends Observab
*/
@Deprecated
public final static <T> Observable<T> synchronize(Observable<T> source) {
return create(OperationSynchronize.synchronize(source));
return source.synchronize();
}

/**
Expand Down Expand Up @@ -6197,6 +6198,10 @@ public final <R> Observable<R> scan(R initialValue, Func2<R, ? super T, R> accum
return lift(new OperatorScan<R, T>(initialValue, accumulator));
}

public final Observable<T> serialize() {
return lift(new OperatorSerialize<T>());
}

/**
* If the source Observable completes after emitting a single item, return an Observable that emits that
* item. If the source Observable emits more than one item or no items, throw an
Expand Down Expand Up @@ -7259,9 +7264,10 @@ public final <R> Observable<R> switchMap(Func1<? super T, ? extends Observable<?
* @return an Observable that is a chronologically well-behaved version of the source Observable, and that
* synchronously notifies its {@link Observer}s
* @see <a href="https://github.com/Netflix/RxJava/wiki/Observable-Utility-Operators#wiki-synchronize">RxJava Wiki: synchronize()</a>
* @deprecated Use {@link #serialize()} instead as it doesn't block threads while emitting notification.
*/
public final Observable<T> synchronize() {
return create(OperationSynchronize.synchronize(this));
return lift(new OperatorSynchronize<T>());
}

/**
Expand All @@ -7283,9 +7289,10 @@ public final Observable<T> synchronize() {
* @return an Observable that is a chronologically well-behaved version of the source Observable, and that
* synchronously notifies its {@link Observer}s
* @see <a href="https://github.com/Netflix/RxJava/wiki/Observable-Utility-Operators#wiki-synchronize">RxJava Wiki: synchronize()</a>
* @deprecated Use {@link #serialize()} instead as it doesn't block threads while emitting notification.
*/
public final Observable<T> synchronize(Object lock) {
return create(OperationSynchronize.synchronize(this, lock));
return lift(new OperatorSynchronize<T>(lock));
}

/**
Expand Down
2 changes: 1 addition & 1 deletion rxjava-core/src/main/java/rx/observers/SafeSubscriber.java
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@
* <li>When onError or onComplete occur it will unsubscribe from the Observable (if executing asynchronously).</li>
* </ul>
* <p>
* It will not synchronize onNext execution. Use the {@link SynchronizedObserver} to do that.
* It will not synchronize onNext execution. Use the {@link SerializedSubscriber} to do that.
*
* @param <T>
*/
Expand Down
186 changes: 186 additions & 0 deletions rxjava-core/src/main/java/rx/observers/SerializedObserver.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,186 @@
package rx.observers;

import java.util.ArrayList;

import rx.Observer;

/**
* Enforce single-threaded, serialized, ordered execution of onNext, onCompleted, onError.
* <p>
* When multiple threads are notifying they will be serialized by:
* <p>
* <li>Allowing only one thread at a time to emit</li>
* <li>Adding notifications to a queue if another thread is already emitting</li>
* <li>Not holding any locks or blocking any threads while emitting</li>
* <p>
*
* @param <T>
*/
public class SerializedObserver<T> implements Observer<T> {
private final Observer<? super T> actual;

private boolean emitting = false;
private boolean terminated = false;
private ArrayList<Object> queue = new ArrayList<Object>();

private static Sentinel NULL_SENTINEL = new Sentinel();
private static Sentinel COMPLETE_SENTINEL = new Sentinel();

private static class Sentinel {

}

private static class ErrorSentinel extends Sentinel {
final Throwable e;

ErrorSentinel(Throwable e) {
this.e = e;
}
}

public SerializedObserver(Observer<? super T> s) {
this.actual = s;
}

@Override
public void onCompleted() {
boolean canEmit = false;
ArrayList<Object> list = null;
synchronized (this) {
if (terminated) {
return;
}
terminated = true;
if (!emitting) {
// emit immediately
emitting = true;
canEmit = true;
if (queue.size() > 0) {
list = queue; // copy reference
queue = new ArrayList<Object>(); // new version;
}
} else {
// someone else is already emitting so just queue it
queue.add(COMPLETE_SENTINEL);
}
}
if (canEmit) {
// we won the right to emit
try {
drainQueue(list);
actual.onCompleted();
} finally {
synchronized (this) {
emitting = false;
}
}
}
}

@Override
public void onError(final Throwable e) {
boolean canEmit = false;
ArrayList<Object> list = null;
synchronized (this) {
if (terminated) {
return;
}
terminated = true;
if (!emitting) {
// emit immediately
emitting = true;
canEmit = true;
if (queue.size() > 0) {
list = queue; // copy reference
queue = new ArrayList<Object>(); // new version;
}
} else {
// someone else is already emitting so just queue it ... after eliminating the queue to shortcut
queue.clear();
queue.add(new ErrorSentinel(e));
}
}
if (canEmit) {
// we won the right to emit
try {
drainQueue(list);
actual.onError(e);
} finally {
synchronized (this) {
emitting = false;
}
}
}
}

@Override
public void onNext(T t) {
boolean canEmit = false;
ArrayList<Object> list = null;
synchronized (this) {
if (terminated) {
return;
}
if (!emitting) {
// emit immediately
emitting = true;
canEmit = true;
if (queue.size() > 0) {
list = queue; // copy reference
queue = new ArrayList<Object>(); // new version;
}
} else {
// someone else is already emitting so just queue it
if (t == null) {
queue.add(NULL_SENTINEL);
} else {
queue.add(t);
}
}
}
if (canEmit) {
// we won the right to emit
try {
drainQueue(list);
actual.onNext(t);
} finally {
synchronized (this) {
if (terminated) {
list = queue; // copy reference
queue = new ArrayList<Object>(); // new version;
} else {
// release this thread
emitting = false;
canEmit = false;
}
}
}
}

// if terminated this will still be true so let's drain the rest of the queue
if (canEmit) {
drainQueue(list);
}
}

public void drainQueue(ArrayList<Object> list) {
if (list == null || list.size() == 0) {
return;
}
for (Object v : list) {
if (v != null) {
if (v instanceof Sentinel) {
if (v == NULL_SENTINEL) {
actual.onNext(null);
} else if (v == COMPLETE_SENTINEL) {
actual.onCompleted();
} else if (v instanceof ErrorSentinel) {
actual.onError(((ErrorSentinel) v).e);
}
} else {
actual.onNext((T) v);
}
}
}
}
}
40 changes: 40 additions & 0 deletions rxjava-core/src/main/java/rx/observers/SerializedSubscriber.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package rx.observers;

import rx.Observer;
import rx.Subscriber;

/**
* Enforce single-threaded, serialized, ordered execution of onNext, onCompleted, onError.
* <p>
* When multiple threads are notifying they will be serialized by:
* <p>
* <li>Allowing only one thread at a time to emit</li>
* <li>Adding notifications to a queue if another thread is already emitting</li>
* <li>Not holding any locks or blocking any threads while emitting</li>
* <p>
*
* @param <T>
*/
public class SerializedSubscriber<T> extends Subscriber<T> {

private final Observer<T> s;

public SerializedSubscriber(Subscriber<? super T> s) {
this.s = new SerializedObserver<T>(s);
}

@Override
public void onCompleted() {
s.onCompleted();
}

@Override
public void onError(Throwable e) {
s.onError(e);
}

@Override
public void onNext(T t) {
s.onNext(t);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@
* This ONLY does synchronization. It does not involve itself in safety or subscriptions. See SafeSubscriber for that.
*
* @param <T>
* @deprecated Use SerializedObserver instead as it doesn't block threads during event notification.
*/
@Deprecated
public final class SynchronizedObserver<T> implements Observer<T> {

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,9 @@
* </ul>
*
* @param <T>
* @deprecated Use SerializedSubscriber instead as it doesn't block threads during event notification.
*/
@Deprecated
public final class SynchronizedSubscriber<T> extends Subscriber<T> {

private final Observer<? super T> observer;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import rx.Subscription;
import rx.functions.Action1;
import rx.functions.Func1;
import rx.observers.SerializedObserver;
import rx.observers.SynchronizedObserver;
import rx.schedulers.Schedulers;
import rx.subscriptions.CompositeSubscription;
Expand Down Expand Up @@ -111,7 +112,7 @@ private static class DebounceObserver<T> implements Observer<T> {
public DebounceObserver(Observer<? super T> observer, long timeout, TimeUnit unit, Scheduler scheduler) {
// we need to synchronize the observer since the on* events can be coming from different
// threads and are thus non-deterministic and could be interleaved
this.observer = new SynchronizedObserver<T>(observer);
this.observer = new SerializedObserver<T>(observer);
this.timeout = timeout;
this.unit = unit;
this.scheduler = scheduler;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import rx.Observer;
import rx.Subscription;
import rx.exceptions.CompositeException;
import rx.observers.SynchronizedObserver;
import rx.observers.SerializedObserver;
import rx.subscriptions.BooleanSubscription;
import rx.subscriptions.CompositeSubscription;

Expand Down Expand Up @@ -141,15 +141,7 @@ private MergeDelayErrorObservable(Observable<? extends Observable<? extends T>>

public Subscription onSubscribe(Observer<? super T> actualObserver) {
CompositeSubscription completeSubscription = new CompositeSubscription();

/**
* We must synchronize a merge because we subscribe to multiple sequences in parallel that will each be emitting.
* <p>
* The calls from each sequence must be serialized.
* <p>
* Bug report: https://github.com/Netflix/RxJava/issues/614
*/
SynchronizedObserver<T> synchronizedObserver = new SynchronizedObserver<T>(actualObserver);
SerializedObserver<T> synchronizedObserver = new SerializedObserver<T>(actualObserver);

/**
* Subscribe to the parent Observable to get to the children Observables
Expand Down
Loading

0 comments on commit 4834d85

Please sign in to comment.