diff --git a/rxjava-core/src/main/java/rx/Observable.java b/rxjava-core/src/main/java/rx/Observable.java index 3589745629..9e27a7d846 100644 --- a/rxjava-core/src/main/java/rx/Observable.java +++ b/rxjava-core/src/main/java/rx/Observable.java @@ -4821,7 +4821,7 @@ public final Observable replay(Func1, ? extends Obs return create(new OperatorMulticastSelector(this, new Func0>() { @Override public final Subject call() { - return OperatorReplay.replayBuffered(bufferSize); + return ReplaySubject.createWithSize(bufferSize); } }, selector)); } @@ -4889,7 +4889,7 @@ public final Observable replay(Func1, ? extends Obs return create(new OperatorMulticastSelector(this, new Func0>() { @Override public final Subject call() { - return OperatorReplay.replayWindowed(time, unit, bufferSize, scheduler); + return ReplaySubject.createWithTimeAndSize(time, unit, bufferSize, scheduler); } }, selector)); } @@ -4920,7 +4920,7 @@ public final Observable replay(Func1, ? extends Obs return create(new OperatorMulticastSelector(this, new Func0>() { @Override public final Subject call() { - return OperatorReplay. createScheduledSubject(OperatorReplay. replayBuffered(bufferSize), scheduler); + return OperatorReplay. createScheduledSubject(ReplaySubject.createWithSize(bufferSize), scheduler); } }, selector)); } @@ -4979,7 +4979,7 @@ public final Observable replay(Func1, ? extends Obs return create(new OperatorMulticastSelector(this, new Func0>() { @Override public final Subject call() { - return OperatorReplay.replayWindowed(time, unit, -1, scheduler); + return ReplaySubject.createWithTime(time, unit, scheduler); } }, selector)); } @@ -5028,7 +5028,7 @@ public final Subject call() { * @see MSDN: Observable.Replay */ public final ConnectableObservable replay(int bufferSize) { - return new OperatorMulticast(this, OperatorReplay. replayBuffered(bufferSize)); + return new OperatorMulticast(this, ReplaySubject.createWithSize(bufferSize)); } /** @@ -5081,7 +5081,7 @@ public final ConnectableObservable replay(int bufferSize, long time, TimeUnit if (bufferSize < 0) { throw new IllegalArgumentException("bufferSize < 0"); } - return new OperatorMulticast(this, OperatorReplay. replayWindowed(time, unit, bufferSize, scheduler)); + return new OperatorMulticast(this, ReplaySubject.createWithTimeAndSize(time, unit, bufferSize, scheduler)); } /** @@ -5104,7 +5104,7 @@ public final ConnectableObservable replay(int bufferSize, long time, TimeUnit public final ConnectableObservable replay(int bufferSize, Scheduler scheduler) { return new OperatorMulticast(this, OperatorReplay.createScheduledSubject( - OperatorReplay. replayBuffered(bufferSize), scheduler)); + ReplaySubject.createWithSize(bufferSize), scheduler)); } /** @@ -5148,7 +5148,7 @@ public final ConnectableObservable replay(long time, TimeUnit unit) { * @see MSDN: Observable.Replay */ public final ConnectableObservable replay(long time, TimeUnit unit, Scheduler scheduler) { - return new OperatorMulticast(this, OperatorReplay. replayWindowed(time, unit, -1, scheduler)); + return new OperatorMulticast(this, ReplaySubject.createWithTime(time, unit, scheduler)); } /** diff --git a/rxjava-core/src/main/java/rx/operators/OperatorReplay.java b/rxjava-core/src/main/java/rx/operators/OperatorReplay.java index 67dd9e5496..0452ca3712 100644 --- a/rxjava-core/src/main/java/rx/operators/OperatorReplay.java +++ b/rxjava-core/src/main/java/rx/operators/OperatorReplay.java @@ -15,25 +15,11 @@ */ package rx.operators; -import java.util.ArrayList; -import java.util.Collection; -import java.util.LinkedHashMap; -import java.util.List; -import java.util.Map; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; import rx.Observable; import rx.Observable.OnSubscribe; import rx.Scheduler; import rx.Subscriber; -import rx.Subscription; -import rx.functions.Action0; -import rx.functions.Func1; -import rx.functions.Functions; -import rx.schedulers.Timestamped; import rx.subjects.Subject; /** @@ -48,17 +34,6 @@ private OperatorReplay() { throw new IllegalStateException("No instances!"); } - /** - * Create a CustomReplaySubject with the given buffer size. - * - * @param the element type - * @param bufferSize the maximum number of items to keep in the buffer - * @return the created subject - */ - public static Subject replayBuffered(int bufferSize) { - return CustomReplaySubject.create(bufferSize); - } - /** * Creates a subject whose client observers will observe events * propagated through the given wrapped subject. @@ -80,74 +55,6 @@ public void call(Subscriber o) { return s; } - /** - * Create a CustomReplaySubject with the given time window length - * and optional buffer size. - * - * @param - * the source and return type - * @param time - * the length of the time window - * @param unit - * the unit of the time window length - * @param bufferSize - * the buffer size if >= 0, otherwise, the buffer will be unlimited - * @param scheduler - * the scheduler from where the current time is retrieved. The - * observers will not observe on this scheduler. - * @return a Subject with the required replay behavior - */ - public static Subject replayWindowed(long time, TimeUnit unit, int bufferSize, final Scheduler scheduler) { - final long ms = unit.toMillis(time); - if (ms <= 0) { - throw new IllegalArgumentException("The time window is less than 1 millisecond!"); - } - Func1> timestamp = new Func1>() { - @Override - public Timestamped call(T t1) { - return new Timestamped(scheduler.now(), t1); - } - }; - Func1, T> untimestamp = new Func1, T>() { - @Override - public T call(Timestamped t1) { - return t1.getValue(); - } - }; - - ReplayState, T> state; - - if (bufferSize >= 0) { - state = new ReplayState, T>(new VirtualBoundedList>(bufferSize), untimestamp); - } else { - state = new ReplayState, T>(new VirtualArrayList>(), untimestamp); - } - final ReplayState, T> fstate = state; - // time based eviction when a value is added - state.onValueAdded = new Action0() { - @Override - public void call() { - long now = scheduler.now(); - long before = now - ms; - for (int i = fstate.values.start(); i < fstate.values.end(); i++) { - Timestamped v = fstate.values.get(i); - if (v.getTimestampMillis() >= before) { - fstate.values.removeBefore(i); - break; - } - } - } - }; - // time based eviction when a client subscribes - state.onSubscription = state.onValueAdded; - - final CustomReplaySubject, T> brs = new CustomReplaySubject, T>( - new CustomReplaySubjectSubscribeFunc, T>(state), state, timestamp - ); - - return brs; - } - /** * Return an OnSubscribeFunc which delegates the subscription to the given observable. * @@ -193,638 +100,4 @@ public void onCompleted() { } } - - /** Base state with lock. */ - static class BaseState { - /** The lock to protect the other fields. */ - private final Lock lock = new ReentrantLock(); - - /** Lock. */ - public void lock() { - lock.lock(); - } - - /** Unlock. */ - public void unlock() { - lock.unlock(); - } - - } - - /** - * Base interface for logically indexing a list. - * - * @param - * the value type - */ - public interface VirtualList { - /** @return the number of elements in this list */ - int size(); - - /** - * Add an element to the list. - * - * @param value - * the value to add - */ - void add(T value); - - /** - * Retrieve an element at the specified logical index. - * - * @param index - * @return the element - */ - T get(int index); - - /** - * Remove elements up before the given logical index and move - * the start() to this index. - *

- * For example, a list contains 3 items. Calling removeUntil 2 will - * remove the first two items. - * - * @param index - */ - void removeBefore(int index); - - /** - * Clear the elements of this list and increase the - * start by the number of elements. - */ - void clear(); - - /** - * Returns the current head index of this list. - * - * @return the head index - */ - int start(); - - /** - * Returns the current tail index of this list (where the next value would appear). - * - * @return the tail index - */ - int end(); - - /** - * Clears and resets the indexes of the list. - */ - void reset(); - - /** - * Returns the current content as a list. - * - * @return the {@link java.util.List} representation of this virtual list - */ - List toList(); - } - - /** - * Behaves like a normal, unbounded ArrayList but with virtual index. - * @param the element type - */ - public static final class VirtualArrayList implements VirtualList { - /** The backing list . */ - final List list = new ArrayList(); - /** The virtual start index of the list. */ - int startIndex; - - @Override - public int size() { - return list.size(); - } - - @Override - public void add(T value) { - list.add(value); - } - - @Override - public T get(int index) { - return list.get(index - startIndex); - } - - @Override - public void removeBefore(int index) { - int j = index - startIndex; - if (j > 0 && j <= list.size()) { - list.subList(0, j).clear(); - } - startIndex = index; - } - - @Override - public void clear() { - startIndex += list.size(); - list.clear(); - } - - @Override - public int start() { - return startIndex; - } - - @Override - public int end() { - return startIndex + list.size(); - } - - @Override - public void reset() { - list.clear(); - startIndex = 0; - } - - @Override - public List toList() { - return new ArrayList(list); - } - - } - - /** - * A bounded list which increases its size up to a maximum capacity, then - * behaves like a circular buffer with virtual indexes. - * - * @param the element type - */ - public static final class VirtualBoundedList implements VirtualList { - /** A list that grows up to maxSize. */ - private final List list = new ArrayList(); - /** The maximum allowed size. */ - private final int maxSize; - /** The logical start index of the list. */ - int startIndex; - /** The head index inside the list, where the first readable value sits. */ - int head; - /** The tail index inside the list, where the next value will be added. */ - int tail; - /** The number of items in the list. */ - int count; - - /** - * Construct a VirtualBoundedList with the given maximum number of elements. - * - * @param maxSize - */ - public VirtualBoundedList(int maxSize) { - if (maxSize < 0) { - throw new IllegalArgumentException("maxSize < 0"); - } - this.maxSize = maxSize; - } - - @Override - public int start() { - return startIndex; - } - - @Override - public int end() { - return startIndex + count; - } - - @Override - public void clear() { - startIndex += count; - list.clear(); - head = 0; - tail = 0; - count = 0; - } - - @Override - public int size() { - return count; - } - - @Override - public void add(T value) { - if (list.size() == maxSize) { - list.set(tail, value); - head = (head + 1) % maxSize; - tail = (tail + 1) % maxSize; - startIndex++; - } else { - list.add(value); - tail = (tail + 1) % maxSize; - count++; - } - } - - @Override - public T get(int index) { - if (index < start() || index >= end()) { - throw new ArrayIndexOutOfBoundsException(index); - } - int idx = (head + (index - startIndex)) % maxSize; - return list.get(idx); - } - - @Override - public void removeBefore(int index) { - if (index <= start()) { - return; - } - if (index >= end()) { - clear(); - startIndex = index; - return; - } - int rc = index - startIndex; - int head2 = head + rc; - for (int i = head; i < head2; i++) { - list.set(i % maxSize, null); - count--; - } - startIndex = index; - head = head2 % maxSize; - } - - @Override - public List toList() { - List r = new ArrayList(list.size() + 1); - for (int i = head; i < head + count; i++) { - int idx = i % maxSize; - r.add(list.get(idx)); - } - return r; - } - - @Override - public void reset() { - list.clear(); - count = 0; - head = 0; - tail = 0; - } - - } - - /** - * The state class. - * - * @param - * the intermediate type stored in the values buffer - * @param - * the result type transformed via the resultSelector - */ - static final class ReplayState extends BaseState { - /** The values observed so far. */ - final VirtualList values; - /** The result selector. */ - final Func1 resultSelector; - /** The received error. */ - Throwable error; - /** General completion indicator. */ - boolean done; - /** The map of replayers. */ - final Map replayers = new LinkedHashMap(); - /** - * Callback once a value has been added but before it is replayed - * (I.e, run a time based eviction policy). - *

- * Called while holding the state lock. - */ - protected Action0 onValueAdded = new Action0() { - @Override - public void call() { - } - }; - /** - * Callback once an error has been called but before it is replayed - * (I.e, run a time based eviction policy). - *

- * Called while holding the state lock. - */ - protected Action0 onErrorAdded = new Action0() { - @Override - public void call() { - } - }; - /** - * Callback once completed has been called but before it is replayed - * (I.e, run a time based eviction policy). - *

- * Called while holding the state lock. - */ - protected Action0 onCompletedAdded = new Action0() { - @Override - public void call() { - } - }; - /** - * Callback to pre-manage the values if an observer unsubscribes - * (I.e, run a time based eviction policy). - *

- * Called while holding the state lock. - */ - protected Action0 onSubscription = new Action0() { - @Override - public void call() { - } - }; - - /** - * Construct a ReplayState with the supplied buffer and result selectors. - * - * @param values - * @param resultSelector - */ - public ReplayState(final VirtualList values, - final Func1 resultSelector) { - this.values = values; - this.resultSelector = resultSelector; - } - - /** - * Returns a live collection of the observers. - *

- * Caller should hold the lock. - * - * @return - */ - Collection replayers() { - return new ArrayList(replayers.values()); - } - - /** - * Add a replayer to the replayers and create a Subscription for it. - *

- * Caller should hold the lock. - * - * @param obs - * @return - */ - Subscription addReplayer(Subscriber obs) { - Subscription s = new Subscription() { - final AtomicBoolean once = new AtomicBoolean(); - - @Override - public void unsubscribe() { - if (once.compareAndSet(false, true)) { - remove(this); - } - } - - @Override - public boolean isUnsubscribed() { - return once.get(); - } - - }; - obs.add(s); - Replayer rp = new Replayer(obs, s); - replayers.put(s, rp); - rp.replayTill(values.start() + values.size()); - return s; - } - - /** The replayer that holds a value where the given observer is currently at. */ - final class Replayer { - protected final Subscriber wrapped; - /** Where this replayer was in reading the list. */ - protected int index; - /** To cancel and unsubscribe this replayer and observer. */ - protected final Subscription cancel; - - protected Replayer(Subscriber wrapped, Subscription cancel) { - this.wrapped = wrapped; - this.cancel = cancel; - } - - /** - * Replay up to the given index - * - * @param limit - */ - void replayTill(int limit) { - int si = values.start(); - if (index < si) { - index = si; - } - while (index < limit) { - TIntermediate value = values.get(index); - index++; - try { - wrapped.onNext(resultSelector.call(value)); - } catch (Throwable t) { - replayers.remove(cancel); - wrapped.onError(t); - return; - } - } - if (done) { - if (error != null) { - wrapped.onError(error); - } else { - wrapped.onCompleted(); - } - } - } - } - - /** - * Remove the subscription. - * - * @param s - */ - void remove(Subscription s) { - lock(); - try { - replayers.remove(s); - } finally { - unlock(); - } - } - - /** - * Add a notification value and limit the size of values. - *

- * Caller should hold the lock. - * - * @param value - */ - void add(TIntermediate value) { - values.add(value); - } - - /** Clears the value list. */ - void clearValues() { - lock(); - try { - values.clear(); - } finally { - unlock(); - } - } - } - - /** - * A customizable replay subject with support for transformations. - * - * @param - * the Observer side's value type - * @param - * the type of the elements in the replay buffer - * @param - * the value type of the observers subscribing to this subject - */ - public static final class CustomReplaySubject extends Subject { - /** - * Return a subject that retains all events and will replay them to an {@link Subscriber} that subscribes. - * @param the common value type - * @return a subject that retains all events and will replay them to an {@link Subscriber} that subscribes. - */ - public static CustomReplaySubject create() { - ReplayState state = new ReplayState(new VirtualArrayList(), Functions. identity()); - return new CustomReplaySubject( - new CustomReplaySubjectSubscribeFunc(state), state, - Functions. identity()); - } - - /** - * Create a bounded replay subject with the given maximum buffer size. - * - * @param the common value type - * @param maxSize - * the maximum size in number of onNext notifications - * @return the custom replay subject - */ - public static CustomReplaySubject create(int maxSize) { - ReplayState state = new ReplayState(new VirtualBoundedList(maxSize), Functions. identity()); - return new CustomReplaySubject( - new CustomReplaySubjectSubscribeFunc(state), state, - Functions. identity()); - } - - /** The replay state. */ - protected final ReplayState state; - /** The result selector. */ - protected final Func1 intermediateSelector; - - private CustomReplaySubject( - final OnSubscribe onSubscribe, - ReplayState state, - Func1 intermediateSelector) { - super(new OnSubscribe() { - - @Override - public void call(Subscriber sub) { - onSubscribe.call(sub); - } - - }); - this.state = state; - this.intermediateSelector = intermediateSelector; - } - - @Override - public void onCompleted() { - state.lock(); - try { - if (state.done) { - return; - } - state.done = true; - state.onCompletedAdded.call(); - replayValues(); - } finally { - state.unlock(); - } - } - - @Override - public void onError(Throwable e) { - state.lock(); - try { - if (state.done) { - return; - } - state.done = true; - state.error = e; - state.onErrorAdded.call(); - replayValues(); - } finally { - state.unlock(); - } - } - - @Override - public void onNext(TInput args) { - state.lock(); - try { - if (state.done) { - return; - } - state.add(intermediateSelector.call(args)); - state.onValueAdded.call(); - replayValues(); - } finally { - state.unlock(); - } - } - - /** - * Replay values up to the current index. - */ - protected void replayValues() { - int s = state.values.start() + state.values.size(); - for (ReplayState.Replayer rp : state.replayers()) { - rp.replayTill(s); - } - } - } - - /** - * The subscription function. - * - * @param - * the type of the elements in the replay buffer - * @param - * the value type of the observers subscribing to this subject - */ - protected static final class CustomReplaySubjectSubscribeFunc - implements Observable.OnSubscribe { - - private final ReplayState state; - - protected CustomReplaySubjectSubscribeFunc(ReplayState state) { - this.state = state; - } - - @Override - public void call(Subscriber t1) { - VirtualList values; - Throwable error; - state.lock(); - try { - if (!state.done) { - state.onSubscription.call(); - state.addReplayer(t1); - return; - } - values = state.values; - error = state.error; - } finally { - state.unlock(); - } - // fully replay the subject - for (int i = values.start(); i < values.end(); i++) { - try { - t1.onNext(state.resultSelector.call(values.get(i))); - } catch (Throwable t) { - t1.onError(t); - return; - } - } - if (error != null) { - t1.onError(error); - } else { - t1.onCompleted(); - } - } - } -} +} \ No newline at end of file diff --git a/rxjava-core/src/main/java/rx/subjects/AsyncSubject.java b/rxjava-core/src/main/java/rx/subjects/AsyncSubject.java index a1a65d7c5b..b9057d23d0 100644 --- a/rxjava-core/src/main/java/rx/subjects/AsyncSubject.java +++ b/rxjava-core/src/main/java/rx/subjects/AsyncSubject.java @@ -58,7 +58,10 @@ public static AsyncSubject create() { public void call(SubjectObserver o) { Object v = state.get(); o.accept(v); - o.completeSingle(v); + NotificationLite nl = NotificationLite.instance(); + if (v == null || (!nl.isCompleted(v) && !nl.isError(v))) { + o.onCompleted(); + } } }; return new AsyncSubject(state, state); diff --git a/rxjava-core/src/main/java/rx/subjects/BehaviorSubject.java b/rxjava-core/src/main/java/rx/subjects/BehaviorSubject.java index 78efdc0edf..932d6b0cd3 100644 --- a/rxjava-core/src/main/java/rx/subjects/BehaviorSubject.java +++ b/rxjava-core/src/main/java/rx/subjects/BehaviorSubject.java @@ -17,12 +17,9 @@ import rx.Observer; -import rx.Subscriber; -import rx.functions.Action0; import rx.functions.Action1; import rx.operators.NotificationLite; import rx.subjects.SubjectSubscriptionManager.SubjectObserver; -import rx.subscriptions.Subscriptions; /** * Subject that publishes the most recent and all subsequent events to each subscribed {@link Observer}. diff --git a/rxjava-core/src/main/java/rx/subjects/ReplaySubject.java b/rxjava-core/src/main/java/rx/subjects/ReplaySubject.java index 8bfb77be5d..f1de19d760 100644 --- a/rxjava-core/src/main/java/rx/subjects/ReplaySubject.java +++ b/rxjava-core/src/main/java/rx/subjects/ReplaySubject.java @@ -17,11 +17,18 @@ import java.util.ArrayList; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; import rx.Observer; +import rx.Scheduler; import rx.functions.Action1; +import rx.functions.Func1; +import rx.functions.Functions; import rx.operators.NotificationLite; +import rx.schedulers.Timestamped; +import rx.subjects.ReplaySubject.NodeList.Node; import rx.subjects.SubjectSubscriptionManager.SubjectObserver; /** @@ -45,164 +52,302 @@ } * - * @param + * @param the input and output type */ public final class ReplaySubject extends Subject { + /** + * Create an unbounded replay subject. + *

The internal buffer is backed by an {@link ArrayList} and starts with + * an initial capacity of 16. Once the number of elements reaches this capacity, + * it will grow as necessary (usually by 50%). However, as the number of + * elements grows, this causes frequent array reallocation and copying, and + * may hurt performance and latency. This can be avoided with the {@link #create(int)} + * overload which takes an initial capacity parameter and can be tuned to + * reduce the array reallocation frequency as needed. + * @param The input and output types + * @return the created subject + */ public static ReplaySubject create() { return create(16); } - - public static ReplaySubject create(int initialCapacity) { - final SubjectSubscriptionManager subscriptionManager = new SubjectSubscriptionManager(); - final ReplayState state = new ReplayState(initialCapacity); - subscriptionManager.onStart = new Action1>() { + /** + * Create an unbounded replay subject with the specified initial buffer capacity. + *

Use this method to avoid excessive array reallocation while the internal + * buffer grows to accomodate new elements. For example, if it is known that the + * buffer will hold 32k elements, one can ask the ReplaySubject to preallocate + * it internal array with a capacity to hold that many elements. Once the elements + * start to arrive, the internal array won't need to grow, creating less garbage and + * no overhead due to frequent array-copying. + * @param The input and output types + * @param capacity the initial buffer capacity + * @return the created subject + */ + public static ReplaySubject create(int capacity) { + final UnboundedReplayState state = new UnboundedReplayState(capacity); + SubjectSubscriptionManager ssm = new SubjectSubscriptionManager(); + ssm.onStart = new Action1>() { @Override public void call(SubjectObserver o) { // replay history for this observer using the subscribing thread - int lastIndex = replayObserverFromIndex(state.history, 0, o); + int lastIndex = state.replayObserverFromIndex(0, o); // now that it is caught up add to observers state.replayState.put(o, lastIndex); } }; - subscriptionManager.onTerminated = new Action1>() { + ssm.onTerminated = new Action1>() { @Override public void call(SubjectObserver o) { Integer idx = state.replayState.remove(o); if (idx == null) { idx = 0; } - replayObserverFromIndex(state.history, idx, o); + // we will finish replaying if there is anything left + state.replayObserverFromIndex(idx, o); } }; - subscriptionManager.onUnsubscribed = new Action1>() { + ssm.onUnsubscribed = new Action1>() { @Override public void call(SubjectObserver o) { state.replayState.remove(o); } }; - return new ReplaySubject(subscriptionManager, subscriptionManager, state); + + return new ReplaySubject(ssm, ssm, state); } - - private static class ReplayState { - // single-producer, multi-consumer - final History history; - // each Observer is tracked here for what events they have received - final ConcurrentHashMap, Integer> replayState; - - public ReplayState(int initialCapacity) { - history = new History(initialCapacity); - replayState = new ConcurrentHashMap, Integer>(); - } + /** + * Create an unbounded replay subject with the bounded-implementation for testing purposes. + *

This variant behaves like the regular unbounded ReplaySubject created via {@link #create()} + * but uses the structures of the bounded-implementation. This is by no means intended + * for the replacement of the original, array-backed and unbounded ReplaySubject due to + * the additional overhead of the linked-list based internal buffer. The sole purpose + * is to allow testing and reasoning about the behavior of the bounded implementations + * without the interference of the eviction policies. + * @param the input and output types + * @return the created subject + */ + /* public */ static ReplaySubject createUnbounded() { + final BoundedState state = new BoundedState( + new EmptyEvictionPolicy(), + Functions.identity(), + Functions.identity() + ); + return createWithState(state, new DefaultOnAdd(state)); + } + /** + * Create a size-bounded replay subject. + *

In this setting, the ReplaySubject holds at most {@code size} elements in its + * internal buffer and discards the oldest element. + *

When observers subscribe to a terminated + * ReplaySubject, they are guaranteed to see at most {@code size} onNext events followed by + * a termination event. + *

In case an observer subscribes while the ReplaySubject is active, it + * will receive all events from within the buffer at that point in time and each event afterwards, + * even if the buffer evicts elements due to the size constraint in the mean time. + * In other terms, once an Observer subscribes, it will receive events without gaps in the sequence. + * @param the input and output types + * @param size the maximum number of buffered items + * @return the created subject + */ + public static ReplaySubject createWithSize(int size) { + final BoundedState state = new BoundedState( + new SizeEvictionPolicy(size), + Functions.identity(), + Functions.identity() + ); + return createWithState(state, new DefaultOnAdd(state)); + } + /** + * Create a time-bounded replay subject. + *

In this setting, the ReplaySubject tags each received onNext event internally with a timestamp + * value supplied by the {@link Scheduler} and keeps only those whose age is less than + * the supplied time value converted to milliseconds. For example, a value arrives at T=0 and the max age + * is set to 5; at T>=5 this first value is then evicted by any subsequent value or termination event, + * leaving the buffer empty. + *

Once the subject is terminated, observers subscribing to it will receive events that + * remained in the buffer after the terminal event, regardless of their age. + *

In case an observer subscribes while the ReplaySubject is active, it + * will receive only those events from within the buffer, which have age less than the specified time and + * each event afterwards, even if the buffer evicts elements due to the time constraint in the mean time. + * In other terms, once an Observer subscribes, it receives events without gaps in the sequence except the outdated events + * at the beginning of the sequence. + *

Note that terminal events (onError and onCompleted) trigger eviction as well. For example, with a max age of + * 5, the first value arrives at T=0, then an onCompleted arrives at T=10. If an observer subscribes at T=11, it will + * find an empty ReplaySubject with just an onCompleted event. + * @param the input and output types + * @param time the maximum age of the contained items + * @param unit the time unit + * @param scheduler the scheduler providing the current time + * @return the created subject + */ + public static ReplaySubject createWithTime(long time, TimeUnit unit, final Scheduler scheduler) { + final BoundedState state = new BoundedState( + new TimeEvictionPolicy(unit.toMillis(time), scheduler), + new AddTimestamped(scheduler), + new RemoveTimestamped() + ); + return createWithState(state, new TimedOnAdd(state, scheduler)); + } + /** + * Create a time- and size-bounded replay subject. + *

In this setting, the ReplaySubject tags each received onNext event internally with a timestamp + * value supplied by the {@link Scheduler} and holds at most {@code size} elements in the internal buffer. + * Elements are evicted from the start of the buffer if their age becomes less-than or equal + * to the supplied age in milliseconds or the buffer reaches its {@code size} limit. + *

When observers subscribe to a terminated ReplaySubject, receive the events that + * remained in the buffer after the terminal event, regardless of their age, but at most + * {@code size} elements. + *

In case an observer subscribes while the ReplaySubject is active, it + * will receive only those events from within the buffer, which have age less than the specified time and + * each event afterwards, even if the buffer evicts elements due to the time constraint in the mean time. + * In other terms, once an Observer subscribes, it receives events without gaps in the sequence except the outdated events + * at the beginning of the sequence. + *

Note that terminal events (onError and onCompleted) trigger eviction as well. For example, with a max age of + * 5, the first value arrives at T=0, then an onCompleted arrives at T=10. If an observer subscribes at T=11, it will + * find an empty ReplaySubject with just an onCompleted event. + * @param the input and output types + * @param time the maximum age of the contained items + * @param unit the time unit + * @param size the maximum number of buffered items + * @param scheduler the scheduler providing the current time + * @return the created subject + */ + public static ReplaySubject createWithTimeAndSize(long time, TimeUnit unit, int size, final Scheduler scheduler) { + final BoundedState state = new BoundedState( + new PairEvictionPolicy( + new SizeEvictionPolicy(size), + new TimeEvictionPolicy(unit.toMillis(time), scheduler) + ), + new AddTimestamped(scheduler), + new RemoveTimestamped() + ); + return createWithState(state, new TimedOnAdd(state, scheduler)); } + /** + * Create a bounded replay subject with the given state shared between the subject + * and the OnSubscribe functions. + * @param the result value type + * @param state the shared state + * @return the subject created + */ + static final ReplaySubject createWithState(final BoundedState state, + Action1> onStart) { + SubjectSubscriptionManager ssm = new SubjectSubscriptionManager(); + ssm.onStart = onStart; + ssm.onTerminated = new Action1>() { - private final SubjectSubscriptionManager subscriptionManager; - private final ReplayState state; + @Override + public void call(SubjectObserver t1) { + NodeList.Node l = state.removeState(t1); + if (l == null) { + l = state.head(); + } + state.replayObserverFromIndex(l, t1); + } + + }; + ssm.onUnsubscribed = new Action1>() { + @Override + public void call(SubjectObserver t1) { + state.removeState(t1); + } - protected ReplaySubject(OnSubscribe onSubscribe, SubjectSubscriptionManager subscriptionManager, ReplayState state) { + }; + + return new ReplaySubject(ssm, ssm, state); + } + /** The state storing the history and the references. */ + final ReplayState state; + /** The manager of subscribers. */ + final SubjectSubscriptionManager ssm; + ReplaySubject(OnSubscribe onSubscribe, SubjectSubscriptionManager ssm, ReplayState state) { super(onSubscribe); - this.subscriptionManager = subscriptionManager; + this.ssm = ssm; this.state = state; } - + @Override - public void onCompleted() { - - if (subscriptionManager.active) { - state.history.complete(); - for (SubjectObserver o : subscriptionManager.terminate(NotificationLite.instance().completed())) { + public void onNext(T t) { + if (ssm.active) { + state.next(t); + for (SubjectSubscriptionManager.SubjectObserver o : ssm.observers()) { if (caughtUp(o)) { - o.onCompleted(); + o.onNext(t); } } } } - + @Override public void onError(final Throwable e) { - if (subscriptionManager.active) { - state.history.complete(e); - for (SubjectObserver o : subscriptionManager.terminate(NotificationLite.instance().completed())) { + if (ssm.active) { + state.error(e); + for (SubjectObserver o : ssm.terminate(NotificationLite.instance().error(e))) { if (caughtUp(o)) { o.onError(e); } } } } - + @Override - public void onNext(T v) { - if (state.history.terminated) { - return; - } - state.history.next(v); - for (SubjectObserver o : subscriptionManager.observers()) { - if (caughtUp(o)) { - o.onNext(v); + public void onCompleted() { + if (ssm.active) { + state.complete(); + for (SubjectObserver o : ssm.terminate(NotificationLite.instance().completed())) { + if (caughtUp(o)) { + o.onCompleted(); + } } } } - - /* - * This is not very elegant but resulted in non-trivial performance improvement by - * eliminating the 'replay' code-path on the normal fast-path of emitting values. - * - * With this method: 16,151,174 ops/sec - * Without: 8,632,358 ops/sec + /** + * @return Returns the number of subscribers. */ + /* Support test. */int subscriberCount() { + return state.replayStateSize(); + } + private boolean caughtUp(SubjectObserver o) { if (!o.caughtUp) { o.caughtUp = true; - replayObserver(o); + state.replayObserver(o); return false; } else { // it was caught up so proceed the "raw route" return true; } } - - private void replayObserver(SubjectObserver observer) { - Integer lastEmittedLink = state.replayState.get(observer); - if (lastEmittedLink == null) { - lastEmittedLink = 0; - } - int l = replayObserverFromIndex(state.history, lastEmittedLink, observer); - state.replayState.put(observer, l); - } - - static int replayObserverFromIndex(History history, int idx, SubjectObserver observer) { - while (idx < history.index.get()) { - history.accept(observer, idx); - idx++; - } - - return idx; - } - + + // ********************* + // State implementations + // ********************* + /** - * NOT thread-safe for multi-writer. Assumes single-writer. - * Is thread-safe for multi-reader. - * - * @param + * The unbounded replay state. + * @param the input and output type */ - private static class History { + static final class UnboundedReplayState implements ReplayState { + /** Each Observer is tracked here for what events they have received. */ + final ConcurrentHashMap, Integer> replayState; private final NotificationLite nl = NotificationLite.instance(); + /** The size of the buffer. */ private final AtomicInteger index; + /** The buffer. */ private final ArrayList list; - private boolean terminated; - - public History(int initialCapacity) { + /** The termination flag. */ + private volatile boolean terminated; + public UnboundedReplayState(int initialCapacity) { index = new AtomicInteger(0); list = new ArrayList(initialCapacity); + replayState = new ConcurrentHashMap, Integer>(); } - public boolean next(T n) { + @Override + public void next(T n) { if (!terminated) { list.add(nl.next(n)); index.getAndIncrement(); - return true; - } else { - return false; } } @@ -210,6 +355,7 @@ public void accept(Observer o, int idx) { nl.accept(o, list.get(idx)); } + @Override public void complete() { if (!terminated) { terminated = true; @@ -217,19 +363,455 @@ public void complete() { index.getAndIncrement(); } } - public void complete(Throwable e) { + @Override + public void error(Throwable e) { if (!terminated) { terminated = true; list.add(nl.error(e)); index.getAndIncrement(); } } + + @Override + public boolean terminated() { + return terminated; + } + + @Override + public void replayObserver(SubjectObserver observer) { + Integer lastEmittedLink = replayState.get(observer); + if (lastEmittedLink != null) { + int l = replayObserverFromIndex(lastEmittedLink, observer); + replayState.put(observer, l); + } else { + throw new IllegalStateException("failed to find lastEmittedLink for: " + observer); + } + } + + @Override + public Integer replayObserverFromIndex(Integer idx, SubjectObserver observer) { + int i = idx; + while (i < index.get()) { + accept(observer, i); + i++; + } + + return i; + } + + @Override + public Integer replayObserverFromIndexTest(Integer idx, SubjectObserver observer, long now) { + return replayObserverFromIndex(idx, observer); + } + + @Override + public int replayStateSize() { + return replayState.size(); + } + } + + + /** + * The bounded replay state. + * @param the input and output type + */ + static final class BoundedState implements ReplayState> { + final NodeList list; + final AtomicReference> tail; + final ConcurrentHashMap, NodeList.Node> replayState; + final EvictionPolicy evictionPolicy; + final Func1 enterTransform; + final Func1 leaveTransform; + final NotificationLite nl = NotificationLite.instance(); + volatile boolean terminated; + public BoundedState(EvictionPolicy evictionPolicy, Func1 enterTransform, + Func1 leaveTransform) { + this.list = new NodeList(); + this.tail = new AtomicReference>(list.tail); + this.replayState = new ConcurrentHashMap, NodeList.Node>(); + this.evictionPolicy = evictionPolicy; + this.enterTransform = enterTransform; + this.leaveTransform = leaveTransform; + } + @Override + public void next(T value) { + if (!terminated) { + list.addLast(enterTransform.call(nl.next(value))); + evictionPolicy.evict(list); + tail.set(list.tail); + } + } + @Override + public void complete() { + if (!terminated) { + terminated = true; + // don't evict the terminal value + evictionPolicy.evict(list); + // so add it later + list.addLast(enterTransform.call(nl.completed())); + tail.set(list.tail); + } + + } + @Override + public void error(Throwable e) { + if (!terminated) { + terminated = true; + // don't evict the terminal value + evictionPolicy.evict(list); + // so add it later + list.addLast(enterTransform.call(nl.error(e))); + tail.set(list.tail); + } + } + public void accept(Observer o, NodeList.Node node) { + nl.accept(o, leaveTransform.call(node.value)); + } + /** + * Accept only non-stale nodes. + * @param o the target observer + * @param node the node to accept or reject + * @param now the current time + */ + public void acceptTest(Observer o, NodeList.Node node, long now) { + Object v = node.value; + if (!evictionPolicy.test(v, now)) { + nl.accept(o, leaveTransform.call(v)); + } + } + public Node head() { + return list.head; + } + public Node tail() { + return tail.get(); + } + public Node removeState(SubjectObserver o) { + return replayState.remove(o); + } + public void addState(SubjectObserver o, Node state) { + if (state == null) { + throw new IllegalStateException("Null state!"); + } else { + replayState.put(o, state); + } + } + @Override + public void replayObserver(SubjectObserver observer) { + NodeList.Node lastEmittedLink = replayState.get(observer); + NodeList.Node l = replayObserverFromIndex(lastEmittedLink, observer); + addState(observer, l); + } + @Override + public NodeList.Node replayObserverFromIndex( + NodeList.Node l, SubjectObserver observer) { + while (l != tail()) { + accept(observer, l.next); + l = l.next; + } + return l; + } + @Override + public NodeList.Node replayObserverFromIndexTest( + NodeList.Node l, SubjectObserver observer, long now) { + while (l != tail()) { + acceptTest(observer, l.next, now); + l = l.next; + } + return l; + } + + @Override + public boolean terminated() { + return terminated; + } + + @Override + public int replayStateSize() { + return replayState.size(); + } + } + + // ************** + // API interfaces + // ************** + /** - * @return Returns the number of subscribers. + * General API for replay state management. + * @param the input and output type + * @param the index type */ - /* Support test. */int subscriberCount() { - return state.replayState.size(); + interface ReplayState { + /** @return true if the subject has reached a terminal state. */ + boolean terminated(); + void replayObserver(SubjectObserver observer); + /** + * Replay the buffered values from an index position and return a new index + * @param idx the current index position + * @param observer the receiver of events + * @return the new index position + */ + I replayObserverFromIndex( + I idx, SubjectObserver observer); + /** + * Replay the buffered values from an index position while testing for stale entries and return a new index + * @param idx the current index position + * @param observer the receiver of events + * @return the new index position + */ + I replayObserverFromIndexTest( + I idx, SubjectObserver observer, long now); + /** + * @return the size of the replay state map for testing purposes. + */ + int replayStateSize(); + /** + * Add an OnNext value to the buffer + * @param value the value to add + */ + void next(T value); + /** + * Add an OnError exception and terminate the subject + * @param e the exception to add + */ + void error(Throwable e); + /** + * Add an OnCompleted exception and terminate the subject + */ + void complete(); + } + + /** Interface to manage eviction checking. */ + interface EvictionPolicy { + /** + * Subscribe-time checking for stale entries. + * @param value the value to test + * @param now the current time + * @return true if the value may be evicted + */ + boolean test(Object value, long now); + /** + * Evict values from the list + * @param list + */ + void evict(NodeList list); + } + + + // ************************ + // Callback implementations + // ************************ + + /** + * Remove elements from the beginning of the list if the size exceeds some threshold. + */ + static final class SizeEvictionPolicy implements EvictionPolicy { + final int maxSize; + + public SizeEvictionPolicy(int maxSize) { + this.maxSize = maxSize; + } + + @Override + public void evict(NodeList t1) { + while (t1.size() > maxSize) { + t1.removeFirst(); + } + } + + @Override + public boolean test(Object value, long now) { + return true; // size gets never stale + } + } + /** + * Remove elements from the beginning of the list if the Timestamped value is older than + * a threshold. + */ + static final class TimeEvictionPolicy implements EvictionPolicy { + final long maxAgeMillis; + final Scheduler scheduler; + + public TimeEvictionPolicy(long maxAgeMillis, Scheduler scheduler) { + this.maxAgeMillis = maxAgeMillis; + this.scheduler = scheduler; + } + + @Override + public void evict(NodeList t1) { + long now = scheduler.now(); + while (!t1.isEmpty()) { + NodeList.Node n = t1.head.next; + if (test(n.value, now)) { + t1.removeFirst(); + } else { + break; + } + } + } + + @Override + public boolean test(Object value, long now) { + Timestamped ts = (Timestamped)value; + return ts.getTimestampMillis() <= now - maxAgeMillis; + } + + } + /** + * Pairs up two eviction policy callbacks. + */ + static final class PairEvictionPolicy implements EvictionPolicy { + final EvictionPolicy first; + final EvictionPolicy second; + + public PairEvictionPolicy(EvictionPolicy first, EvictionPolicy second) { + this.first = first; + this.second = second; + } + + @Override + public void evict(NodeList t1) { + first.evict(t1); + second.evict(t1); + } + + @Override + public boolean test(Object value, long now) { + return first.test(value, now) || second.test(value, now); + } + }; + + /** Maps the values to Timestamped. */ + static final class AddTimestamped implements Func1 { + final Scheduler scheduler; + + public AddTimestamped(Scheduler scheduler) { + this.scheduler = scheduler; + } + + @Override + public Object call(Object t1) { + return new Timestamped(scheduler.now(), t1); + } + } + /** Maps timestamped values back to raw objects. */ + static final class RemoveTimestamped implements Func1 { + @Override + @SuppressWarnings("unchecked") + public Object call(Object t1) { + return ((Timestamped)t1).getValue(); + } + } + /** + * Default action of simply replaying the buffer on subscribe. + * @param the input and output value type + */ + static final class DefaultOnAdd implements Action1> { + final BoundedState state; + + public DefaultOnAdd(BoundedState state) { + this.state = state; + } + + @Override + public void call(SubjectObserver t1) { + NodeList.Node l = state.replayObserverFromIndex(state.head(), t1); + state.addState(t1, l); + } + + } + /** + * Action of replaying non-stale entries of the buffer on subscribe + * @param the input and output value + */ + static final class TimedOnAdd implements Action1> { + final BoundedState state; + final Scheduler scheduler; + + public TimedOnAdd(BoundedState state, Scheduler scheduler) { + this.state = state; + this.scheduler = scheduler; + } + + @Override + public void call(SubjectObserver t1) { + NodeList.Node l; + if (!state.terminated) { + // ignore stale entries if still active + l = state.replayObserverFromIndexTest(state.head(), t1, scheduler.now()); + } else { + // accept all if terminated + l = state.replayObserverFromIndex(state.head(), t1); + } + state.addState(t1, l); + } + } + /** + * A singly-linked list with volatile next node pointer. + * @param the value type + */ + static final class NodeList { + /** + * The node containing the value and references to neighbours. + * @param the value type + */ + static final class Node { + /** The managed value. */ + final T value; + /** The hard reference to the next node. */ + volatile Node next; + Node(T value) { + this.value = value; + } + } + /** The head of the list. */ + final Node head = new Node(null); + /** The tail of the list. */ + Node tail = head; + /** The number of elements in the list. */ + int size; + + public void addLast(T value) { + Node t = tail; + Node t2 = new Node(value); + t.next = t2; + tail = t2; + size++; + } + public T removeFirst() { + if (head.next == null) { + throw new IllegalStateException("Empty!"); + } + Node t = head.next; + head.next = t.next; + if (head.next == null) { + tail = head; + } + size--; + return t.value; + } + public boolean isEmpty() { + return size == 0; + } + public int size() { + return size; + } + public void clear() { + tail = head; + size = 0; + } + } + /** Empty eviction policy. */ + static final class EmptyEvictionPolicy implements EvictionPolicy { + @Override + public boolean test(Object value, long now) { + return true; + } + @Override + public void evict(NodeList list) { + } + + } } \ No newline at end of file diff --git a/rxjava-core/src/main/java/rx/subjects/SubjectSubscriptionManager.java b/rxjava-core/src/main/java/rx/subjects/SubjectSubscriptionManager.java index d3af5a17e7..6ed6a5e942 100644 --- a/rxjava-core/src/main/java/rx/subjects/SubjectSubscriptionManager.java +++ b/rxjava-core/src/main/java/rx/subjects/SubjectSubscriptionManager.java @@ -314,16 +314,6 @@ protected void accept(Object n) { } } - /** - * Emit an OnCompleted event if the value object is - * an OnNext NotificationLite object or null. - * @param value - */ - protected void completeSingle(Object value) { - if (value == null || (!nl.isCompleted(value) && !nl.isError(value))) { - actual.onCompleted(); - } - } /** @return the actual Observer. */ protected Observer getActual() { return actual; diff --git a/rxjava-core/src/test/java/rx/operators/OperatorReplayTest.java b/rxjava-core/src/test/java/rx/operators/OperatorReplayTest.java index b52c6d22d9..216cc3321d 100644 --- a/rxjava-core/src/test/java/rx/operators/OperatorReplayTest.java +++ b/rxjava-core/src/test/java/rx/operators/OperatorReplayTest.java @@ -22,78 +22,23 @@ import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; -import java.util.Arrays; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; - -import org.junit.Assert; import static org.junit.Assert.assertEquals; import org.junit.Test; import org.mockito.InOrder; + import rx.Observable; import rx.Observer; import rx.functions.Action0; import rx.functions.Action1; import rx.functions.Func1; import rx.observables.ConnectableObservable; -import rx.operators.OperatorReplay.VirtualBoundedList; import rx.schedulers.TestScheduler; import rx.subjects.PublishSubject; public class OperatorReplayTest { - @Test - public void testBoundedList() { - VirtualBoundedList list = new VirtualBoundedList(3); - - list.add(1); // idx: 0 - list.add(2); // idx: 1 - list.add(3); // idx: 2 - - Assert.assertEquals(3, list.size()); - - list.add(4); // idx: 3 - - Assert.assertEquals(3, list.size()); - Assert.assertEquals(Arrays.asList(2, 3, 4), list.toList()); - - Assert.assertEquals(1, list.start()); - Assert.assertEquals(4, list.end()); - - list.removeBefore(3); - - Assert.assertEquals(1, list.size()); - - Assert.assertEquals(Arrays.asList(4), list.toList()); - - Assert.assertEquals(3, list.start()); - Assert.assertEquals(4, list.end()); - } - - @Test(expected = ArrayIndexOutOfBoundsException.class) - public void testReadBefore() { - VirtualBoundedList list = new VirtualBoundedList(3); - - list.add(1); // idx: 0 - list.add(2); // idx: 1 - list.add(3); // idx: 2 - list.add(4); // idx: 3 - - list.get(0); - } - - @Test(expected = ArrayIndexOutOfBoundsException.class) - public void testReadAfter() { - VirtualBoundedList list = new VirtualBoundedList(3); - - list.add(1); // idx: 0 - list.add(2); // idx: 1 - list.add(3); // idx: 2 - list.add(4); // idx: 3 - - list.get(4); - } - @Test public void testBufferedReplay() { PublishSubject source = PublishSubject.create(); @@ -520,4 +465,4 @@ public void call() { assertEquals(2, effectCounter.get()); } } -} +} \ No newline at end of file diff --git a/rxjava-core/src/test/java/rx/subjects/ReplaySubjectBoundedConcurrencyTest.java b/rxjava-core/src/test/java/rx/subjects/ReplaySubjectBoundedConcurrencyTest.java new file mode 100644 index 0000000000..1138e4d226 --- /dev/null +++ b/rxjava-core/src/test/java/rx/subjects/ReplaySubjectBoundedConcurrencyTest.java @@ -0,0 +1,340 @@ +/** + * Copyright 2014 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.subjects; + +import static org.junit.Assert.assertEquals; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +import org.junit.Test; + +import rx.Observable; +import rx.Observable.OnSubscribe; +import rx.Subscriber; +import rx.functions.Action1; +import rx.observers.TestSubscriber; +import rx.schedulers.Schedulers; + +public class ReplaySubjectBoundedConcurrencyTest { + + public static void main(String args[]) { + try { + for (int i = 0; i < 100; i++) { + new ReplaySubjectConcurrencyTest().testSubscribeCompletionRaceCondition(); + new ReplaySubjectConcurrencyTest().testReplaySubjectConcurrentSubscriptions(); + new ReplaySubjectConcurrencyTest().testReplaySubjectConcurrentSubscribersDoingReplayDontBlockEachOther(); + } + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + + @Test(timeout = 4000) + public void testReplaySubjectConcurrentSubscribersDoingReplayDontBlockEachOther() throws InterruptedException { + final ReplaySubject replay = ReplaySubject.createUnbounded(); + Thread source = new Thread(new Runnable() { + + @Override + public void run() { + Observable.create(new OnSubscribe() { + + @Override + public void call(Subscriber o) { + System.out.println("********* Start Source Data ***********"); + for (long l = 1; l <= 10000; l++) { + o.onNext(l); + } + System.out.println("********* Finished Source Data ***********"); + o.onCompleted(); + } + }).subscribe(replay); + } + }); + source.start(); + + long v = replay.toBlockingObservable().last(); + assertEquals(10000, v); + + // it's been played through once so now it will all be replays + final CountDownLatch slowLatch = new CountDownLatch(1); + Thread slowThread = new Thread(new Runnable() { + + @Override + public void run() { + Subscriber slow = new Subscriber() { + + @Override + public void onCompleted() { + System.out.println("*** Slow Observer completed"); + slowLatch.countDown(); + } + + @Override + public void onError(Throwable e) { + } + + @Override + public void onNext(Long args) { + if (args == 1) { + System.out.println("*** Slow Observer STARTED"); + } + try { + if (args % 10 == 0) { + Thread.sleep(1); + } + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + }; + replay.subscribe(slow); + try { + slowLatch.await(); + } catch (InterruptedException e1) { + e1.printStackTrace(); + } + } + }); + slowThread.start(); + + Thread fastThread = new Thread(new Runnable() { + + @Override + public void run() { + final CountDownLatch fastLatch = new CountDownLatch(1); + Subscriber fast = new Subscriber() { + + @Override + public void onCompleted() { + System.out.println("*** Fast Observer completed"); + fastLatch.countDown(); + } + + @Override + public void onError(Throwable e) { + } + + @Override + public void onNext(Long args) { + if (args == 1) { + System.out.println("*** Fast Observer STARTED"); + } + } + }; + replay.subscribe(fast); + try { + fastLatch.await(); + } catch (InterruptedException e1) { + e1.printStackTrace(); + } + } + }); + fastThread.start(); + fastThread.join(); + + // slow should not yet be completed when fast completes + assertEquals(1, slowLatch.getCount()); + + slowThread.join(); + } + + @Test + public void testReplaySubjectConcurrentSubscriptions() throws InterruptedException { + final ReplaySubject replay = ReplaySubject.createUnbounded(); + Thread source = new Thread(new Runnable() { + + @Override + public void run() { + Observable.create(new OnSubscribe() { + + @Override + public void call(Subscriber o) { + System.out.println("********* Start Source Data ***********"); + for (long l = 1; l <= 10000; l++) { + o.onNext(l); + } + System.out.println("********* Finished Source Data ***********"); + o.onCompleted(); + } + }).subscribe(replay); + } + }); + + // used to collect results of each thread + final List> listOfListsOfValues = Collections.synchronizedList(new ArrayList>()); + final List threads = Collections.synchronizedList(new ArrayList()); + + for (int i = 1; i <= 200; i++) { + final int count = i; + if (count == 20) { + // start source data after we have some already subscribed + // and while others are in process of subscribing + source.start(); + } + if (count == 100) { + // wait for source to finish then keep adding after it's done + source.join(); + } + Thread t = new Thread(new Runnable() { + + @Override + public void run() { + List values = replay.toList().toBlockingObservable().last(); + listOfListsOfValues.add(values); + System.out.println("Finished thread: " + count); + } + }); + t.start(); + System.out.println("Started thread: " + i); + threads.add(t); + } + + // wait for all threads to complete + for (Thread t : threads) { + t.join(); + } + + // assert all threads got the same results + List sums = new ArrayList(); + for (List values : listOfListsOfValues) { + long v = 0; + for (long l : values) { + v += l; + } + sums.add(v); + } + + long expected = sums.get(0); + boolean success = true; + for (long l : sums) { + if (l != expected) { + success = false; + System.out.println("FAILURE => Expected " + expected + " but got: " + l); + } + } + + if (success) { + System.out.println("Success! " + sums.size() + " each had the same sum of " + expected); + } else { + throw new RuntimeException("Concurrency Bug"); + } + + } + + /** + * Can receive timeout if subscribe never receives an onError/onCompleted ... which reveals a race condition. + */ + @Test(timeout = 10000) + public void testSubscribeCompletionRaceCondition() { + for (int i = 0; i < 50; i++) { + final ReplaySubject subject = ReplaySubject.createUnbounded(); + final AtomicReference value1 = new AtomicReference(); + + subject.subscribe(new Action1() { + + @Override + public void call(String t1) { + try { + // simulate a slow observer + Thread.sleep(50); + } catch (InterruptedException e) { + e.printStackTrace(); + } + value1.set(t1); + } + + }); + + Thread t1 = new Thread(new Runnable() { + + @Override + public void run() { + subject.onNext("value"); + subject.onCompleted(); + } + }); + + SubjectObserverThread t2 = new SubjectObserverThread(subject); + SubjectObserverThread t3 = new SubjectObserverThread(subject); + SubjectObserverThread t4 = new SubjectObserverThread(subject); + SubjectObserverThread t5 = new SubjectObserverThread(subject); + + t2.start(); + t3.start(); + t1.start(); + t4.start(); + t5.start(); + try { + t1.join(); + t2.join(); + t3.join(); + t4.join(); + t5.join(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + + assertEquals("value", value1.get()); + assertEquals("value", t2.value.get()); + assertEquals("value", t3.value.get()); + assertEquals("value", t4.value.get()); + assertEquals("value", t5.value.get()); + } + + } + + /** + * https://github.com/Netflix/RxJava/issues/1147 + */ + @Test + public void testRaceForTerminalState() { + final List expected = Arrays.asList(1); + for (int i = 0; i < 100000; i++) { + TestSubscriber ts = new TestSubscriber(); + Observable.just(1).subscribeOn(Schedulers.computation()).cache().subscribe(ts); + ts.awaitTerminalEvent(); + ts.assertReceivedOnNext(expected); + ts.assertTerminalEvent(); + } + } + + private static class SubjectObserverThread extends Thread { + + private final ReplaySubject subject; + private final AtomicReference value = new AtomicReference(); + + public SubjectObserverThread(ReplaySubject subject) { + this.subject = subject; + } + + @Override + public void run() { + try { + // a timeout exception will happen if we don't get a terminal state + String v = subject.timeout(2000, TimeUnit.MILLISECONDS).toBlockingObservable().single(); + value.set(v); + } catch (Exception e) { + e.printStackTrace(); + } + } + } +} \ No newline at end of file diff --git a/rxjava-core/src/test/java/rx/subjects/ReplaySubjectTest.java b/rxjava-core/src/test/java/rx/subjects/ReplaySubjectTest.java index 4996b511d2..44e3dcfa67 100644 --- a/rxjava-core/src/test/java/rx/subjects/ReplaySubjectTest.java +++ b/rxjava-core/src/test/java/rx/subjects/ReplaySubjectTest.java @@ -25,6 +25,7 @@ import static org.mockito.Mockito.verifyNoMoreInteractions; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import org.junit.Test; @@ -37,6 +38,7 @@ import rx.Subscription; import rx.functions.Func1; import rx.schedulers.Schedulers; +import rx.schedulers.TestScheduler; public class ReplaySubjectTest { @@ -437,4 +439,154 @@ public void onCompleted() { verify(o).onCompleted(); verify(o, never()).onError(any(Throwable.class)); } + @Test + public void testNodeListSimpleAddRemove() { + ReplaySubject.NodeList list = new ReplaySubject.NodeList(); + + assertEquals(0, list.size()); + + // add and remove one + + list.addLast(1); + + assertEquals(1, list.size()); + + assertEquals((Integer)1, list.removeFirst()); + + assertEquals(0, list.size()); + + // add and remove one again + + list.addLast(1); + + assertEquals(1, list.size()); + + assertEquals((Integer)1, list.removeFirst()); + + // add and remove two items + + list.addLast(1); + list.addLast(2); + + assertEquals(2, list.size()); + + assertEquals((Integer)1, list.removeFirst()); + assertEquals((Integer)2, list.removeFirst()); + + assertEquals(0, list.size()); + // clear two items + + list.addLast(1); + list.addLast(2); + + assertEquals(2, list.size()); + + list.clear(); + + assertEquals(0, list.size()); + } + @Test + public void testReplay1AfterTermination() { + ReplaySubject source = ReplaySubject.createWithSize(1); + + source.onNext(1); + source.onNext(2); + source.onCompleted(); + + for (int i = 0; i < 1; i++) { + @SuppressWarnings("unchecked") + Observer o = mock(Observer.class); + + source.subscribe(o); + + verify(o, never()).onNext(1); + verify(o).onNext(2); + verify(o).onCompleted(); + verify(o, never()).onError(any(Throwable.class)); + } + } + @Test + public void testReplay1Directly() { + ReplaySubject source = ReplaySubject.createWithSize(1); + + @SuppressWarnings("unchecked") + Observer o = mock(Observer.class); + + source.onNext(1); + source.onNext(2); + + source.subscribe(o); + + source.onNext(3); + source.onCompleted(); + + verify(o, never()).onNext(1); + verify(o).onNext(2); + verify(o).onNext(3); + verify(o).onCompleted(); + verify(o, never()).onError(any(Throwable.class)); + } + + @Test + public void testReplayTimestampedAfterTermination() { + TestScheduler scheduler = new TestScheduler(); + ReplaySubject source = ReplaySubject.createWithTime(1, TimeUnit.SECONDS, scheduler); + + source.onNext(1); + + scheduler.advanceTimeBy(1, TimeUnit.SECONDS); + + source.onNext(2); + + scheduler.advanceTimeBy(1, TimeUnit.SECONDS); + + source.onNext(3); + source.onCompleted(); + + scheduler.advanceTimeBy(1, TimeUnit.SECONDS); + + @SuppressWarnings("unchecked") + Observer o = mock(Observer.class); + + source.subscribe(o); + + verify(o, never()).onNext(1); + verify(o, never()).onNext(2); + verify(o).onNext(3); + verify(o).onCompleted(); + verify(o, never()).onError(any(Throwable.class)); + } + + @Test + public void testReplayTimestampedDirectly() { + TestScheduler scheduler = new TestScheduler(); + ReplaySubject source = ReplaySubject.createWithTime(1, TimeUnit.SECONDS, scheduler); + + source.onNext(1); + + scheduler.advanceTimeBy(1, TimeUnit.SECONDS); + + @SuppressWarnings("unchecked") + Observer o = mock(Observer.class); + + source.subscribe(o); + + source.onNext(2); + + scheduler.advanceTimeBy(1, TimeUnit.SECONDS); + + source.onNext(3); + + scheduler.advanceTimeBy(1, TimeUnit.SECONDS); + + source.onCompleted(); + + scheduler.advanceTimeBy(1, TimeUnit.SECONDS); + + verify(o, never()).onError(any(Throwable.class)); + verify(o, never()).onNext(1); + verify(o).onNext(2); + verify(o).onNext(3); + verify(o).onCompleted(); + } }