From ceff938a5b31dc992d2db0e8aa5e4b1735b0468e Mon Sep 17 00:00:00 2001 From: akarnokd Date: Thu, 20 Mar 2014 13:59:28 +0100 Subject: [PATCH] Fixed deadlock in Subjects + OperatorCache. --- rxjava-core/src/main/java/rx/Observable.java | 4 +- ...OperationCache.java => OperatorCache.java} | 52 +++--- .../main/java/rx/subjects/AsyncSubject.java | 23 ++- .../java/rx/subjects/BehaviorSubject.java | 25 +-- .../main/java/rx/subjects/PublishSubject.java | 20 ++- .../main/java/rx/subjects/ReplaySubject.java | 26 +-- .../subjects/SubjectSubscriptionManager.java | 8 +- .../java/rx/operators/OperationCacheTest.java | 87 --------- .../java/rx/operators/OperatorCacheTest.java | 168 ++++++++++++++++++ 9 files changed, 262 insertions(+), 151 deletions(-) rename rxjava-core/src/main/java/rx/operators/{OperationCache.java => OperatorCache.java} (56%) delete mode 100644 rxjava-core/src/test/java/rx/operators/OperationCacheTest.java create mode 100644 rxjava-core/src/test/java/rx/operators/OperatorCacheTest.java diff --git a/rxjava-core/src/main/java/rx/Observable.java b/rxjava-core/src/main/java/rx/Observable.java index 235e1e62e4..bc20108ade 100644 --- a/rxjava-core/src/main/java/rx/Observable.java +++ b/rxjava-core/src/main/java/rx/Observable.java @@ -56,7 +56,6 @@ import rx.operators.OperationAsObservable; import rx.operators.OperationAverage; import rx.operators.OperationBuffer; -import rx.operators.OperationCache; import rx.operators.OperationCombineLatest; import rx.operators.OperationConcat; import rx.operators.OperationDebounce; @@ -105,6 +104,7 @@ import rx.operators.OperationUsing; import rx.operators.OperationWindow; import rx.operators.OperatorAmb; +import rx.operators.OperatorCache; import rx.operators.OperatorCast; import rx.operators.OperatorDoOnEach; import rx.operators.OperatorFilter; @@ -3863,7 +3863,7 @@ public final Observable> buffer(Observable boundary, int initialC * @see RxJava Wiki: cache() */ public final Observable cache() { - return create(OperationCache.cache(this)); + return create(new OperatorCache(this)); } /** diff --git a/rxjava-core/src/main/java/rx/operators/OperationCache.java b/rxjava-core/src/main/java/rx/operators/OperatorCache.java similarity index 56% rename from rxjava-core/src/main/java/rx/operators/OperationCache.java rename to rxjava-core/src/main/java/rx/operators/OperatorCache.java index eb10bc9ab6..6f3385f1de 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationCache.java +++ b/rxjava-core/src/main/java/rx/operators/OperatorCache.java @@ -18,10 +18,13 @@ import java.util.concurrent.atomic.AtomicBoolean; import rx.Observable; +import rx.Observable.OnSubscribe; import rx.Observable.OnSubscribeFunc; import rx.Observer; +import rx.Subscriber; import rx.Subscription; import rx.subjects.ReplaySubject; +import rx.subjects.Subject; /** * This method has similar behavior to {@link Observable#replay()} except that this auto-subscribes @@ -35,30 +38,35 @@ * NOTE: You sacrifice the ability to unsubscribe from the origin when you use this operator, so be * careful not to use this operator on Observables that emit infinite or very large numbers of * items, as this will use up memory. + * + * @param the cached value type */ -public class OperationCache { - - public static OnSubscribeFunc cache(final Observable source) { - return new OnSubscribeFunc() { - - final AtomicBoolean subscribed = new AtomicBoolean(false); - private final ReplaySubject cache = ReplaySubject.create(); +public final class OperatorCache implements OnSubscribe { + protected final Observable source; + protected final Subject cache; + protected final AtomicBoolean sourceSubscribed; - @Override - public Subscription onSubscribe(Observer observer) { - if (subscribed.compareAndSet(false, true)) { - // subscribe to the source once - source.subscribe(cache); - /* - * Note that we will never unsubscribe from 'source' as we want to receive and cache all of its values. - * - * This means this should never be used on an infinite or very large sequence, similar to toList(). - */ - } - - return cache.subscribe(observer); - } + public OperatorCache(Observable source) { + this(source, ReplaySubject.create()); + } + + /** Test support. */ + public OperatorCache(Observable source, Subject cache) { + this.source = source; + this.cache = cache; + this.sourceSubscribed = new AtomicBoolean(); + } - }; + @Override + public void call(Subscriber t1) { + if (sourceSubscribed.compareAndSet(false, true)) { + source.subscribe(cache); + /* + * Note that we will never unsubscribe from 'source' as we want to receive and cache all of its values. + * + * This means this should never be used on an infinite or very large sequence, similar to toList(). + */ + } + cache.subscribe(t1); } } diff --git a/rxjava-core/src/main/java/rx/subjects/AsyncSubject.java b/rxjava-core/src/main/java/rx/subjects/AsyncSubject.java index 7b50cd9468..c2302d9890 100644 --- a/rxjava-core/src/main/java/rx/subjects/AsyncSubject.java +++ b/rxjava-core/src/main/java/rx/subjects/AsyncSubject.java @@ -105,35 +105,40 @@ protected AsyncSubject(OnSubscribe onSubscribe, SubjectSubscriptionManager @Override public void onCompleted() { + Collection> observers = subscriptionManager.terminate(new Action1>>() { @Override public void call(Collection> observers) { - for (Observer o : observers) { - emitValueToObserver(lastNotification.get(), o); - } } }); + if (observers != null) { + for (Observer o : observers) { + emitValueToObserver(lastNotification.get(), o); + } + } } @Override public void onError(final Throwable e) { + Collection> observers = subscriptionManager.terminate(new Action1>>() { - @Override public void call(Collection> observers) { - lastNotification.set(new Notification(e)); - for (Observer o : observers) { - emitValueToObserver(lastNotification.get(), o); - } + lastNotification.set(Notification.createOnError(e)); } }); + if (observers != null) { + for (Observer o : observers) { + emitValueToObserver(lastNotification.get(), o); + } + } } @Override public void onNext(T v) { - lastNotification.set(new Notification(v)); + lastNotification.set(Notification.createOnNext(v)); } } \ No newline at end of file diff --git a/rxjava-core/src/main/java/rx/subjects/BehaviorSubject.java b/rxjava-core/src/main/java/rx/subjects/BehaviorSubject.java index f59dc03133..5f2010fd85 100644 --- a/rxjava-core/src/main/java/rx/subjects/BehaviorSubject.java +++ b/rxjava-core/src/main/java/rx/subjects/BehaviorSubject.java @@ -144,31 +144,36 @@ protected BehaviorSubject(OnSubscribe onSubscribe, SubjectSubscriptionManager @Override public void onCompleted() { + Collection> observers = subscriptionManager.terminate(new Action1>>() { @Override public void call(Collection> observers) { - lastNotification.set(new Notification()); - for (Observer o : observers) { - o.onCompleted(); - } + lastNotification.set(Notification.createOnCompleted()); } }); + if (observers != null) { + for (Observer o : observers) { + o.onCompleted(); + } + } } @Override public void onError(final Throwable e) { + Collection> observers = subscriptionManager.terminate(new Action1>>() { @Override public void call(Collection> observers) { - lastNotification.set(new Notification(e)); - for (Observer o : observers) { - o.onError(e); - } + lastNotification.set(Notification.createOnError(e)); } }); - + if (observers != null) { + for (Observer o : observers) { + o.onError(e); + } + } } @Override @@ -176,7 +181,7 @@ public void onNext(T v) { // do not overwrite a terminal notification // so new subscribers can get them if (lastNotification.get().isOnNext()) { - lastNotification.set(new Notification(v)); + lastNotification.set(Notification.createOnNext(v)); for (Observer o : subscriptionManager.rawSnapshot()) { o.onNext(v); } diff --git a/rxjava-core/src/main/java/rx/subjects/PublishSubject.java b/rxjava-core/src/main/java/rx/subjects/PublishSubject.java index 4d94a087fd..687d039e6a 100644 --- a/rxjava-core/src/main/java/rx/subjects/PublishSubject.java +++ b/rxjava-core/src/main/java/rx/subjects/PublishSubject.java @@ -95,31 +95,35 @@ protected PublishSubject(OnSubscribe onSubscribe, SubjectSubscriptionManager< @Override public void onCompleted() { + Collection> observers = subscriptionManager.terminate(new Action1>>() { - @Override public void call(Collection> observers) { lastNotification.set(Notification. createOnCompleted()); - for (Observer o : observers) { - o.onCompleted(); - } } }); + if (observers != null) { + for (Observer o : observers) { + o.onCompleted(); + } + } } @Override public void onError(final Throwable e) { + Collection> observers = subscriptionManager.terminate(new Action1>>() { @Override public void call(Collection> observers) { lastNotification.set(Notification.createOnError(e)); - for (Observer o : observers) { - o.onError(e); - } } }); - + if (observers != null) { + for (Observer o : observers) { + o.onError(e); + } + } } @Override diff --git a/rxjava-core/src/main/java/rx/subjects/ReplaySubject.java b/rxjava-core/src/main/java/rx/subjects/ReplaySubject.java index 7d659822c2..01107efa20 100644 --- a/rxjava-core/src/main/java/rx/subjects/ReplaySubject.java +++ b/rxjava-core/src/main/java/rx/subjects/ReplaySubject.java @@ -122,34 +122,40 @@ protected ReplaySubject(OnSubscribe onSubscribe, SubjectSubscriptionManager> observers = subscriptionManager.terminate(new Action1>>() { @Override public void call(Collection> observers) { state.history.complete(Notification.createOnCompleted()); - for (SubjectObserver o : observers) { - if (caughtUp(o)) { - o.onCompleted(); - } - } } }); + if (observers != null) { + for (SubjectObserver o : observers) { + if (caughtUp(o)) { + o.onCompleted(); + } + } + } } @Override public void onError(final Throwable e) { + Collection> observers = subscriptionManager.terminate(new Action1>>() { @Override public void call(Collection> observers) { state.history.complete(Notification.createOnError(e)); - for (SubjectObserver o : observers) { - if (caughtUp(o)) { - o.onError(e); - } - } } }); + if (observers != null) { + for (SubjectObserver o : observers) { + if (caughtUp(o)) { + o.onError(e); + } + } + } } @Override diff --git a/rxjava-core/src/main/java/rx/subjects/SubjectSubscriptionManager.java b/rxjava-core/src/main/java/rx/subjects/SubjectSubscriptionManager.java index 6f049eb38a..8022f9723d 100644 --- a/rxjava-core/src/main/java/rx/subjects/SubjectSubscriptionManager.java +++ b/rxjava-core/src/main/java/rx/subjects/SubjectSubscriptionManager.java @@ -113,19 +113,20 @@ public void call() { } @SuppressWarnings({ "unchecked", "rawtypes" }) - protected void terminate(Action1>> onTerminate) { + protected Collection> terminate(Action1>> onTerminate) { State current; State newState = null; do { current = state.get(); if (current.terminated) { // already terminated so do nothing - return; + return null; } else { newState = current.terminate(); } } while (!state.compareAndSet(current, newState)); + Collection> observerCollection = (Collection)Arrays.asList(newState.observers); /* * if we get here then we won setting the state to terminated * and have a deterministic set of Observers to emit to (concurrent subscribes @@ -134,11 +135,12 @@ protected void terminate(Action1>> onTermi */ try { // had to circumvent type check, we know what the array contains - onTerminate.call((Collection) Arrays.asList(newState.observers)); + onTerminate.call(observerCollection); } finally { // mark that termination is completed newState.terminationLatch.countDown(); } + return observerCollection; } /** diff --git a/rxjava-core/src/test/java/rx/operators/OperationCacheTest.java b/rxjava-core/src/test/java/rx/operators/OperationCacheTest.java deleted file mode 100644 index 8637eb8521..0000000000 --- a/rxjava-core/src/test/java/rx/operators/OperationCacheTest.java +++ /dev/null @@ -1,87 +0,0 @@ -/** - * Copyright 2014 Netflix, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package rx.operators; - -import static org.junit.Assert.*; -import static rx.operators.OperationCache.*; - -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; - -import org.junit.Test; - -import rx.Observable; -import rx.Observer; -import rx.Subscription; -import rx.functions.Action1; -import rx.subscriptions.BooleanSubscription; - -public class OperationCacheTest { - - @Test - public void testCache() throws InterruptedException { - final AtomicInteger counter = new AtomicInteger(); - Observable o = Observable.create(cache(Observable.create(new Observable.OnSubscribeFunc() { - - @Override - public Subscription onSubscribe(final Observer observer) { - final BooleanSubscription subscription = new BooleanSubscription(); - new Thread(new Runnable() { - - @Override - public void run() { - counter.incrementAndGet(); - System.out.println("published observable being executed"); - observer.onNext("one"); - observer.onCompleted(); - } - }).start(); - return subscription; - } - }))); - - // we then expect the following 2 subscriptions to get that same value - final CountDownLatch latch = new CountDownLatch(2); - - // subscribe once - o.subscribe(new Action1() { - - @Override - public void call(String v) { - assertEquals("one", v); - System.out.println("v: " + v); - latch.countDown(); - } - }); - - // subscribe again - o.subscribe(new Action1() { - - @Override - public void call(String v) { - assertEquals("one", v); - System.out.println("v: " + v); - latch.countDown(); - } - }); - - if (!latch.await(1000, TimeUnit.MILLISECONDS)) { - fail("subscriptions did not receive values"); - } - assertEquals(1, counter.get()); - } -} diff --git a/rxjava-core/src/test/java/rx/operators/OperatorCacheTest.java b/rxjava-core/src/test/java/rx/operators/OperatorCacheTest.java new file mode 100644 index 0000000000..3a6c170c56 --- /dev/null +++ b/rxjava-core/src/test/java/rx/operators/OperatorCacheTest.java @@ -0,0 +1,168 @@ +/** + * Copyright 2014 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.operators; + +import java.util.Arrays; +import static org.junit.Assert.*; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import org.junit.Test; + +import rx.Observable; +import rx.Observer; +import rx.Subscription; +import rx.functions.Action1; +import rx.functions.Func1; +import rx.functions.Func2; +import rx.observers.TestObserver; +import rx.schedulers.Schedulers; +import rx.subjects.AsyncSubject; +import rx.subjects.BehaviorSubject; +import rx.subjects.PublishSubject; +import rx.subjects.ReplaySubject; +import rx.subjects.Subject; +import rx.subscriptions.BooleanSubscription; + +public class OperatorCacheTest { + + @Test + public void testCache() throws InterruptedException { + final AtomicInteger counter = new AtomicInteger(); + Observable o = Observable.create(new Observable.OnSubscribeFunc() { + + @Override + public Subscription onSubscribe(final Observer observer) { + final BooleanSubscription subscription = new BooleanSubscription(); + new Thread(new Runnable() { + + @Override + public void run() { + counter.incrementAndGet(); + System.out.println("published observable being executed"); + observer.onNext("one"); + observer.onCompleted(); + } + }).start(); + return subscription; + } + }).cache(); + + // we then expect the following 2 subscriptions to get that same value + final CountDownLatch latch = new CountDownLatch(2); + + // subscribe once + o.subscribe(new Action1() { + + @Override + public void call(String v) { + assertEquals("one", v); + System.out.println("v: " + v); + latch.countDown(); + } + }); + + // subscribe again + o.subscribe(new Action1() { + + @Override + public void call(String v) { + assertEquals("one", v); + System.out.println("v: " + v); + latch.countDown(); + } + }); + + if (!latch.await(1000, TimeUnit.MILLISECONDS)) { + fail("subscriptions did not receive values"); + } + assertEquals(1, counter.get()); + } + void testWithCustomSubjectAndRepeat(Subject subject, Integer... expected) { + Observable source0 = Observable.from(1, 2, 3) + .subscribeOn(Schedulers.io()) + .flatMap(new Func1>() { + @Override + public Observable call(final Integer i) { + return Observable.timer(i * 20, TimeUnit.MILLISECONDS).map(new Func1() { + @Override + public Integer call(Long t1) { + return i; + } + }); + } + }); + + Observable source1 = Observable.create(new OperatorCache(source0, subject)); + + Observable source2 = source1 + .repeat(4) + .zip(Observable.timer(0, 10, TimeUnit.MILLISECONDS, Schedulers.newThread()), new Func2() { + @Override + public Integer call(Integer t1, Long t2) { + return t1; + } + + }); + final CountDownLatch cdl = new CountDownLatch(1); + TestObserver test = new TestObserver(new Observer() { + @Override + public void onNext(Integer t) { + } + + @Override + public void onError(Throwable e) { + cdl.countDown(); + } + + @Override + public void onCompleted() { + cdl.countDown(); + } + }); + source2.subscribe(test); + + try { + cdl.await(20, TimeUnit.SECONDS); + } catch (InterruptedException ex) { + fail("Interrupted"); + } + + test.assertReceivedOnNext(Arrays.asList(expected)); + test.assertTerminalEvent(); + assertTrue(test.getOnErrorEvents().isEmpty()); + } + @Test(timeout = 10000) + public void testWithAsyncSubjectAndRepeat() { + testWithCustomSubjectAndRepeat(AsyncSubject.create(), 3, 3, 3, 3); + } + @Test(timeout = 10000) + public void testWithBehaviorSubjectAndRepeat() { + // BehaviorSubject just completes when repeated + testWithCustomSubjectAndRepeat(BehaviorSubject.create(0), 0, 1, 2, 3); + } + @Test(timeout = 10000) + public void testWithPublishSubjectAndRepeat() { + // PublishSubject just completes when repeated + testWithCustomSubjectAndRepeat(PublishSubject.create(), 1, 2, 3); + } + @Test(timeout = 10000) + public void testWithReplaySubjectAndRepeat() { + testWithCustomSubjectAndRepeat(ReplaySubject.create(), 1, 2, 3, 1, 2, 3, 1, 2, 3, 1, 2, 3); + } +}