diff --git a/rxjava-core/src/main/java/rx/subjects/AsyncSubject.java b/rxjava-core/src/main/java/rx/subjects/AsyncSubject.java index f95ab6e216..a1a65d7c5b 100644 --- a/rxjava-core/src/main/java/rx/subjects/AsyncSubject.java +++ b/rxjava-core/src/main/java/rx/subjects/AsyncSubject.java @@ -15,13 +15,9 @@ */ package rx.subjects; -import java.util.Collection; -import java.util.concurrent.atomic.AtomicReference; - -import rx.Notification; import rx.Observer; -import rx.functions.Action0; import rx.functions.Action1; +import rx.operators.NotificationLite; import rx.subjects.SubjectSubscriptionManager.SubjectObserver; /** @@ -56,88 +52,59 @@ public final class AsyncSubject extends Subject { public static AsyncSubject create() { - final SubjectSubscriptionManager subscriptionManager = new SubjectSubscriptionManager(); - final AtomicReference> lastNotification = new AtomicReference>(Notification.createOnCompleted()); - - OnSubscribe onSubscribe = subscriptionManager.getOnSubscribeFunc( - /** - * This function executes at beginning of subscription. - * - * This will always run, even if Subject is in terminal state. - */ - new Action1>() { - - @Override - public void call(SubjectObserver o) { - // nothing to do if not terminated - } - }, - /** - * This function executes if the Subject is terminated. - */ - new Action1>() { - - @Override - public void call(SubjectObserver o) { - // we want the last value + completed so add this extra logic - // to send onCompleted if the last value is an onNext - emitValueToObserver(lastNotification.get(), o); - } - }, null); - - return new AsyncSubject(onSubscribe, subscriptionManager, lastNotification); + final SubjectSubscriptionManager state = new SubjectSubscriptionManager(); + state.onTerminated = new Action1>() { + @Override + public void call(SubjectObserver o) { + Object v = state.get(); + o.accept(v); + o.completeSingle(v); + } + }; + return new AsyncSubject(state, state); } - protected static void emitValueToObserver(Notification n, Observer o) { - n.accept(o); - if (n.isOnNext()) { - o.onCompleted(); - } - } + final SubjectSubscriptionManager state; + volatile Object lastValue; + private final NotificationLite nl = NotificationLite.instance(); - private final SubjectSubscriptionManager subscriptionManager; - final AtomicReference> lastNotification; - protected AsyncSubject(OnSubscribe onSubscribe, SubjectSubscriptionManager subscriptionManager, AtomicReference> lastNotification) { + protected AsyncSubject(OnSubscribe onSubscribe, SubjectSubscriptionManager state) { super(onSubscribe); - this.subscriptionManager = subscriptionManager; - this.lastNotification = lastNotification; + this.state = state; } @Override public void onCompleted() { - Collection> observers = subscriptionManager.terminate(new Action0() { - - @Override - public void call() { + if (state.active) { + Object last = lastValue; + if (last == null) { + last = nl.completed(); } - }); - if (observers != null) { - for (Observer o : observers) { - emitValueToObserver(lastNotification.get(), o); + for (SubjectObserver bo : state.terminate(last)) { + if (last == nl.completed()) { + bo.onCompleted(); + } else { + bo.onNext(nl.getValue(last)); + bo.onCompleted(); + } } } } @Override public void onError(final Throwable e) { - Collection> observers = subscriptionManager.terminate(new Action0() { - @Override - public void call() { - lastNotification.set(Notification. createOnError(e)); - } - }); - if (observers != null) { - for (Observer o : observers) { - emitValueToObserver(lastNotification.get(), o); + if (state.active) { + Object n = nl.error(e); + for (SubjectObserver bo : state.terminate(n)) { + bo.onError(e); } } - } @Override public void onNext(T v) { - lastNotification.set(Notification.createOnNext(v)); + lastValue = nl.next(v); } } \ No newline at end of file diff --git a/rxjava-core/src/main/java/rx/subjects/BehaviorSubject.java b/rxjava-core/src/main/java/rx/subjects/BehaviorSubject.java index c6c3214afa..78efdc0edf 100644 --- a/rxjava-core/src/main/java/rx/subjects/BehaviorSubject.java +++ b/rxjava-core/src/main/java/rx/subjects/BehaviorSubject.java @@ -15,14 +15,14 @@ */ package rx.subjects; -import java.util.Collection; -import java.util.concurrent.atomic.AtomicReference; -import rx.Notification; import rx.Observer; +import rx.Subscriber; import rx.functions.Action0; import rx.functions.Action1; +import rx.operators.NotificationLite; import rx.subjects.SubjectSubscriptionManager.SubjectObserver; +import rx.subscriptions.Subscriptions; /** * Subject that publishes the most recent and all subsequent events to each subscribed {@link Observer}. @@ -65,110 +65,86 @@ * * @param */ +@SuppressWarnings({ "unchecked", "rawtypes" }) public final class BehaviorSubject extends Subject { - + /** + * Create a {@link BehaviorSubject} without a default value. + * @param the value type + * @return the constructed {@link BehaviorSubject} + */ + public static BehaviorSubject create() { + return create(null, false); + } /** * Creates a {@link BehaviorSubject} which publishes the last and all subsequent events to each {@link Observer} that subscribes to it. * + * @param the value type * @param defaultValue * the value which will be published to any {@link Observer} as long as the {@link BehaviorSubject} has not yet received any events * @return the constructed {@link BehaviorSubject} */ public static BehaviorSubject create(T defaultValue) { - final SubjectSubscriptionManager subscriptionManager = new SubjectSubscriptionManager(); - // set a default value so subscriptions will immediately receive this until a new notification is received - final AtomicReference> lastNotification = new AtomicReference>(Notification.createOnNext(defaultValue)); - - OnSubscribe onSubscribe = subscriptionManager.getOnSubscribeFunc( - /** - * This function executes at beginning of subscription. - * - * This will always run, even if Subject is in terminal state. - */ - new Action1>() { - - @Override - public void call(SubjectObserver o) { - /* - * When we subscribe we always emit the latest value to the observer. - * - * Here we only emit if it's an onNext as terminal states are handled in the next function. - */ - Notification n = lastNotification.get(); - if (n.isOnNext()) { - n.accept(o); - } - } - }, - /** - * This function executes if the Subject is terminated before subscription occurs. - */ - new Action1>() { - - @Override - public void call(SubjectObserver o) { - /* - * If we are already terminated, or termination happens while trying to subscribe - * this will be invoked and we emit whatever the last terminal value was. - */ - lastNotification.get().accept(o); - } - }, null); + return create(defaultValue, true); + } + private static BehaviorSubject create(T defaultValue, boolean hasDefault) { + final SubjectSubscriptionManager state = new SubjectSubscriptionManager(); + if (hasDefault) { + state.set(NotificationLite.instance().next(defaultValue)); + } + state.onAdded = new Action1>() { - return new BehaviorSubject(onSubscribe, subscriptionManager, lastNotification); + @Override + public void call(SubjectObserver o) { + o.emitFirst(state.get()); + } + + }; + state.onTerminated = state.onAdded; + return new BehaviorSubject(state, state); } - private final SubjectSubscriptionManager subscriptionManager; - final AtomicReference> lastNotification; + private final SubjectSubscriptionManager state; + private final NotificationLite nl = NotificationLite.instance(); - protected BehaviorSubject(OnSubscribe onSubscribe, SubjectSubscriptionManager subscriptionManager, AtomicReference> lastNotification) { + protected BehaviorSubject(OnSubscribe onSubscribe, SubjectSubscriptionManager state) { super(onSubscribe); - this.subscriptionManager = subscriptionManager; - this.lastNotification = lastNotification; + this.state = state; } @Override public void onCompleted() { - Collection> observers = subscriptionManager.terminate(new Action0() { - - @Override - public void call() { - lastNotification.set(Notification. createOnCompleted()); - } - }); - if (observers != null) { - for (Observer o : observers) { - o.onCompleted(); + Object last = state.get(); + if (last == null || state.active) { + Object n = nl.completed(); + for (SubjectObserver bo : state.terminate(n)) { + bo.emitNext(n); } } } @Override - public void onError(final Throwable e) { - Collection> observers = subscriptionManager.terminate(new Action0() { - - @Override - public void call() { - lastNotification.set(Notification. createOnError(e)); - } - }); - if (observers != null) { - for (Observer o : observers) { - o.onError(e); + public void onError(Throwable e) { + Object last = state.get(); + if (last == null || state.active) { + Object n = nl.error(e); + for (SubjectObserver bo : state.terminate(n)) { + bo.emitNext(n); } } } @Override public void onNext(T v) { - // do not overwrite a terminal notification - // so new subscribers can get them - if (lastNotification.get().isOnNext()) { - lastNotification.set(Notification.createOnNext(v)); - for (Observer o : subscriptionManager.rawSnapshot()) { - o.onNext(v); + Object last = state.get(); + if (last == null || state.active) { + Object n = nl.next(v); + for (SubjectObserver bo : state.next(n)) { + bo.emitNext(n); } } } - + + /* test support */ int subscriberCount() { + return state.observers().length; + } } diff --git a/rxjava-core/src/main/java/rx/subjects/PublishSubject.java b/rxjava-core/src/main/java/rx/subjects/PublishSubject.java index 998385a741..82e76172f6 100644 --- a/rxjava-core/src/main/java/rx/subjects/PublishSubject.java +++ b/rxjava-core/src/main/java/rx/subjects/PublishSubject.java @@ -15,13 +15,9 @@ */ package rx.subjects; -import java.util.Collection; -import java.util.concurrent.atomic.AtomicReference; - -import rx.Notification; import rx.Observer; -import rx.functions.Action0; import rx.functions.Action1; +import rx.operators.NotificationLite; import rx.subjects.SubjectSubscriptionManager.SubjectObserver; /** @@ -50,85 +46,52 @@ public final class PublishSubject extends Subject { public static PublishSubject create() { - final SubjectSubscriptionManager subscriptionManager = new SubjectSubscriptionManager(); - // set a default value so subscriptions will immediately receive this until a new notification is received - final AtomicReference> lastNotification = new AtomicReference>(); - - OnSubscribe onSubscribe = subscriptionManager.getOnSubscribeFunc( - /** - * This function executes at beginning of subscription. - * - * This will always run, even if Subject is in terminal state. - */ - new Action1>() { - - @Override - public void call(SubjectObserver o) { - // nothing onSubscribe unless in terminal state which is the next function - } - }, - /** - * This function executes if the Subject is terminated before subscription occurs. - */ - new Action1>() { - - @Override - public void call(SubjectObserver o) { - /* - * If we are already terminated, or termination happens while trying to subscribe - * this will be invoked and we emit whatever the last terminal value was. - */ - lastNotification.get().accept(o); - } - }, null); + final SubjectSubscriptionManager state = new SubjectSubscriptionManager(); + state.onAdded = new Action1>() { - return new PublishSubject(onSubscribe, subscriptionManager, lastNotification); + @Override + public void call(SubjectObserver o) { + o.emitFirst(state.get()); + } + + }; + state.onTerminated = state.onAdded; + return new PublishSubject(state, state); } - private final SubjectSubscriptionManager subscriptionManager; - final AtomicReference> lastNotification; - - protected PublishSubject(OnSubscribe onSubscribe, SubjectSubscriptionManager subscriptionManager, AtomicReference> lastNotification) { + final SubjectSubscriptionManager state; + private final NotificationLite nl = NotificationLite.instance(); + + protected PublishSubject(OnSubscribe onSubscribe, SubjectSubscriptionManager state) { super(onSubscribe); - this.subscriptionManager = subscriptionManager; - this.lastNotification = lastNotification; + this.state = state; } @Override public void onCompleted() { - Collection> observers = subscriptionManager.terminate(new Action0() { - @Override - public void call() { - lastNotification.set(Notification. createOnCompleted()); - } - }); - if (observers != null) { - for (Observer o : observers) { - o.onCompleted(); + if (state.active) { + Object n = nl.completed(); + for (SubjectObserver bo : state.terminate(n)) { + bo.emitNext(n); } } + } @Override public void onError(final Throwable e) { - Collection> observers = subscriptionManager.terminate(new Action0() { - - @Override - public void call() { - lastNotification.set(Notification. createOnError(e)); - } - }); - if (observers != null) { - for (Observer o : observers) { - o.onError(e); + if (state.active) { + Object n = nl.error(e); + for (SubjectObserver bo : state.terminate(n)) { + bo.emitNext(n); } } } @Override public void onNext(T v) { - for (Observer o : subscriptionManager.rawSnapshot()) { - o.onNext(v); + for (SubjectObserver bo : state.observers()) { + bo.onNext(v); } } } \ No newline at end of file diff --git a/rxjava-core/src/main/java/rx/subjects/ReplaySubject.java b/rxjava-core/src/main/java/rx/subjects/ReplaySubject.java index 3277219351..8bfb77be5d 100644 --- a/rxjava-core/src/main/java/rx/subjects/ReplaySubject.java +++ b/rxjava-core/src/main/java/rx/subjects/ReplaySubject.java @@ -16,12 +16,10 @@ package rx.subjects; import java.util.ArrayList; -import java.util.Collection; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicInteger; import rx.Observer; -import rx.functions.Action0; import rx.functions.Action1; import rx.operators.NotificationLite; import rx.subjects.SubjectSubscriptionManager.SubjectObserver; @@ -57,46 +55,33 @@ public static ReplaySubject create() { public static ReplaySubject create(int initialCapacity) { final SubjectSubscriptionManager subscriptionManager = new SubjectSubscriptionManager(); final ReplayState state = new ReplayState(initialCapacity); + subscriptionManager.onStart = new Action1>() { + @Override + public void call(SubjectObserver o) { + // replay history for this observer using the subscribing thread + int lastIndex = replayObserverFromIndex(state.history, 0, o); - OnSubscribe onSubscribe = subscriptionManager.getOnSubscribeFunc( - /** - * This function executes at beginning of subscription. - * We want to replay history with the subscribing thread - * before the Observer gets registered. - * - * This will always run, even if Subject is in terminal state. - */ - new Action1>() { - - @Override - public void call(SubjectObserver o) { - // replay history for this observer using the subscribing thread - int lastIndex = replayObserverFromIndex(state.history, 0, o); - - // now that it is caught up add to observers - state.replayState.put(o, lastIndex); - } - }, - /** - * This function executes if the Subject is terminated. - */ - new Action1>() { - - @Override - public void call(SubjectObserver o) { - Integer idx = state.replayState.remove(o); - // we will finish replaying if there is anything left - replayObserverFromIndex(state.history, idx, o); - } - }, - new Action1>() { - @Override - public void call(SubjectObserver o) { - state.replayState.remove(o); - } - }); - - return new ReplaySubject(onSubscribe, subscriptionManager, state); + // now that it is caught up add to observers + state.replayState.put(o, lastIndex); + } + }; + subscriptionManager.onTerminated = new Action1>() { + @Override + public void call(SubjectObserver o) { + Integer idx = state.replayState.remove(o); + if (idx == null) { + idx = 0; + } + replayObserverFromIndex(state.history, idx, o); + } + }; + subscriptionManager.onUnsubscribed = new Action1>() { + @Override + public void call(SubjectObserver o) { + state.replayState.remove(o); + } + }; + return new ReplaySubject(subscriptionManager, subscriptionManager, state); } private static class ReplayState { @@ -122,15 +107,10 @@ protected ReplaySubject(OnSubscribe onSubscribe, SubjectSubscriptionManager> observers = subscriptionManager.terminate(new Action0() { - @Override - public void call() { - state.history.complete(); - } - }); - if (observers != null) { - for (SubjectObserver o : observers) { + if (subscriptionManager.active) { + state.history.complete(); + for (SubjectObserver o : subscriptionManager.terminate(NotificationLite.instance().completed())) { if (caughtUp(o)) { o.onCompleted(); } @@ -140,15 +120,9 @@ public void call() { @Override public void onError(final Throwable e) { - Collection> observers = subscriptionManager.terminate(new Action0() { - - @Override - public void call() { - state.history.complete(e); - } - }); - if (observers != null) { - for (SubjectObserver o : observers) { + if (subscriptionManager.active) { + state.history.complete(e); + for (SubjectObserver o : subscriptionManager.terminate(NotificationLite.instance().completed())) { if (caughtUp(o)) { o.onError(e); } @@ -162,7 +136,7 @@ public void onNext(T v) { return; } state.history.next(v); - for (SubjectObserver o : subscriptionManager.rawSnapshot()) { + for (SubjectObserver o : subscriptionManager.observers()) { if (caughtUp(o)) { o.onNext(v); } @@ -189,21 +163,20 @@ private boolean caughtUp(SubjectObserver o) { private void replayObserver(SubjectObserver observer) { Integer lastEmittedLink = state.replayState.get(observer); - if (lastEmittedLink != null) { - int l = replayObserverFromIndex(state.history, lastEmittedLink, observer); - state.replayState.put(observer, l); - } else { - throw new IllegalStateException("failed to find lastEmittedLink for: " + observer); + if (lastEmittedLink == null) { + lastEmittedLink = 0; } + int l = replayObserverFromIndex(state.history, lastEmittedLink, observer); + state.replayState.put(observer, l); } - private static int replayObserverFromIndex(History history, Integer l, SubjectObserver observer) { - while (l < history.index.get()) { - history.accept(observer, l); - l++; + static int replayObserverFromIndex(History history, int idx, SubjectObserver observer) { + while (idx < history.index.get()) { + history.accept(observer, idx); + idx++; } - return l; + return idx; } /** diff --git a/rxjava-core/src/main/java/rx/subjects/SubjectSubscriptionManager.java b/rxjava-core/src/main/java/rx/subjects/SubjectSubscriptionManager.java index 7efbafb64b..d3af5a17e7 100644 --- a/rxjava-core/src/main/java/rx/subjects/SubjectSubscriptionManager.java +++ b/rxjava-core/src/main/java/rx/subjects/SubjectSubscriptionManager.java @@ -1,266 +1,332 @@ -/** - * Copyright 2014 Netflix, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ + /** + * Copyright 2014 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package rx.subjects; -import java.util.Arrays; -import java.util.Collection; -import java.util.concurrent.CountDownLatch; +import java.util.ArrayList; +import java.util.List; import java.util.concurrent.atomic.AtomicReference; - import rx.Observable.OnSubscribe; + import rx.Observer; import rx.Subscriber; import rx.functions.Action0; import rx.functions.Action1; +import rx.functions.Actions; +import rx.operators.NotificationLite; import rx.subscriptions.Subscriptions; -/* package */class SubjectSubscriptionManager { - - private AtomicReference> state = new AtomicReference>(new State()); - +/** + * Represents the typical state and OnSubscribe logic for a Subject implementation. + * @param the source and return value type + */ +@SuppressWarnings({"unchecked", "rawtypes"}) +/* package */final class SubjectSubscriptionManager implements OnSubscribe { + /** Contains the unsubscription flag and the array of active subscribers. */ + private final AtomicReference> state = new AtomicReference>(State.EMPTY); + /** Stores the latest value or the terminal value for some Subjects. */ + final AtomicReference latest = new AtomicReference(); + /** Indicates that the subject is active (cheaper than checking the state).*/ + boolean active = true; + /** Action called when a new subscriber subscribes but before it is added to the state. */ + Action1> onStart = Actions.empty(); + /** Action called after the subscriber has been added to the state. */ + Action1> onAdded = Actions.empty(); + /** Action called when the subscriber wants to subscribe to a terminal state. */ + Action1> onTerminated = Actions.empty(); + /** Called when the subscruber explicitly unsubscribes. */ + Action1> onUnsubscribed = Actions.empty(); + @Override + public void call(final Subscriber child) { + SubjectObserver bo = new SubjectObserver(child); + addUnsubscriber(child, bo); + onStart.call(bo); + if (add(bo) && child.isUnsubscribed()) { + remove(bo); + onUnsubscribed.call(bo); + } + } + /** Registers the unsubscribe action for the given subscriber. */ + void addUnsubscriber(Subscriber child, final SubjectObserver bo) { + child.add(Subscriptions.create(new Action0() { + @Override + public void call() { + remove(bo); + onUnsubscribed.call(bo); + } + })); + } + /** Set the latest NotificationLite value. */ + void set(Object value) { + this.latest.set(value); + } + /** @return Retrieve the latest NotificationLite value */ + Object get() { + return latest.get(); + } + /** @return the array of active subscribers, don't write into the array! */ + SubjectObserver[] observers() { + return state.get().observers; + } /** - * - * @param onSubscribe - * Always runs at the beginning of 'subscribe' regardless of terminal state. - * @param onTerminated - * Only runs if Subject is in terminal state and the Observer ends up not being registered. - * @param onUnsubscribe - * called after the child subscription is removed from the state - * @return + * Try to atomically add a SubjectObserver to the active state. + * @param o the SubjectObserver to add + * @return false if the subject is already in its terminal state */ - public OnSubscribe getOnSubscribeFunc(final Action1> onSubscribe, - final Action1> onTerminated, - final Action1> onUnsubscribe) { - return new OnSubscribe() { - @Override - public void call(Subscriber actualOperator) { - final SubjectObserver observer = new SubjectObserver(actualOperator); - // invoke onSubscribe logic - if (onSubscribe != null) { - onSubscribe.call(observer); - } - - State current; - State newState = null; - boolean addedObserver = false; - do { - current = state.get(); - if (current.terminated) { - // we are terminated so don't need to do anything - addedObserver = false; - // break out and don't try to modify state - newState = current; - // wait for termination to complete if - try { - current.terminationLatch.await(); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new RuntimeException("Interrupted waiting for termination.", e); - } - break; - } else { - addedObserver = true; - // add to parent if the Subject itself is unsubscribed - actualOperator.add(Subscriptions.create(new Action0() { - - @Override - public void call() { - State current; - State newState; - do { - current = state.get(); - // on unsubscribe remove it from the map of outbound observers to notify - newState = current.removeObserver(observer); - } while (!state.compareAndSet(current, newState)); - if (onUnsubscribe != null) { - onUnsubscribe.call(observer); - } - } - })); - if (actualOperator.isUnsubscribed()) { - // we've been unsubscribed while working so return and do nothing - return; - } - // on subscribe add it to the map of outbound observers to notify - newState = current.addObserver(observer); - } - } while (!state.compareAndSet(current, newState)); - - /** - * Whatever happened above, if we are terminated we run `onTerminated` - */ - if (newState.terminated && !addedObserver) { - onTerminated.call(observer); - } + boolean add(SubjectObserver o) { + do { + State oldState = state.get(); + if (oldState.terminated) { + onTerminated.call(o); + return false; } - - }; + State newState = oldState.add(o); + if (state.compareAndSet(oldState, newState)) { + onAdded.call(o); + return true; + } + } while (true); } - - @SuppressWarnings({ "unchecked", "rawtypes" }) - protected Collection> terminate(Action0 onTerminate) { - State current; - State newState = null; + /** + * Atomically remove the specified SubjectObserver from the active observers. + * @param o the SubjectObserver to remove + */ + void remove(SubjectObserver o) { do { - current = state.get(); - if (current.terminated) { - // already terminated so do nothing - return null; - } else { - newState = current.terminate(); + State oldState = state.get(); + if (oldState.terminated) { + return; } - } while (!state.compareAndSet(current, newState)); - - Collection> observerCollection = (Collection) Arrays.asList(newState.observers); - /* - * if we get here then we won setting the state to terminated - * and have a deterministic set of Observers to emit to (concurrent subscribes - * will have failed and will try again and see we are terminated) - */ - try { - // had to circumvent type check, we know what the array contains - onTerminate.call(); - } finally { - // mark that termination is completed - newState.terminationLatch.countDown(); - } - return observerCollection; + State newState = oldState.remove(o); + if (newState == oldState || state.compareAndSet(oldState, newState)) { + return; + } + } while (true); } - /** - * Returns the array of observers directly. - * Don't modify the array! - * - * @return the array of current observers + * Set a new latest NotificationLite value and return the active observers. + * @param n the new latest value + * @return the array of SubjectObservers, don't write into the array! */ - @SuppressWarnings("unchecked") - public SubjectObserver[] rawSnapshot() { + SubjectObserver[] next(Object n) { + set(n); return state.get().observers; } + /** + * Atomically set the terminal NotificationLite value (which could be any of the 3), + * clear the active observers and return the last active observers. + * @param n the terminal value + * @return the last active SubjectObservers + */ + SubjectObserver[] terminate(Object n) { + set(n); + active = false; + do { + State oldState = state.get(); + if (oldState.terminated) { + return State.NO_OBSERVERS; + } + if (state.compareAndSet(oldState, State.TERMINATED)) { + return oldState.observers; + } + } while (true); + } - @SuppressWarnings("rawtypes") - protected static class State { + /** State-machine representing the termination state and active SubjectObservers. */ + protected static final class State { final boolean terminated; - final CountDownLatch terminationLatch; final SubjectObserver[] observers; - // to avoid lots of empty arrays - final SubjectObserver[] EMPTY_O = new SubjectObserver[0]; - - private State(boolean isTerminated, CountDownLatch terminationLatch, SubjectObserver[] observers) { - this.terminationLatch = terminationLatch; - this.terminated = isTerminated; + static final SubjectObserver[] NO_OBSERVERS = new SubjectObserver[0]; + static final State TERMINATED = new State(true, NO_OBSERVERS); + static final State EMPTY = new State(false, NO_OBSERVERS); + + public State(boolean terminated, SubjectObserver[] observers) { + this.terminated = terminated; this.observers = observers; } - - State() { - this.terminated = false; - this.terminationLatch = null; - this.observers = EMPTY_O; - } - - public State terminate() { - if (terminated) { - throw new IllegalStateException("Already terminated."); - } - return new State(true, new CountDownLatch(1), observers); - } - - public State addObserver(SubjectObserver observer) { - int n = this.observers.length; - - SubjectObserver[] newobservers = Arrays.copyOf(this.observers, n + 1); - - newobservers[n] = observer; - - return createNewWith(newobservers); - } - - private State createNewWith(SubjectObserver[] newobservers) { - return new State(terminated, terminationLatch, newobservers); + public State add(SubjectObserver o) { + SubjectObserver[] a = observers; + int n = a.length; + SubjectObserver[] b = new SubjectObserver[n + 1]; + System.arraycopy(observers, 0, b, 0, n); + b[n] = o; + return new State(terminated, b); } - - public State removeObserver(SubjectObserver o) { - // we are empty, nothing to remove - if (this.observers.length == 0) { - return this; - } - - int n = this.observers.length - 1; - int copied = 0; - SubjectObserver[] newobservers = new SubjectObserver[n]; - - for (int i = 0; i < this.observers.length; i++) { - SubjectObserver s0 = this.observers[i]; - if (!s0.equals(o)) { - if (copied == n) { - // if s was not found till the end of the iteration - // we return ourselves since no modification should - // have happened + public State remove(SubjectObserver o) { + SubjectObserver[] a = observers; + int n = a.length; + if (n == 1 && a[0] == o) { + return EMPTY; + } else + if (n == 0) { + return this; + } + SubjectObserver[] b = new SubjectObserver[n - 1]; + int j = 0; + for (int i = 0; i < n; i++) { + SubjectObserver ai = a[i]; + if (ai != o) { + if (j == n - 1) { return this; } - newobservers[copied] = s0; - copied++; + b[j++] = ai; } } - - if (copied == 0) { - return createNewWith(EMPTY_O); + if (j == 0) { + return EMPTY; } - // if somehow copied less than expected, truncate the arrays - // if s is unique, this should never happen - if (copied < n) { - SubjectObserver[] newobservers2 = new SubjectObserver[copied]; - System.arraycopy(newobservers, 0, newobservers2, 0, copied); - - return createNewWith(newobservers2); + if (j < n - 1) { + SubjectObserver[] c = new SubjectObserver[j]; + System.arraycopy(b, 0, c, 0, j); + b = c; } - return createNewWith(newobservers); + return new State(terminated, b); } } - - protected static class SubjectObserver implements Observer { - - private final Observer actual; - protected volatile boolean caughtUp = false; - boolean once = true; - SubjectObserver(Observer actual) { + + /** + * Observer wrapping the actual Subscriber and providing various + * emission facilities. + * @param the consumed value type of the actual Observer + */ + protected static final class SubjectObserver implements Observer { + /** The actual Observer. */ + final Observer actual; + /** The NotificationLite to avoid allocating objects for each OnNext value. */ + final NotificationLite nl = NotificationLite.instance(); + /** Was the emitFirst run? Guarded by this. */ + boolean first = true; + /** Guarded by this. */ + boolean emitting; + /** Guarded by this. */ + List queue; + /* volatile */boolean fastPath; + protected volatile boolean caughtUp; + public SubjectObserver(Observer actual) { this.actual = actual; } - @Override - public void onCompleted() { - if (once) { - once = false; - this.actual.onCompleted(); - } + public void onNext(T t) { + actual.onNext(t); } - @Override public void onError(Throwable e) { - if (once) { - once = false; - this.actual.onError(e); - } + actual.onError(e); } - @Override - public void onNext(T v) { - this.actual.onNext(v); + public void onCompleted() { + actual.onCompleted(); + } + /** + * Emits the given NotificationLite value and + * prevents the emitFirst to run if not already run. + * @param n the NotificationLite value + */ + protected void emitNext(Object n) { + if (!fastPath) { + synchronized (this) { + first = false; + if (emitting) { + if (queue == null) { + queue = new ArrayList(); + } + queue.add(n); + return; + } + } + fastPath = true; + } + nl.accept(actual, n); + } + /** + * Tries to emit a NotificationLite value as the first + * value and drains the queue as long as possible. + * @param n the NotificationLite value + */ + protected void emitFirst(Object n) { + synchronized (this) { + if (!first || emitting) { + return; + } + first = false; + emitting = n != null; + } + if (n != null) { + emitLoop(null, n); + } + } + /** + * Emits the contents of the queue as long as there are values. + * @param localQueue the initial queue contents + * @param current the current content to emit + */ + protected void emitLoop(List localQueue, Object current) { + boolean once = true; + boolean skipFinal = false; + try { + do { + if (localQueue != null) { + for (Object n : localQueue) { + accept(n); + } + } + if (once) { + once = false; + accept(current); + } + synchronized (this) { + localQueue = queue; + queue = null; + if (localQueue == null) { + emitting = false; + skipFinal = true; + break; + } + } + } while (true); + } finally { + if (!skipFinal) { + synchronized (this) { + emitting = false; + } + } + } + } + /** + * Dispatches a NotificationLite value to the actual Observer. + * @param n the value to dispatch + */ + protected void accept(Object n) { + if (n != null) { + nl.accept(actual, n); + } + } + + /** + * Emit an OnCompleted event if the value object is + * an OnNext NotificationLite object or null. + * @param value + */ + protected void completeSingle(Object value) { + if (value == null || (!nl.isCompleted(value) && !nl.isError(value))) { + actual.onCompleted(); + } + } + /** @return the actual Observer. */ + protected Observer getActual() { + return actual; } - } - } \ No newline at end of file diff --git a/rxjava-core/src/main/java/rx/subjects/TestSubject.java b/rxjava-core/src/main/java/rx/subjects/TestSubject.java index 7b9fb69e32..8113705c04 100644 --- a/rxjava-core/src/main/java/rx/subjects/TestSubject.java +++ b/rxjava-core/src/main/java/rx/subjects/TestSubject.java @@ -15,15 +15,13 @@ */ package rx.subjects; -import java.util.Collection; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicReference; -import rx.Notification; import rx.Observer; import rx.Scheduler; import rx.functions.Action0; import rx.functions.Action1; +import rx.operators.NotificationLite; import rx.schedulers.TestScheduler; import rx.subjects.SubjectSubscriptionManager.SubjectObserver; @@ -53,49 +51,27 @@ public final class TestSubject extends Subject { public static TestSubject create(TestScheduler scheduler) { - final SubjectSubscriptionManager subscriptionManager = new SubjectSubscriptionManager(); - // set a default value so subscriptions will immediately receive this until a new notification is received - final AtomicReference> lastNotification = new AtomicReference>(); - - OnSubscribe onSubscribe = subscriptionManager.getOnSubscribeFunc( - /** - * This function executes at beginning of subscription. - * - * This will always run, even if Subject is in terminal state. - */ - new Action1>() { - - @Override - public void call(SubjectObserver o) { - // nothing onSubscribe unless in terminal state which is the next function - } - }, - /** - * This function executes if the Subject is terminated before subscription occurs. - */ - new Action1>() { - - @Override - public void call(SubjectObserver o) { - /* - * If we are already terminated, or termination happens while trying to subscribe - * this will be invoked and we emit whatever the last terminal value was. - */ - lastNotification.get().accept(o); - } - }, null); - - return new TestSubject(onSubscribe, subscriptionManager, lastNotification, scheduler); + final SubjectSubscriptionManager state = new SubjectSubscriptionManager(); + + state.onAdded = new Action1>() { + + @Override + public void call(SubjectObserver o) { + o.emitFirst(state.get()); + } + + }; + state.onTerminated = state.onAdded; + + return new TestSubject(state, state, scheduler); } - private final SubjectSubscriptionManager subscriptionManager; - private final AtomicReference> lastNotification; + private final SubjectSubscriptionManager state; private final Scheduler.Worker innerScheduler; - protected TestSubject(OnSubscribe onSubscribe, SubjectSubscriptionManager subscriptionManager, AtomicReference> lastNotification, TestScheduler scheduler) { + protected TestSubject(OnSubscribe onSubscribe, SubjectSubscriptionManager state, TestScheduler scheduler) { super(onSubscribe); - this.subscriptionManager = subscriptionManager; - this.lastNotification = lastNotification; + this.state = state; this.innerScheduler = scheduler.createWorker(); } @@ -105,16 +81,9 @@ public void onCompleted() { } private void _onCompleted() { - Collection> observers = subscriptionManager.terminate(new Action0() { - - @Override - public void call() { - lastNotification.set(Notification. createOnCompleted()); - } - }); - if (observers != null) { - for (Observer o : observers) { - o.onCompleted(); + if (state.active) { + for (SubjectObserver bo : state.terminate(NotificationLite.instance().completed())) { + bo.onCompleted(); } } } @@ -136,19 +105,11 @@ public void onError(final Throwable e) { } private void _onError(final Throwable e) { - Collection> observers = subscriptionManager.terminate(new Action0() { - - @Override - public void call() { - lastNotification.set(Notification. createOnError(e)); - } - }); - if (observers != null) { - for (Observer o : observers) { - o.onError(e); + if (state.active) { + for (SubjectObserver bo : state.terminate(NotificationLite.instance().error(e))) { + bo.onError(e); } } - } public void onError(final Throwable e, long timeInMilliseconds) { @@ -168,7 +129,7 @@ public void onNext(T v) { } private void _onNext(T v) { - for (Observer o : subscriptionManager.rawSnapshot()) { + for (Observer o : state.observers()) { o.onNext(v); } } diff --git a/rxjava-core/src/test/java/rx/subjects/BehaviorSubjectTest.java b/rxjava-core/src/test/java/rx/subjects/BehaviorSubjectTest.java index 739fbe2966..58197c5f97 100644 --- a/rxjava-core/src/test/java/rx/subjects/BehaviorSubjectTest.java +++ b/rxjava-core/src/test/java/rx/subjects/BehaviorSubjectTest.java @@ -15,6 +15,7 @@ */ package rx.subjects; +import static org.junit.Assert.assertEquals; import static org.mockito.Matchers.any; import static org.mockito.Mockito.inOrder; import static org.mockito.Mockito.mock; @@ -285,4 +286,85 @@ public void onCompleted() { verify(o, never()).onError(any(Throwable.class)); } } + @Test + public void testStartEmpty() { + BehaviorSubject source = BehaviorSubject.create(); + @SuppressWarnings("unchecked") + final Observer o = mock(Observer.class); + InOrder inOrder = inOrder(o); + + source.subscribe(o); + + inOrder.verify(o, never()).onNext(any()); + inOrder.verify(o, never()).onCompleted(); + + source.onNext(1); + + source.onCompleted(); + + source.onNext(2); + + verify(o, never()).onError(any(Throwable.class)); + + inOrder.verify(o).onNext(1); + inOrder.verify(o).onCompleted(); + inOrder.verifyNoMoreInteractions(); + + + } + @Test + public void testStartEmptyThenAddOne() { + BehaviorSubject source = BehaviorSubject.create(); + @SuppressWarnings("unchecked") + final Observer o = mock(Observer.class); + InOrder inOrder = inOrder(o); + + source.onNext(1); + + source.subscribe(o); + + inOrder.verify(o).onNext(1); + + source.onCompleted(); + + source.onNext(2); + + inOrder.verify(o).onCompleted(); + inOrder.verifyNoMoreInteractions(); + + verify(o, never()).onError(any(Throwable.class)); + + } + @Test + public void testStartEmptyCompleteWithOne() { + BehaviorSubject source = BehaviorSubject.create(); + @SuppressWarnings("unchecked") + final Observer o = mock(Observer.class); + + source.onNext(1); + source.onCompleted(); + + source.onNext(2); + + source.subscribe(o); + + verify(o).onCompleted(); + verify(o, never()).onError(any(Throwable.class)); + verify(o, never()).onNext(any()); + } + + @Test + public void testTakeOneSubscriber() { + BehaviorSubject source = BehaviorSubject.create(1); + @SuppressWarnings("unchecked") + final Observer o = mock(Observer.class); + + source.take(1).subscribe(o); + + verify(o).onNext(1); + verify(o).onCompleted(); + verify(o, never()).onError(any(Throwable.class)); + + assertEquals(0, source.subscriberCount()); + } }