Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixed deadlock in Subjects + OperatorCache. #972

Merged
merged 1 commit into from
Apr 16, 2014
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions rxjava-core/src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@
import rx.operators.OperationAsObservable;
import rx.operators.OperationAverage;
import rx.operators.OperationBuffer;
import rx.operators.OperationCache;
import rx.operators.OperationCombineLatest;
import rx.operators.OperationConcat;
import rx.operators.OperationDebounce;
Expand Down Expand Up @@ -105,6 +104,7 @@
import rx.operators.OperationUsing;
import rx.operators.OperationWindow;
import rx.operators.OperatorAmb;
import rx.operators.OperatorCache;
import rx.operators.OperatorCast;
import rx.operators.OperatorDoOnEach;
import rx.operators.OperatorFilter;
Expand Down Expand Up @@ -3863,7 +3863,7 @@ public final <B> Observable<List<T>> buffer(Observable<B> boundary, int initialC
* @see <a href="https://github.com/Netflix/RxJava/wiki/Observable-Utility-Operators#wiki-cache">RxJava Wiki: cache()</a>
*/
public final Observable<T> cache() {
return create(OperationCache.cache(this));
return create(new OperatorCache<T>(this));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,13 @@
import java.util.concurrent.atomic.AtomicBoolean;

import rx.Observable;
import rx.Observable.OnSubscribe;
import rx.Observable.OnSubscribeFunc;
import rx.Observer;
import rx.Subscriber;
import rx.Subscription;
import rx.subjects.ReplaySubject;
import rx.subjects.Subject;

/**
* This method has similar behavior to {@link Observable#replay()} except that this auto-subscribes
Expand All @@ -35,30 +38,35 @@
* NOTE: You sacrifice the ability to unsubscribe from the origin when you use this operator, so be
* careful not to use this operator on Observables that emit infinite or very large numbers of
* items, as this will use up memory.
*
* @param <T> the cached value type
*/
public class OperationCache {

public static <T> OnSubscribeFunc<T> cache(final Observable<? extends T> source) {
return new OnSubscribeFunc<T>() {

final AtomicBoolean subscribed = new AtomicBoolean(false);
private final ReplaySubject<T> cache = ReplaySubject.create();
public final class OperatorCache<T> implements OnSubscribe<T> {
protected final Observable<? extends T> source;
protected final Subject<? super T, ? extends T> cache;
protected final AtomicBoolean sourceSubscribed;

@Override
public Subscription onSubscribe(Observer<? super T> observer) {
if (subscribed.compareAndSet(false, true)) {
// subscribe to the source once
source.subscribe(cache);
/*
* Note that we will never unsubscribe from 'source' as we want to receive and cache all of its values.
*
* This means this should never be used on an infinite or very large sequence, similar to toList().
*/
}

return cache.subscribe(observer);
}
public OperatorCache(Observable<? extends T> source) {
this(source, ReplaySubject.<T>create());
}

/** Test support. */
public OperatorCache(Observable<? extends T> source, Subject<? super T, ? extends T> cache) {
this.source = source;
this.cache = cache;
this.sourceSubscribed = new AtomicBoolean();
}

};
@Override
public void call(Subscriber<? super T> t1) {
if (sourceSubscribed.compareAndSet(false, true)) {
source.subscribe(cache);
/*
* Note that we will never unsubscribe from 'source' as we want to receive and cache all of its values.
*
* This means this should never be used on an infinite or very large sequence, similar to toList().
*/
}
cache.subscribe(t1);
}
}
23 changes: 14 additions & 9 deletions rxjava-core/src/main/java/rx/subjects/AsyncSubject.java
Original file line number Diff line number Diff line change
Expand Up @@ -105,35 +105,40 @@ protected AsyncSubject(OnSubscribe<T> onSubscribe, SubjectSubscriptionManager<T>

@Override
public void onCompleted() {
Collection<SubjectObserver<? super T>> observers =
subscriptionManager.terminate(new Action1<Collection<SubjectObserver<? super T>>>() {

@Override
public void call(Collection<SubjectObserver<? super T>> observers) {
for (Observer<? super T> o : observers) {
emitValueToObserver(lastNotification.get(), o);
}
}
});
if (observers != null) {
for (Observer<? super T> o : observers) {
emitValueToObserver(lastNotification.get(), o);
}
}
}

@Override
public void onError(final Throwable e) {
Collection<SubjectObserver<? super T>> observers =
subscriptionManager.terminate(new Action1<Collection<SubjectObserver<? super T>>>() {

@Override
public void call(Collection<SubjectObserver<? super T>> observers) {
lastNotification.set(new Notification<T>(e));
for (Observer<? super T> o : observers) {
emitValueToObserver(lastNotification.get(), o);
}
lastNotification.set(Notification.<T>createOnError(e));
}
});
if (observers != null) {
for (Observer<? super T> o : observers) {
emitValueToObserver(lastNotification.get(), o);
}
}

}

@Override
public void onNext(T v) {
lastNotification.set(new Notification<T>(v));
lastNotification.set(Notification.createOnNext(v));
}

}
25 changes: 15 additions & 10 deletions rxjava-core/src/main/java/rx/subjects/BehaviorSubject.java
Original file line number Diff line number Diff line change
Expand Up @@ -144,39 +144,44 @@ protected BehaviorSubject(OnSubscribe<T> onSubscribe, SubjectSubscriptionManager

@Override
public void onCompleted() {
Collection<SubjectObserver<? super T>> observers =
subscriptionManager.terminate(new Action1<Collection<SubjectObserver<? super T>>>() {

@Override
public void call(Collection<SubjectObserver<? super T>> observers) {
lastNotification.set(new Notification<T>());
for (Observer<? super T> o : observers) {
o.onCompleted();
}
lastNotification.set(Notification.<T>createOnCompleted());
}
});
if (observers != null) {
for (Observer<? super T> o : observers) {
o.onCompleted();
}
}
}

@Override
public void onError(final Throwable e) {
Collection<SubjectObserver<? super T>> observers =
subscriptionManager.terminate(new Action1<Collection<SubjectObserver<? super T>>>() {

@Override
public void call(Collection<SubjectObserver<? super T>> observers) {
lastNotification.set(new Notification<T>(e));
for (Observer<? super T> o : observers) {
o.onError(e);
}
lastNotification.set(Notification.<T>createOnError(e));
}
});

if (observers != null) {
for (Observer<? super T> o : observers) {
o.onError(e);
}
}
}

@Override
public void onNext(T v) {
// do not overwrite a terminal notification
// so new subscribers can get them
if (lastNotification.get().isOnNext()) {
lastNotification.set(new Notification<T>(v));
lastNotification.set(Notification.createOnNext(v));
for (Observer<? super T> o : subscriptionManager.rawSnapshot()) {
o.onNext(v);
}
Expand Down
20 changes: 12 additions & 8 deletions rxjava-core/src/main/java/rx/subjects/PublishSubject.java
Original file line number Diff line number Diff line change
Expand Up @@ -95,31 +95,35 @@ protected PublishSubject(OnSubscribe<T> onSubscribe, SubjectSubscriptionManager<

@Override
public void onCompleted() {
Collection<SubjectObserver<? super T>> observers =
subscriptionManager.terminate(new Action1<Collection<SubjectObserver<? super T>>>() {

@Override
public void call(Collection<SubjectObserver<? super T>> observers) {
lastNotification.set(Notification.<T> createOnCompleted());
for (Observer<? super T> o : observers) {
o.onCompleted();
}
}
});
if (observers != null) {
for (Observer<? super T> o : observers) {
o.onCompleted();
}
}
}

@Override
public void onError(final Throwable e) {
Collection<SubjectObserver<? super T>> observers =
subscriptionManager.terminate(new Action1<Collection<SubjectObserver<? super T>>>() {

@Override
public void call(Collection<SubjectObserver<? super T>> observers) {
lastNotification.set(Notification.<T>createOnError(e));
for (Observer<? super T> o : observers) {
o.onError(e);
}
}
});

if (observers != null) {
for (Observer<? super T> o : observers) {
o.onError(e);
}
}
}

@Override
Expand Down
26 changes: 16 additions & 10 deletions rxjava-core/src/main/java/rx/subjects/ReplaySubject.java
Original file line number Diff line number Diff line change
Expand Up @@ -122,34 +122,40 @@ protected ReplaySubject(OnSubscribe<T> onSubscribe, SubjectSubscriptionManager<T

@Override
public void onCompleted() {
Collection<SubjectObserver<? super T>> observers =
subscriptionManager.terminate(new Action1<Collection<SubjectObserver<? super T>>>() {

@Override
public void call(Collection<SubjectObserver<? super T>> observers) {
state.history.complete(Notification.<T>createOnCompleted());
for (SubjectObserver<? super T> o : observers) {
if (caughtUp(o)) {
o.onCompleted();
}
}
}
});
if (observers != null) {
for (SubjectObserver<? super T> o : observers) {
if (caughtUp(o)) {
o.onCompleted();
}
}
}
}

@Override
public void onError(final Throwable e) {
Collection<SubjectObserver<? super T>> observers =
subscriptionManager.terminate(new Action1<Collection<SubjectObserver<? super T>>>() {

@Override
public void call(Collection<SubjectObserver<? super T>> observers) {
state.history.complete(Notification.<T>createOnError(e));
for (SubjectObserver<? super T> o : observers) {
if (caughtUp(o)) {
o.onError(e);
}
}
}
});
if (observers != null) {
for (SubjectObserver<? super T> o : observers) {
if (caughtUp(o)) {
o.onError(e);
}
}
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,19 +113,20 @@ public void call() {
}

@SuppressWarnings({ "unchecked", "rawtypes" })
protected void terminate(Action1<Collection<SubjectObserver<? super T>>> onTerminate) {
protected Collection<SubjectObserver<? super T>> terminate(Action1<Collection<SubjectObserver<? super T>>> onTerminate) {
State<T> current;
State<T> newState = null;
do {
current = state.get();
if (current.terminated) {
// already terminated so do nothing
return;
return null;
} else {
newState = current.terminate();
}
} while (!state.compareAndSet(current, newState));

Collection<SubjectObserver<? super T>> observerCollection = (Collection)Arrays.asList(newState.observers);
/*
* if we get here then we won setting the state to terminated
* and have a deterministic set of Observers to emit to (concurrent subscribes
Expand All @@ -134,11 +135,12 @@ protected void terminate(Action1<Collection<SubjectObserver<? super T>>> onTermi
*/
try {
// had to circumvent type check, we know what the array contains
onTerminate.call((Collection) Arrays.asList(newState.observers));
onTerminate.call(observerCollection);
} finally {
// mark that termination is completed
newState.terminationLatch.countDown();
}
return observerCollection;
}

/**
Expand Down
Loading