From 7d03daa8d6d0fb6d59d8f5561f0598d79cb85ea5 Mon Sep 17 00:00:00 2001 From: akarnokd Date: Mon, 27 Jul 2015 08:41:02 +0200 Subject: [PATCH] Operator replay() now supports backpressure (again) --- src/main/java/rx/Observable.java | 213 ++- .../OnSubscribeMulticastSelector.java | 77 -- .../rx/internal/operators/OperatorReplay.java | 1192 ++++++++++++++++- .../operators/OperatorReplayTest.java | 399 +++++- 4 files changed, 1620 insertions(+), 261 deletions(-) delete mode 100644 src/main/java/rx/internal/operators/OnSubscribeMulticastSelector.java diff --git a/src/main/java/rx/Observable.java b/src/main/java/rx/Observable.java index 9fd476eab7..8bdd016b58 100644 --- a/src/main/java/rx/Observable.java +++ b/src/main/java/rx/Observable.java @@ -25,7 +25,6 @@ import rx.observers.SafeSubscriber; import rx.plugins.*; import rx.schedulers.*; -import rx.subjects.*; import rx.subscriptions.Subscriptions; /** @@ -5987,9 +5986,9 @@ public Void call(Notification notification) { * *
*
Backpressure Support:
- *
This operator does not support backpressure because multicasting means the stream is "hot" with - * multiple subscribers. Each child will need to manage backpressure independently using operators such - * as {@link #onBackpressureDrop} and {@link #onBackpressureBuffer}.
+ *
This operator supports backpressure. Note that the upstream requests are determined by the child + * Subscriber which requests the largest amount: i.e., two child Subscribers with requests of 10 and 100 will + * request 100 elements from the underlying Observable sequence.
*
Scheduler:
*
This version of {@code replay} does not operate by default on a particular {@link Scheduler}.
*
@@ -5999,14 +5998,7 @@ public Void call(Notification notification) { * @see ReactiveX operators documentation: Replay */ public final ConnectableObservable replay() { - return new OperatorMulticast(this, new Func0>() { - - @Override - public Subject call() { - return ReplaySubject. create(); - } - - }); + return OperatorReplay.create(this); } /** @@ -6016,9 +6008,9 @@ public final ConnectableObservable replay() { * *
*
Backpressure Support:
- *
This operator does not support backpressure because multicasting means the stream is "hot" with - * multiple subscribers. Each child will need to manage backpressure independently using operators such - * as {@link #onBackpressureDrop} and {@link #onBackpressureBuffer}.
+ *
This operator supports backpressure. Note that the upstream requests are determined by the child + * Subscriber which requests the largest amount: i.e., two child Subscribers with requests of 10 and 100 will + * request 100 elements from the underlying Observable sequence.
*
Scheduler:
*
This version of {@code replay} does not operate by default on a particular {@link Scheduler}.
*
@@ -6033,12 +6025,12 @@ public final ConnectableObservable replay() { * @see ReactiveX operators documentation: Replay */ public final Observable replay(Func1, ? extends Observable> selector) { - return create(new OnSubscribeMulticastSelector(this, new Func0>() { + return OperatorReplay.multicastSelector(new Func0>() { @Override - public final Subject call() { - return ReplaySubject.create(); + public ConnectableObservable call() { + return Observable.this.replay(); } - }, selector)); + }, selector); } /** @@ -6049,9 +6041,9 @@ public final Subject call() { * *
*
Backpressure Support:
- *
This operator does not support backpressure because multicasting means the stream is "hot" with - * multiple subscribers. Each child will need to manage backpressure independently using operators such - * as {@link #onBackpressureDrop} and {@link #onBackpressureBuffer}.
+ *
This operator supports backpressure. Note that the upstream requests are determined by the child + * Subscriber which requests the largest amount: i.e., two child Subscribers with requests of 10 and 100 will + * request 100 elements from the underlying Observable sequence.
*
Scheduler:
*
This version of {@code replay} does not operate by default on a particular {@link Scheduler}.
*
@@ -6069,12 +6061,12 @@ public final Subject call() { * @see ReactiveX operators documentation: Replay */ public final Observable replay(Func1, ? extends Observable> selector, final int bufferSize) { - return create(new OnSubscribeMulticastSelector(this, new Func0>() { + return OperatorReplay.multicastSelector(new Func0>() { @Override - public final Subject call() { - return ReplaySubject.createWithSize(bufferSize); + public ConnectableObservable call() { + return Observable.this.replay(bufferSize); } - }, selector)); + }, selector); } /** @@ -6085,9 +6077,9 @@ public final Subject call() { * *
*
Backpressure Support:
- *
This operator does not support backpressure because multicasting means the stream is "hot" with - * multiple subscribers. Each child will need to manage backpressure independently using operators such - * as {@link #onBackpressureDrop} and {@link #onBackpressureBuffer}.
+ *
This operator supports backpressure. Note that the upstream requests are determined by the child + * Subscriber which requests the largest amount: i.e., two child Subscribers with requests of 10 and 100 will + * request 100 elements from the underlying Observable sequence.
*
Scheduler:
*
This version of {@code replay} operates by default on the {@code computation} {@link Scheduler}.
*
@@ -6121,9 +6113,9 @@ public final Observable replay(Func1, ? extends Obs * *
*
Backpressure Support:
- *
This operator does not support backpressure because multicasting means the stream is "hot" with - * multiple subscribers. Each child will need to manage backpressure independently using operators such - * as {@link #onBackpressureDrop} and {@link #onBackpressureBuffer}.
+ *
This operator supports backpressure. Note that the upstream requests are determined by the child + * Subscriber which requests the largest amount: i.e., two child Subscribers with requests of 10 and 100 will + * request 100 elements from the underlying Observable sequence.
*
Scheduler:
*
you specify which {@link Scheduler} this operator will use
*
@@ -6153,12 +6145,12 @@ public final Observable replay(Func1, ? extends Obs if (bufferSize < 0) { throw new IllegalArgumentException("bufferSize < 0"); } - return create(new OnSubscribeMulticastSelector(this, new Func0>() { + return OperatorReplay.multicastSelector(new Func0>() { @Override - public final Subject call() { - return ReplaySubject.createWithTimeAndSize(time, unit, bufferSize, scheduler); + public ConnectableObservable call() { + return Observable.this.replay(bufferSize, time, unit, scheduler); } - }, selector)); + }, selector); } /** @@ -6169,9 +6161,9 @@ public final Subject call() { * *
*
Backpressure Support:
- *
This operator does not support backpressure because multicasting means the stream is "hot" with - * multiple subscribers. Each child will need to manage backpressure independently using operators such - * as {@link #onBackpressureDrop} and {@link #onBackpressureBuffer}.
+ *
This operator supports backpressure. Note that the upstream requests are determined by the child + * Subscriber which requests the largest amount: i.e., two child Subscribers with requests of 10 and 100 will + * request 100 elements from the underlying Observable sequence.
*
Scheduler:
*
you specify which {@link Scheduler} this operator will use
*
@@ -6190,13 +6182,18 @@ public final Subject call() { * replaying no more than {@code bufferSize} notifications * @see ReactiveX operators documentation: Replay */ - public final Observable replay(Func1, ? extends Observable> selector, final int bufferSize, final Scheduler scheduler) { - return create(new OnSubscribeMulticastSelector(this, new Func0>() { + public final Observable replay(final Func1, ? extends Observable> selector, final int bufferSize, final Scheduler scheduler) { + return OperatorReplay.multicastSelector(new Func0>() { @Override - public final Subject call() { - return OperatorReplay. createScheduledSubject(ReplaySubject.createWithSize(bufferSize), scheduler); + public ConnectableObservable call() { + return Observable.this.replay(bufferSize); } - }, selector)); + }, new Func1, Observable>() { + @Override + public Observable call(Observable t) { + return selector.call(t).observeOn(scheduler); + } + }); } /** @@ -6207,9 +6204,9 @@ public final Subject call() { * *
*
Backpressure Support:
- *
This operator does not support backpressure because multicasting means the stream is "hot" with - * multiple subscribers. Each child will need to manage backpressure independently using operators such - * as {@link #onBackpressureDrop} and {@link #onBackpressureBuffer}.
+ *
This operator supports backpressure. Note that the upstream requests are determined by the child + * Subscriber which requests the largest amount: i.e., two child Subscribers with requests of 10 and 100 will + * request 100 elements from the underlying Observable sequence.
*
Scheduler:
*
This version of {@code replay} operates by default on the {@code computation} {@link Scheduler}.
*
@@ -6240,9 +6237,9 @@ public final Observable replay(Func1, ? extends Obs * *
*
Backpressure Support:
- *
This operator does not support backpressure because multicasting means the stream is "hot" with - * multiple subscribers. Each child will need to manage backpressure independently using operators such - * as {@link #onBackpressureDrop} and {@link #onBackpressureBuffer}.
+ *
This operator supports backpressure. Note that the upstream requests are determined by the child + * Subscriber which requests the largest amount: i.e., two child Subscribers with requests of 10 and 100 will + * request 100 elements from the underlying Observable sequence.
*
Scheduler:
*
you specify which {@link Scheduler} this operator will use
*
@@ -6264,12 +6261,12 @@ public final Observable replay(Func1, ? extends Obs * @see ReactiveX operators documentation: Replay */ public final Observable replay(Func1, ? extends Observable> selector, final long time, final TimeUnit unit, final Scheduler scheduler) { - return create(new OnSubscribeMulticastSelector(this, new Func0>() { + return OperatorReplay.multicastSelector(new Func0>() { @Override - public final Subject call() { - return ReplaySubject.createWithTime(time, unit, scheduler); + public ConnectableObservable call() { + return Observable.this.replay(time, unit, scheduler); } - }, selector)); + }, selector); } /** @@ -6279,9 +6276,9 @@ public final Subject call() { * *
*
Backpressure Support:
- *
This operator does not support backpressure because multicasting means the stream is "hot" with - * multiple subscribers. Each child will need to manage backpressure independently using operators such - * as {@link #onBackpressureDrop} and {@link #onBackpressureBuffer}.
+ *
This operator supports backpressure. Note that the upstream requests are determined by the child + * Subscriber which requests the largest amount: i.e., two child Subscribers with requests of 10 and 100 will + * request 100 elements from the underlying Observable sequence.
*
Scheduler:
*
you specify which {@link Scheduler} this operator will use
*
@@ -6298,13 +6295,18 @@ public final Subject call() { * replaying all items * @see ReactiveX operators documentation: Replay */ - public final Observable replay(Func1, ? extends Observable> selector, final Scheduler scheduler) { - return create(new OnSubscribeMulticastSelector(this, new Func0>() { + public final Observable replay(final Func1, ? extends Observable> selector, final Scheduler scheduler) { + return OperatorReplay.multicastSelector(new Func0>() { @Override - public final Subject call() { - return OperatorReplay.createScheduledSubject(ReplaySubject. create(), scheduler); + public ConnectableObservable call() { + return Observable.this.replay(); } - }, selector)); + }, new Func1, Observable>() { + @Override + public Observable call(Observable t) { + return selector.call(t).observeOn(scheduler); + } + }); } /** @@ -6316,9 +6318,9 @@ public final Subject call() { * *
*
Backpressure Support:
- *
This operator does not support backpressure because multicasting means the stream is "hot" with - * multiple subscribers. Each child will need to manage backpressure independently using operators such - * as {@link #onBackpressureDrop} and {@link #onBackpressureBuffer}.
+ *
This operator supports backpressure. Note that the upstream requests are determined by the child + * Subscriber which requests the largest amount: i.e., two child Subscribers with requests of 10 and 100 will + * request 100 elements from the underlying Observable sequence.
*
Scheduler:
*
This version of {@code replay} does not operate by default on a particular {@link Scheduler}.
*
@@ -6330,14 +6332,7 @@ public final Subject call() { * @see ReactiveX operators documentation: Replay */ public final ConnectableObservable replay(final int bufferSize) { - return new OperatorMulticast(this, new Func0>() { - - @Override - public Subject call() { - return ReplaySubject.createWithSize(bufferSize); - } - - }); + return OperatorReplay.create(this, bufferSize); } /** @@ -6349,9 +6344,9 @@ public final ConnectableObservable replay(final int bufferSize) { * *
*
Backpressure Support:
- *
This operator does not support backpressure because multicasting means the stream is "hot" with - * multiple subscribers. Each child will need to manage backpressure independently using operators such - * as {@link #onBackpressureDrop} and {@link #onBackpressureBuffer}.
+ *
This operator supports backpressure. Note that the upstream requests are determined by the child + * Subscriber which requests the largest amount: i.e., two child Subscribers with requests of 10 and 100 will + * request 100 elements from the underlying Observable sequence.
*
Scheduler:
*
This version of {@code replay} operates by default on the {@code computation} {@link Scheduler}.
*
@@ -6380,9 +6375,9 @@ public final ConnectableObservable replay(int bufferSize, long time, TimeUnit * *
*
Backpressure Support:
- *
This operator does not support backpressure because multicasting means the stream is "hot" with - * multiple subscribers. Each child will need to manage backpressure independently using operators such - * as {@link #onBackpressureDrop} and {@link #onBackpressureBuffer}.
+ *
This operator supports backpressure. Note that the upstream requests are determined by the child + * Subscriber which requests the largest amount: i.e., two child Subscribers with requests of 10 and 100 will + * request 100 elements from the underlying Observable sequence.
*
Scheduler:
*
you specify which {@link Scheduler} this operator will use
*
@@ -6406,14 +6401,7 @@ public final ConnectableObservable replay(final int bufferSize, final long ti if (bufferSize < 0) { throw new IllegalArgumentException("bufferSize < 0"); } - return new OperatorMulticast(this, new Func0>() { - - @Override - public Subject call() { - return ReplaySubject.createWithTimeAndSize(time, unit, bufferSize, scheduler); - } - - }); + return OperatorReplay.create(this, time, unit, scheduler, bufferSize); } /** @@ -6425,9 +6413,9 @@ public final ConnectableObservable replay(final int bufferSize, final long ti * *
*
Backpressure Support:
- *
This operator does not support backpressure because multicasting means the stream is "hot" with - * multiple subscribers. Each child will need to manage backpressure independently using operators such - * as {@link #onBackpressureDrop} and {@link #onBackpressureBuffer}.
+ *
This operator supports backpressure. Note that the upstream requests are determined by the child + * Subscriber which requests the largest amount: i.e., two child Subscribers with requests of 10 and 100 will + * request 100 elements from the underlying Observable sequence.
*
Scheduler:
*
you specify which {@link Scheduler} this operator will use
*
@@ -6441,14 +6429,7 @@ public final ConnectableObservable replay(final int bufferSize, final long ti * @see ReactiveX operators documentation: Replay */ public final ConnectableObservable replay(final int bufferSize, final Scheduler scheduler) { - return new OperatorMulticast(this, new Func0>() { - - @Override - public Subject call() { - return OperatorReplay.createScheduledSubject(ReplaySubject.createWithSize(bufferSize), scheduler); - } - - }); + return OperatorReplay.observeOn(replay(bufferSize), scheduler); } /** @@ -6460,9 +6441,9 @@ public final ConnectableObservable replay(final int bufferSize, final Schedul * *
*
Backpressure Support:
- *
This operator does not support backpressure because multicasting means the stream is "hot" with - * multiple subscribers. Each child will need to manage backpressure independently using operators such - * as {@link #onBackpressureDrop} and {@link #onBackpressureBuffer}.
+ *
This operator supports backpressure. Note that the upstream requests are determined by the child + * Subscriber which requests the largest amount: i.e., two child Subscribers with requests of 10 and 100 will + * request 100 elements from the underlying Observable sequence.
*
Scheduler:
*
This version of {@code replay} operates by default on the {@code computation} {@link Scheduler}.
*
@@ -6488,9 +6469,9 @@ public final ConnectableObservable replay(long time, TimeUnit unit) { * *
*
Backpressure Support:
- *
This operator does not support backpressure because multicasting means the stream is "hot" with - * multiple subscribers. Each child will need to manage backpressure independently using operators such - * as {@link #onBackpressureDrop} and {@link #onBackpressureBuffer}.
+ *
This operator supports backpressure. Note that the upstream requests are determined by the child + * Subscriber which requests the largest amount: i.e., two child Subscribers with requests of 10 and 100 will + * request 100 elements from the underlying Observable sequence.
*
Scheduler:
*
you specify which {@link Scheduler} this operator will use
*
@@ -6506,14 +6487,7 @@ public final ConnectableObservable replay(long time, TimeUnit unit) { * @see ReactiveX operators documentation: Replay */ public final ConnectableObservable replay(final long time, final TimeUnit unit, final Scheduler scheduler) { - return new OperatorMulticast(this, new Func0>() { - - @Override - public Subject call() { - return ReplaySubject.createWithTime(time, unit, scheduler); - } - - }); + return OperatorReplay.create(this, time, unit, scheduler); } /** @@ -6525,9 +6499,9 @@ public final ConnectableObservable replay(final long time, final TimeUnit uni * *
*
Backpressure Support:
- *
This operator does not support backpressure because multicasting means the stream is "hot" with - * multiple subscribers. Each child will need to manage backpressure independently using operators such - * as {@link #onBackpressureDrop} and {@link #onBackpressureBuffer}.
+ *
This operator supports backpressure. Note that the upstream requests are determined by the child + * Subscriber which requests the largest amount: i.e., two child Subscribers with requests of 10 and 100 will + * request 100 elements from the underlying Observable sequence.
*
Scheduler:
*
you specify which {@link Scheduler} this operator will use
*
@@ -6540,14 +6514,7 @@ public final ConnectableObservable replay(final long time, final TimeUnit uni * @see ReactiveX operators documentation: Replay */ public final ConnectableObservable replay(final Scheduler scheduler) { - return new OperatorMulticast(this, new Func0>() { - - @Override - public Subject call() { - return OperatorReplay.createScheduledSubject(ReplaySubject. create(), scheduler); - } - - }); + return OperatorReplay.observeOn(replay(), scheduler); } /** diff --git a/src/main/java/rx/internal/operators/OnSubscribeMulticastSelector.java b/src/main/java/rx/internal/operators/OnSubscribeMulticastSelector.java deleted file mode 100644 index d1457ca6ec..0000000000 --- a/src/main/java/rx/internal/operators/OnSubscribeMulticastSelector.java +++ /dev/null @@ -1,77 +0,0 @@ -/** - * Copyright 2014 Netflix, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not - * use this file except in compliance with the License. You may obtain a copy of - * the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ -package rx.internal.operators; - -import rx.Observable; -import rx.Observable.OnSubscribe; -import rx.Subscriber; -import rx.Subscription; -import rx.functions.Action1; -import rx.functions.Func0; -import rx.functions.Func1; -import rx.observables.ConnectableObservable; -import rx.observers.SafeSubscriber; -import rx.subjects.Subject; - -/** - * Returns an observable sequence that contains the elements of a sequence - * produced by multicasting the source sequence within a selector function. - * - * @see MSDN: Observable.Multicast - * - * @param the input value type - * @param the intermediate type - * @param the result type - */ -public final class OnSubscribeMulticastSelector implements OnSubscribe { - final Observable source; - final Func0> subjectFactory; - final Func1, ? extends Observable> resultSelector; - - public OnSubscribeMulticastSelector(Observable source, - Func0> subjectFactory, - Func1, ? extends Observable> resultSelector) { - this.source = source; - this.subjectFactory = subjectFactory; - this.resultSelector = resultSelector; - } - - @Override - public void call(Subscriber child) { - Observable observable; - ConnectableObservable connectable; - try { - connectable = new OperatorMulticast(source, subjectFactory); - - observable = resultSelector.call(connectable); - } catch (Throwable t) { - child.onError(t); - return; - } - - final SafeSubscriber s = new SafeSubscriber(child); - - observable.unsafeSubscribe(s); - - connectable.connect(new Action1() { - @Override - public void call(Subscription t1) { - s.add(t1); - } - }); - } - -} diff --git a/src/main/java/rx/internal/operators/OperatorReplay.java b/src/main/java/rx/internal/operators/OperatorReplay.java index 83c76dfe39..e1bf7aa352 100644 --- a/src/main/java/rx/internal/operators/OperatorReplay.java +++ b/src/main/java/rx/internal/operators/OperatorReplay.java @@ -15,93 +15,1181 @@ */ package rx.internal.operators; +import java.util.*; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.*; +import rx.*; import rx.Observable; -import rx.Observable.OnSubscribe; -import rx.Scheduler; -import rx.Subscriber; -import rx.subjects.Subject; +import rx.exceptions.Exceptions; +import rx.exceptions.OnErrorThrowable; +import rx.functions.*; +import rx.observables.ConnectableObservable; +import rx.schedulers.Timestamped; +import rx.subscriptions.Subscriptions; -/** - * Replay with limited buffer and/or time constraints. - * - * - * @see MSDN: Observable.Replay overloads - */ -public final class OperatorReplay { - /** Utility class. */ - private OperatorReplay() { - throw new IllegalStateException("No instances!"); +public final class OperatorReplay extends ConnectableObservable { + /** The source observable. */ + final Observable source; + /** Holds the current subscriber that is, will be or just was subscribed to the source observable. */ + final AtomicReference> current; + /** A factory that creates the appropriate buffer for the ReplaySubscriber. */ + final Func0> bufferFactory; + + @SuppressWarnings("rawtypes") + static final Func0 DEFAULT_UNBOUNDED_FACTORY = new Func0() { + @Override + public Object call() { + return new UnboundedReplayBuffer(16); + } + }; + + /** + * Given a connectable observable factory, it multicasts over the generated + * ConnectableObservable via a selector function. + * @param connectableFactory + * @param selector + * @return + */ + public static Observable multicastSelector( + final Func0> connectableFactory, + final Func1, ? extends Observable> selector) { + return Observable.create(new OnSubscribe() { + @Override + public void call(final Subscriber child) { + ConnectableObservable co; + Observable observable; + try { + co = connectableFactory.call(); + observable = selector.call(co); + } catch (Throwable e) { + Exceptions.throwIfFatal(e); + child.onError(e); + return; + } + + observable.subscribe(child); + + co.connect(new Action1() { + @Override + public void call(Subscription t) { + child.add(t); + } + }); + } + }); + } + + /** + * Child Subscribers will observe the events of the ConnectableObservable on the + * specified scheduler. + * @param co + * @param scheduler + * @return + */ + public static ConnectableObservable observeOn(final ConnectableObservable co, final Scheduler scheduler) { + final Observable observable = co.observeOn(scheduler); + OnSubscribe onSubscribe = new OnSubscribe() { + @Override + public void call(final Subscriber child) { + // apply observeOn and prevent calling onStart() again + observable.unsafeSubscribe(new Subscriber(child) { + @Override + public void onNext(T t) { + child.onNext(t); + } + @Override + public void onError(Throwable e) { + child.onError(e); + } + @Override + public void onCompleted() { + child.onCompleted(); + } + }); + } + }; + return new ConnectableObservable(onSubscribe) { + @Override + public void connect(Action1 connection) { + co.connect(connection); + } + }; + } + + /** + * Creates a replaying ConnectableObservable with an unbounded buffer. + * @param source + * @return + */ + @SuppressWarnings("unchecked") + public static ConnectableObservable create(Observable source) { + return create(source, DEFAULT_UNBOUNDED_FACTORY); + } + + /** + * Creates a replaying ConnectableObservable with a size bound buffer. + * @param source + * @param bufferSize + * @return + */ + public static ConnectableObservable create(Observable source, + final int bufferSize) { + if (bufferSize == Integer.MAX_VALUE) { + return create(source); + } + return create(source, new Func0>() { + @Override + public ReplayBuffer call() { + return new SizeBoundReplayBuffer(bufferSize); + } + }); } /** - * Creates a subject whose client observers will observe events - * propagated through the given wrapped subject. - * @param the element type - * @param subject the subject to wrap - * @param scheduler the target scheduler - * @return the created subject + * Creates a replaying ConnectableObservable with a time bound buffer. + * @param source + * @param maxAge + * @param unit + * @param scheduler + * @return */ - public static Subject createScheduledSubject(Subject subject, Scheduler scheduler) { - final Observable observedOn = subject.observeOn(scheduler); - SubjectWrapper s = new SubjectWrapper(new OnSubscribe() { + public static ConnectableObservable create(Observable source, + long maxAge, TimeUnit unit, Scheduler scheduler) { + return create(source, maxAge, unit, scheduler, Integer.MAX_VALUE); + } + /** + * Creates a replaying ConnectableObservable with a size and time bound buffer. + * @param source + * @param maxAge + * @param unit + * @param scheduler + * @param bufferSize + * @return + */ + public static ConnectableObservable create(Observable source, + long maxAge, TimeUnit unit, final Scheduler scheduler, final int bufferSize) { + final long maxAgeInMillis = unit.toMillis(maxAge); + return create(source, new Func0>() { @Override - public void call(Subscriber o) { - subscriberOf(observedOn).call(o); + public ReplayBuffer call() { + return new SizeAndTimeBoundReplayBuffer(bufferSize, maxAgeInMillis, scheduler); } - - }, subject); - return s; + }); } /** - * Return an OnSubscribeFunc which delegates the subscription to the given observable. - * - * @param the value type - * @param target the target observable - * @return the function that delegates the subscription to the target + * Creates a OperatorReplay instance to replay values of the given source observable. + * @param source the source observable + * @param bufferFactory the factory to instantiate the appropriate buffer when the observable becomes active + * @return the connectable observable */ - public static OnSubscribe subscriberOf(final Observable target) { - return new OnSubscribe() { + static ConnectableObservable create(Observable source, + final Func0> bufferFactory) { + // the current connection to source needs to be shared between the operator and its onSubscribe call + final AtomicReference> curr = new AtomicReference>(); + OnSubscribe onSubscribe = new OnSubscribe() { @Override - public void call(Subscriber t1) { - target.unsafeSubscribe(t1); + public void call(Subscriber child) { + // concurrent connection/disconnection may change the state, + // we loop to be atomic while the child subscribes + for (;;) { + // get the current subscriber-to-source + ReplaySubscriber r = curr.get(); + // if there isn't one + if (r == null) { + // create a new subscriber to source + ReplaySubscriber u = new ReplaySubscriber(curr, bufferFactory.call()); + // perform extra initialization to avoid 'this' to escape during construction + u.init(); + // let's try setting it as the current subscriber-to-source + if (!curr.compareAndSet(r, u)) { + // didn't work, maybe someone else did it or the current subscriber + // to source has just finished + continue; + } + // we won, let's use it going onwards + r = u; + } + + // create the backpressure-managing producer for this child + InnerProducer inner = new InnerProducer(r, child); + // we try to add it to the array of producers + // if it fails, no worries because we will still have its buffer + // so it is going to replay it for us + r.add(inner); + // the producer has been registered with the current subscriber-to-source so + // at least it will receive the next terminal event + child.add(inner); + // setting the producer will trigger the first request to be considered by + // the subscriber-to-source. + child.setProducer(inner); + break; + } } }; + return new OperatorReplay(onSubscribe, source, curr, bufferFactory); + } + private OperatorReplay(OnSubscribe onSubscribe, Observable source, + final AtomicReference> current, + final Func0> bufferFactory) { + super(onSubscribe); + this.source = source; + this.current = current; + this.bufferFactory = bufferFactory; + } + + @Override + public void connect(Action1 connection) { + boolean doConnect = false; + ReplaySubscriber ps; + // we loop because concurrent connect/disconnect and termination may change the state + for (;;) { + // retrieve the current subscriber-to-source instance + ps = current.get(); + // if there is none yet or the current has unsubscribed + if (ps == null || ps.isUnsubscribed()) { + // create a new subscriber-to-source + ReplaySubscriber u = new ReplaySubscriber(current, bufferFactory.call()); + // initialize out the constructor to avoid 'this' to escape + u.init(); + // try setting it as the current subscriber-to-source + if (!current.compareAndSet(ps, u)) { + // did not work, perhaps a new subscriber arrived + // and created a new subscriber-to-source as well, retry + continue; + } + ps = u; + } + // if connect() was called concurrently, only one of them should actually + // connect to the source + doConnect = !ps.shouldConnect.get() && ps.shouldConnect.compareAndSet(false, true); + break; + } + /* + * Notify the callback that we have a (new) connection which it can unsubscribe + * but since ps is unique to a connection, multiple calls to connect() will return the + * same Subscription and even if there was a connect-disconnect-connect pair, the older + * references won't disconnect the newer connection. + * Synchronous source consumers have the opportunity to disconnect via unsubscribe on the + * Subscription as unsafeSubscribe may never return in its own. + * + * Note however, that asynchronously disconnecting a running source might leave + * child-subscribers without any terminal event; ReplaySubject does not have this + * issue because the unsubscription was always triggered by the child-subscribers + * themselves. + */ + connection.call(ps); + if (doConnect) { + source.unsafeSubscribe(ps); + } } + + @SuppressWarnings("rawtypes") + static final class ReplaySubscriber extends Subscriber implements Subscription { + /** Holds notifications from upstream. */ + final ReplayBuffer buffer; + /** The notification-lite factory. */ + final NotificationLite nl; + /** Contains either an onCompleted or an onError token from upstream. */ + boolean done; + + /** Indicates an empty array of inner producers. */ + static final InnerProducer[] EMPTY = new InnerProducer[0]; + /** Indicates a terminated ReplaySubscriber. */ + static final InnerProducer[] TERMINATED = new InnerProducer[0]; + + /** Tracks the subscribed producers. */ + final AtomicReference producers; + /** + * Atomically changed from false to true by connect to make sure the + * connection is only performed by one thread. + */ + final AtomicBoolean shouldConnect; + + /** Guarded by this. */ + boolean emitting; + /** Guarded by this. */ + boolean missed; + + + /** Contains the maximum element index the child Subscribers requested so far. Accessed while emitting is true. */ + long maxChildRequested; + /** Counts the outstanding upstream requests until the producer arrives. */ + long maxUpstreamRequested; + /** The upstream producer. */ + volatile Producer producer; + + public ReplaySubscriber(AtomicReference> current, + ReplayBuffer buffer) { + this.buffer = buffer; + + this.nl = NotificationLite.instance(); + this.producers = new AtomicReference(EMPTY); + this.shouldConnect = new AtomicBoolean(); + // make sure the source doesn't produce values until the child subscribers + // expressed their request amounts + this.request(0); + } + /** Should be called after the constructor finished to setup nulling-out the current reference. */ + void init() { + add(Subscriptions.create(new Action0() { + @Override + public void call() { + ReplaySubscriber.this.producers.getAndSet(TERMINATED); + // unlike OperatorPublish, we can't null out the terminated so + // late subscribers can still get replay + // current.compareAndSet(ReplaySubscriber.this, null); + // we don't care if it fails because it means the current has + // been replaced in the meantime + } + })); + } + /** + * Atomically try adding a new InnerProducer to this Subscriber or return false if this + * Subscriber was terminated. + * @param producer the producer to add + * @return true if succeeded, false otherwise + */ + boolean add(InnerProducer producer) { + if (producer == null) { + throw new NullPointerException(); + } + // the state can change so we do a CAS loop to achieve atomicity + for (;;) { + // get the current producer array + InnerProducer[] c = producers.get(); + // if this subscriber-to-source reached a terminal state by receiving + // an onError or onCompleted, just refuse to add the new producer + if (c == TERMINATED) { + return false; + } + // we perform a copy-on-write logic + int len = c.length; + InnerProducer[] u = new InnerProducer[len + 1]; + System.arraycopy(c, 0, u, 0, len); + u[len] = producer; + // try setting the producers array + if (producers.compareAndSet(c, u)) { + return true; + } + // if failed, some other operation succeded (another add, remove or termination) + // so retry + } + } + + /** + * Atomically removes the given producer from the producers array. + * @param producer the producer to remove + */ + void remove(InnerProducer producer) { + // the state can change so we do a CAS loop to achieve atomicity + for (;;) { + // let's read the current producers array + InnerProducer[] c = producers.get(); + // if it is either empty or terminated, there is nothing to remove so we quit + if (c == EMPTY || c == TERMINATED) { + return; + } + // let's find the supplied producer in the array + // although this is O(n), we don't expect too many child subscribers in general + int j = -1; + int len = c.length; + for (int i = 0; i < len; i++) { + if (c[i].equals(producer)) { + j = i; + break; + } + } + // we didn't find it so just quit + if (j < 0) { + return; + } + // we do copy-on-write logic here + InnerProducer[] u; + // we don't create a new empty array if producer was the single inhabitant + // but rather reuse an empty array + if (len == 1) { + u = EMPTY; + } else { + // otherwise, create a new array one less in size + u = new InnerProducer[len - 1]; + // copy elements being before the given producer + System.arraycopy(c, 0, u, 0, j); + // copy elements being after the given producer + System.arraycopy(c, j + 1, u, j, len - j - 1); + } + // try setting this new array as + if (producers.compareAndSet(c, u)) { + return; + } + // if we failed, it means something else happened + // (a concurrent add/remove or termination), we need to retry + } + } + + @Override + public void setProducer(Producer p) { + Producer p0 = producer; + if (p0 != null) { + throw new IllegalStateException("Only a single producer can be set on a Subscriber."); + } + producer = p; + manageRequests(); + replay(); + } + + @Override + public void onNext(T t) { + if (!done) { + buffer.next(t); + replay(); + } + } + @Override + public void onError(Throwable e) { + // The observer front is accessed serially as required by spec so + // no need to CAS in the terminal value + if (!done) { + done = true; + try { + buffer.error(e); + replay(); + } finally { + unsubscribe(); // expectation of testIssue2191 + } + } + } + @Override + public void onCompleted() { + // The observer front is accessed serially as required by spec so + // no need to CAS in the terminal value + if (!done) { + done = true; + try { + buffer.complete(); + replay(); + } finally { + unsubscribe(); + } + } + } + + /** + * Coordinates the request amounts of various child Subscribers. + */ + void manageRequests() { + // if the upstream has completed, no more requesting is possible + if (isUnsubscribed()) { + return; + } + synchronized (this) { + if (emitting) { + missed = true; + return; + } + emitting = true; + } + for (;;) { + // if the upstream has completed, no more requesting is possible + if (isUnsubscribed()) { + return; + } + + @SuppressWarnings("unchecked") + InnerProducer[] a = producers.get(); + + long ri = maxChildRequested; + long maxTotalRequests = 0; + + for (InnerProducer rp : a) { + maxTotalRequests = Math.max(maxTotalRequests, rp.totalRequested.get()); + } + + long ur = maxUpstreamRequested; + Producer p = producer; + long diff = maxTotalRequests - ri; + if (diff != 0) { + maxChildRequested = maxTotalRequests; + if (p != null) { + if (ur != 0L) { + maxUpstreamRequested = 0L; + p.request(ur + diff); + } else { + p.request(diff); + } + } else { + // collect upstream request amounts until there is a producer for them + long u = ur + diff; + if (u < 0) { + u = Long.MAX_VALUE; + } + maxUpstreamRequested = u; + } + } else + // if there were outstanding upstream requests and we have a producer + if (ur != 0L && p != null) { + maxUpstreamRequested = 0L; + // fire the accumulated requests + p.request(ur); + } + + synchronized (this) { + if (!missed) { + emitting = false; + return; + } + missed = false; + } + } + } + + /** + * Tries to replay the buffer contents to all known subscribers. + */ + void replay() { + @SuppressWarnings("unchecked") + InnerProducer[] a = producers.get(); + for (InnerProducer rp : a) { + buffer.replay(rp); + } + } + } /** - * A subject that wraps another subject. + * A Producer and Subscription that manages the request and unsubscription state of a + * child subscriber in thread-safe manner. + * We use AtomicLong as a base class to save on extra allocation of an AtomicLong and also + * save the overhead of the AtomicIntegerFieldUpdater. * @param the value type */ - public static final class SubjectWrapper extends Subject { - /** The wrapped subject. */ - final Subject subject; + static final class InnerProducer extends AtomicLong implements Producer, Subscription { + /** */ + private static final long serialVersionUID = -4453897557930727610L; + /** + * The parent subscriber-to-source used to allow removing the child in case of + * child unsubscription. + */ + final ReplaySubscriber parent; + /** The actual child subscriber. */ + final Subscriber child; + /** + * Holds an object that represents the current location in the buffer. + * Guarded by the emitter loop. + */ + Object index; + /** + * Keeps the sum of all requested amounts. + */ + final AtomicLong totalRequested; + /** Indicates an emission state. Guarded by this. */ + boolean emitting; + /** Indicates a missed update. Guarded by this. */ + boolean missed; + /** + * Indicates this child has been unsubscribed: the state is swapped in atomically and + * will prevent the dispatch() to emit (too many) values to a terminated child subscriber. + */ + static final long UNSUBSCRIBED = Long.MIN_VALUE; + + public InnerProducer(ReplaySubscriber parent, Subscriber child) { + this.parent = parent; + this.child = child; + this.totalRequested = new AtomicLong(); + } + + @Override + public void request(long n) { + // ignore negative requests + if (n < 0) { + return; + } + // In general, RxJava doesn't prevent concurrent requests (with each other or with + // an unsubscribe) so we need a CAS-loop, but we need to handle + // request overflow and unsubscribed/not requested state as well. + for (;;) { + // get the current request amount + long r = get(); + // if child called unsubscribe() do nothing + if (r == UNSUBSCRIBED) { + return; + } + // ignore zero requests except any first that sets in zero + if (r >= 0L && n == 0) { + return; + } + // otherwise, increase the request count + long u = r + n; + // and check for long overflow + if (u < 0) { + // cap at max value, which is essentially unlimited + u = Long.MAX_VALUE; + } + // try setting the new request value + if (compareAndSet(r, u)) { + // increment the total request counter + addTotalRequested(n); + // if successful, notify the parent dispacher this child can receive more + // elements + parent.manageRequests(); + + parent.buffer.replay(this); + return; + } + // otherwise, someone else changed the state (perhaps a concurrent + // request or unsubscription so retry + } + } + + /** + * Increments the total requested amount. + * @param n the additional request amount + */ + void addTotalRequested(long n) { + for (;;) { + long r = totalRequested.get(); + long u = r + n; + if (u < 0) { + u = Long.MAX_VALUE; + } + if (totalRequested.compareAndSet(r, u)) { + return; + } + } + } + + /** + * Indicate that values have been emitted to this child subscriber by the dispatch() method. + * @param n the number of items emitted + * @return the updated request value (may indicate how much can be produced or a terminal state) + */ + public long produced(long n) { + // we don't allow producing zero or less: it would be a bug in the operator + if (n <= 0) { + throw new IllegalArgumentException("Cant produce zero or less"); + } + for (;;) { + // get the current request value + long r = get(); + // if the child has unsubscribed, simply return and indicate this + if (r == UNSUBSCRIBED) { + return UNSUBSCRIBED; + } + // reduce the requested amount + long u = r - n; + // if the new amount is less than zero, we have a bug in this operator + if (u < 0) { + throw new IllegalStateException("More produced (" + n + ") than requested (" + r + ")"); + } + // try updating the request value + if (compareAndSet(r, u)) { + // and return the udpated value + return u; + } + // otherwise, some concurrent activity happened and we need to retry + } + } + + @Override + public boolean isUnsubscribed() { + return get() == UNSUBSCRIBED; + } + @Override + public void unsubscribe() { + long r = get(); + // let's see if we are unsubscribed + if (r != UNSUBSCRIBED) { + // if not, swap in the terminal state, this is idempotent + // because other methods using CAS won't overwrite this value, + // concurrent calls to unsubscribe will atomically swap in the same + // terminal value + r = getAndSet(UNSUBSCRIBED); + // and only one of them will see a non-terminated value before the swap + if (r != UNSUBSCRIBED) { + // remove this from the parent + parent.remove(this); + // After removal, we might have unblocked the other child subscribers: + // let's assume this child had 0 requested before the unsubscription while + // the others had non-zero. By removing this 'blocking' child, the others + // are now free to receive events + parent.manageRequests(); + } + } + } + /** + * Convenience method to auto-cast the index object. + * @return + */ + @SuppressWarnings("unchecked") + U index() { + return (U)index; + } + } + /** + * The interface for interacting with various buffering logic. + * + * @param the value type + */ + interface ReplayBuffer { + /** + * Adds a regular value to the buffer. + * @param value + */ + void next(T value); + /** + * Adds a terminal exception to the buffer + * @param e + */ + void error(Throwable e); + /** + * Adds a completion event to the buffer + */ + void complete(); + /** + * Tries to replay the buffered values to the + * subscriber inside the output if there + * is new value and requests available at the + * same time. + * @param output + */ + void replay(InnerProducer output); + } + + /** + * Holds an unbounded list of events. + * + * @param the value type + */ + static final class UnboundedReplayBuffer extends ArrayList implements ReplayBuffer { + /** */ + private static final long serialVersionUID = 7063189396499112664L; + final NotificationLite nl; + /** The total number of events in the buffer. */ + volatile int size; + + public UnboundedReplayBuffer(int capacityHint) { + super(capacityHint); + nl = NotificationLite.instance(); + } + @Override + public void next(T value) { + add(nl.next(value)); + size++; + } - public SubjectWrapper(OnSubscribe func, Subject subject) { - super(func); - this.subject = subject; + @Override + public void error(Throwable e) { + add(nl.error(e)); + size++; } @Override - public void onNext(T args) { - subject.onNext(args); + public void complete() { + add(nl.completed()); + size++; } @Override - public void onError(Throwable e) { - subject.onError(e); + public void replay(InnerProducer output) { + synchronized (output) { + if (output.emitting) { + output.missed = true; + return; + } + output.emitting = true; + } + for (;;) { + if (output.isUnsubscribed()) { + return; + } + int sourceIndex = size; + + Integer destIndexObject = output.index(); + int destIndex = destIndexObject != null ? destIndexObject.intValue() : 0; + + long r = output.get(); + long r0 = r; + long e = 0L; + + while (r != 0L && destIndex < sourceIndex) { + Object o = get(destIndex); + try { + if (nl.accept(output.child, o)) { + return; + } + } catch (Throwable err) { + Exceptions.throwIfFatal(err); + output.unsubscribe(); + if (!nl.isError(o) && !nl.isCompleted(o)) { + output.child.onError(OnErrorThrowable.addValueAsLastCause(err, nl.getValue(o))); + } + return; + } + if (output.isUnsubscribed()) { + return; + } + destIndex++; + r--; + e++; + } + if (e != 0L) { + output.index = destIndex; + if (r0 != Long.MAX_VALUE) { + output.produced(e); + } + } + + synchronized (output) { + if (!output.missed) { + output.emitting = false; + return; + } + output.missed = false; + } + } + } + } + + /** + * Represents a node in a bounded replay buffer's linked list. + * + * @param the contained value type + */ + static final class Node extends AtomicReference { + /** */ + private static final long serialVersionUID = 245354315435971818L; + final Object value; + public Node(Object value) { + this.value = value; + } + } + + /** + * Base class for bounded buffering with options to specify an + * enter and leave transforms and custom truncation behavior. + * + * @param the value type + */ + static class BoundedReplayBuffer extends AtomicReference implements ReplayBuffer { + /** */ + private static final long serialVersionUID = 2346567790059478686L; + final NotificationLite nl; + + Node tail; + int size; + + public BoundedReplayBuffer() { + nl = NotificationLite.instance(); + Node n = new Node(null); + tail = n; + set(n); + } + + /** + * Add a new node to the linked list. + * @param n + */ + final void addLast(Node n) { + tail.set(n); + tail = n; + size++; + } + /** + * Remove the first node from the linked list. + */ + final void removeFirst() { + Node head = get(); + Node next = head.get(); + if (next == null) { + throw new IllegalStateException("Empty list!"); + } + size--; + // can't just move the head because it would retain the very first value + // can't null out the head's value because of late replayers would see null + setFirst(next); + } + /* test */ final void removeSome(int n) { + Node head = get(); + while (n > 0) { + head = head.get(); + n--; + size--; + } + + setFirst(head); + } + /** + * Arranges the given node is the new head from now on. + * @param n + */ + final void setFirst(Node n) { + set(n); + } + + @Override + public final void next(T value) { + Object o = enterTransform(nl.next(value)); + Node n = new Node(o); + addLast(n); + truncate(); } @Override - public void onCompleted() { - subject.onCompleted(); + public final void error(Throwable e) { + Object o = enterTransform(nl.error(e)); + Node n = new Node(o); + addLast(n); + truncateFinal(); } @Override - public boolean hasObservers() { - return this.subject.hasObservers(); + public final void complete() { + Object o = enterTransform(nl.completed()); + Node n = new Node(o); + addLast(n); + truncateFinal(); + } + + @Override + public final void replay(InnerProducer output) { + synchronized (output) { + if (output.emitting) { + output.missed = true; + return; + } + output.emitting = true; + } + for (;;) { + if (output.isUnsubscribed()) { + return; + } + + long r = output.get(); + long r0 = r; + long e = 0L; + + Node node = output.index(); + if (node == null) { + node = get(); + output.index = node; + } + + while (r != 0) { + Node v = node.get(); + if (v != null) { + Object o = leaveTransform(v.value); + try { + if (nl.accept(output.child, o)) { + output.index = null; + return; + } + } catch (Throwable err) { + output.index = null; + Exceptions.throwIfFatal(err); + output.unsubscribe(); + if (!nl.isError(o) && !nl.isCompleted(o)) { + output.child.onError(OnErrorThrowable.addValueAsLastCause(err, nl.getValue(o))); + } + return; + } + e++; + node = v; + } else { + break; + } + if (output.isUnsubscribed()) { + return; + } + } + + if (e != 0L) { + output.index = node; + if (r0 != Long.MAX_VALUE) { + output.produced(e); + } + } + + synchronized (output) { + if (!output.missed) { + output.emitting = false; + return; + } + output.missed = false; + } + } + + } + + /** + * Override this to wrap the NotificationLite object into a + * container to be used later by truncate. + * @param value + * @return + */ + Object enterTransform(Object value) { + return value; + } + /** + * Override this to unwrap the transformed value into a + * NotificationLite object. + * @param value + * @return + */ + Object leaveTransform(Object value) { + return value; + } + /** + * Override this method to truncate a non-terminated buffer + * based on its current properties. + */ + void truncate() { + + } + /** + * Override this method to truncate a terminated buffer + * based on its properties (i.e., truncate but the very last node). + */ + void truncateFinal() { + + } + /* test */ final void collect(Collection output) { + Node n = get(); + for (;;) { + Node next = n.get(); + if (next != null) { + Object o = next.value; + Object v = leaveTransform(o); + if (nl.isCompleted(v) || nl.isError(v)) { + break; + } + output.add(nl.getValue(v)); + n = next; + } else { + break; + } + } + } + /* test */ boolean hasError() { + return tail.value != null && nl.isError(leaveTransform(tail.value)); + } + /* test */ boolean hasCompleted() { + return tail.value != null && nl.isCompleted(leaveTransform(tail.value)); + } + } + + /** + * A bounded replay buffer implementation with size limit only. + * + * @param the value type + */ + static final class SizeBoundReplayBuffer extends BoundedReplayBuffer { + /** */ + private static final long serialVersionUID = -5898283885385201806L; + + final int limit; + public SizeBoundReplayBuffer(int limit) { + this.limit = limit; + } + + @Override + void truncate() { + // overflow can be at most one element + if (size > limit) { + removeFirst(); + } + } + + // no need for final truncation because values are truncated one by one + } + + /** + * Size and time bound replay buffer. + * + * @param the buffered value type + */ + static final class SizeAndTimeBoundReplayBuffer extends BoundedReplayBuffer { + /** */ + private static final long serialVersionUID = 3457957419649567404L; + final Scheduler scheduler; + final long maxAgeInMillis; + final int limit; + public SizeAndTimeBoundReplayBuffer(int limit, long maxAgeInMillis, Scheduler scheduler) { + this.scheduler = scheduler; + this.limit = limit; + this.maxAgeInMillis = maxAgeInMillis; + } + + @Override + Object enterTransform(Object value) { + return new Timestamped(scheduler.now(), value); + } + + @Override + Object leaveTransform(Object value) { + return ((Timestamped)value).getValue(); + } + + @Override + void truncate() { + long timeLimit = scheduler.now() - maxAgeInMillis; + + Node prev = get(); + Node next = prev.get(); + + int e = 0; + for (;;) { + if (next != null) { + if (size > limit) { + e++; + size--; + prev = next; + next = next.get(); + } else { + Timestamped v = (Timestamped)next.value; + if (v.getTimestampMillis() <= timeLimit) { + e++; + size--; + prev = next; + next = next.get(); + } else { + break; + } + } + } else { + break; + } + } + if (e != 0) { + setFirst(prev); + } + } + @Override + void truncateFinal() { + long timeLimit = scheduler.now() - maxAgeInMillis; + + Node prev = get(); + Node next = prev.get(); + + int e = 0; + for (;;) { + if (next != null && size > 1) { + Timestamped v = (Timestamped)next.value; + if (v.getTimestampMillis() <= timeLimit) { + e++; + size--; + prev = next; + next = next.get(); + } else { + break; + } + } else { + break; + } + } + if (e != 0) { + setFirst(prev); + } } } } \ No newline at end of file diff --git a/src/test/java/rx/internal/operators/OperatorReplayTest.java b/src/test/java/rx/internal/operators/OperatorReplayTest.java index a5ff85864d..046803b082 100644 --- a/src/test/java/rx/internal/operators/OperatorReplayTest.java +++ b/src/test/java/rx/internal/operators/OperatorReplayTest.java @@ -15,33 +15,40 @@ */ package rx.internal.operators; -import static org.junit.Assert.assertEquals; +import static org.junit.Assert.*; import static org.mockito.Matchers.any; import static org.mockito.Matchers.notNull; -import static org.mockito.Mockito.inOrder; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.never; -import static org.mockito.Mockito.spy; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.verifyNoMoreInteractions; -import static org.mockito.Mockito.when; +import static org.mockito.Mockito.*; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import org.junit.Assert; import org.junit.Test; import org.mockito.InOrder; import rx.Observable; +import rx.Observable.OnSubscribe; import rx.Observer; import rx.Scheduler; import rx.Scheduler.Worker; +import rx.Subscriber; import rx.Subscription; +import rx.exceptions.TestException; import rx.functions.Action0; import rx.functions.Action1; import rx.functions.Func1; +import rx.internal.operators.OperatorReplay.BoundedReplayBuffer; +import rx.internal.operators.OperatorReplay.Node; +import rx.internal.operators.OperatorReplay.SizeAndTimeBoundReplayBuffer; import rx.observables.ConnectableObservable; +import rx.observers.TestSubscriber; +import rx.schedulers.Schedulers; import rx.schedulers.TestScheduler; import rx.subjects.PublishSubject; @@ -739,4 +746,378 @@ public boolean isUnsubscribed() { } } + @Test + public void testBoundedReplayBuffer() { + BoundedReplayBuffer buf = new BoundedReplayBuffer(); + buf.addLast(new Node(1)); + buf.addLast(new Node(2)); + buf.addLast(new Node(3)); + buf.addLast(new Node(4)); + buf.addLast(new Node(5)); + + List values = new ArrayList(); + buf.collect(values); + + Assert.assertEquals(Arrays.asList(1, 2, 3, 4, 5), values); + + buf.removeSome(2); + buf.removeFirst(); + buf.removeSome(2); + + values.clear(); + buf.collect(values); + Assert.assertTrue(values.isEmpty()); + + buf.addLast(new Node(5)); + buf.addLast(new Node(6)); + buf.collect(values); + + Assert.assertEquals(Arrays.asList(5, 6), values); + + } + + @Test + public void testTimedAndSizedTruncation() { + TestScheduler test = Schedulers.test(); + SizeAndTimeBoundReplayBuffer buf = new SizeAndTimeBoundReplayBuffer(2, 2000, test); + List values = new ArrayList(); + + buf.next(1); + test.advanceTimeBy(1, TimeUnit.SECONDS); + buf.next(2); + test.advanceTimeBy(1, TimeUnit.SECONDS); + buf.collect(values); + Assert.assertEquals(Arrays.asList(1, 2), values); + + buf.next(3); + buf.next(4); + values.clear(); + buf.collect(values); + Assert.assertEquals(Arrays.asList(3, 4), values); + + test.advanceTimeBy(2, TimeUnit.SECONDS); + buf.next(5); + + values.clear(); + buf.collect(values); + Assert.assertEquals(Arrays.asList(5), values); + + test.advanceTimeBy(2, TimeUnit.SECONDS); + buf.complete(); + + values.clear(); + buf.collect(values); + Assert.assertTrue(values.isEmpty()); + + Assert.assertEquals(1, buf.size); + Assert.assertTrue(buf.hasCompleted()); + } + + @Test + public void testBackpressure() { + final AtomicLong requested = new AtomicLong(); + Observable source = Observable.range(1, 1000) + .doOnRequest(new Action1() { + @Override + public void call(Long t) { + requested.addAndGet(t); + } + }); + ConnectableObservable co = source.replay(); + + TestSubscriber ts1 = TestSubscriber.create(10); + TestSubscriber ts2 = TestSubscriber.create(90); + + co.subscribe(ts1); + co.subscribe(ts2); + + ts2.requestMore(10); + + co.connect(); + + ts1.assertValueCount(10); + ts1.assertNoTerminalEvent(); + + ts2.assertValueCount(100); + ts2.assertNoTerminalEvent(); + + Assert.assertEquals(100, requested.get()); + } + + @Test + public void testBackpressureBounded() { + final AtomicLong requested = new AtomicLong(); + Observable source = Observable.range(1, 1000) + .doOnRequest(new Action1() { + @Override + public void call(Long t) { + requested.addAndGet(t); + } + }); + ConnectableObservable co = source.replay(50); + + TestSubscriber ts1 = TestSubscriber.create(10); + TestSubscriber ts2 = TestSubscriber.create(90); + + co.subscribe(ts1); + co.subscribe(ts2); + + ts2.requestMore(10); + + co.connect(); + + ts1.assertValueCount(10); + ts1.assertNoTerminalEvent(); + + ts2.assertValueCount(100); + ts2.assertNoTerminalEvent(); + + Assert.assertEquals(100, requested.get()); + } + + @Test + public void testColdReplayNoBackpressure() { + Observable source = Observable.range(0, 1000).replay().autoConnect(); + + TestSubscriber ts = new TestSubscriber(); + + source.subscribe(ts); + + ts.assertNoErrors(); + ts.assertTerminalEvent(); + List onNextEvents = ts.getOnNextEvents(); + assertEquals(1000, onNextEvents.size()); + + for (int i = 0; i < 1000; i++) { + assertEquals((Integer)i, onNextEvents.get(i)); + } + } + @Test + public void testColdReplayBackpressure() { + Observable source = Observable.range(0, 1000).replay().autoConnect(); + + TestSubscriber ts = new TestSubscriber(); + ts.requestMore(10); + + source.subscribe(ts); + + ts.assertNoErrors(); + assertTrue(ts.getOnCompletedEvents().isEmpty()); + List onNextEvents = ts.getOnNextEvents(); + assertEquals(10, onNextEvents.size()); + + for (int i = 0; i < 10; i++) { + assertEquals((Integer)i, onNextEvents.get(i)); + } + + ts.unsubscribe(); + } + + @Test + public void testCache() throws InterruptedException { + final AtomicInteger counter = new AtomicInteger(); + Observable o = Observable.create(new Observable.OnSubscribe() { + + @Override + public void call(final Subscriber observer) { + new Thread(new Runnable() { + + @Override + public void run() { + counter.incrementAndGet(); + System.out.println("published observable being executed"); + observer.onNext("one"); + observer.onCompleted(); + } + }).start(); + } + }).replay().autoConnect(); + + // we then expect the following 2 subscriptions to get that same value + final CountDownLatch latch = new CountDownLatch(2); + + // subscribe once + o.subscribe(new Action1() { + + @Override + public void call(String v) { + assertEquals("one", v); + System.out.println("v: " + v); + latch.countDown(); + } + }); + + // subscribe again + o.subscribe(new Action1() { + + @Override + public void call(String v) { + assertEquals("one", v); + System.out.println("v: " + v); + latch.countDown(); + } + }); + + if (!latch.await(1000, TimeUnit.MILLISECONDS)) { + fail("subscriptions did not receive values"); + } + assertEquals(1, counter.get()); + } + + @Test + public void testUnsubscribeSource() { + Action0 unsubscribe = mock(Action0.class); + Observable o = Observable.just(1).doOnUnsubscribe(unsubscribe).cache(); + o.subscribe(); + o.subscribe(); + o.subscribe(); + verify(unsubscribe, times(1)).call(); + } + + @Test + public void testTake() { + TestSubscriber ts = new TestSubscriber(); + + Observable cached = Observable.range(1, 100).replay().autoConnect(); + cached.take(10).subscribe(ts); + + ts.assertNoErrors(); + ts.assertTerminalEvent(); + ts.assertReceivedOnNext(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)); + ts.assertUnsubscribed(); + } + + @Test + public void testAsync() { + Observable source = Observable.range(1, 10000); + for (int i = 0; i < 100; i++) { + TestSubscriber ts1 = new TestSubscriber(); + + Observable cached = source.replay().autoConnect(); + + cached.observeOn(Schedulers.computation()).subscribe(ts1); + + ts1.awaitTerminalEvent(2, TimeUnit.SECONDS); + ts1.assertNoErrors(); + ts1.assertTerminalEvent(); + assertEquals(10000, ts1.getOnNextEvents().size()); + + TestSubscriber ts2 = new TestSubscriber(); + cached.observeOn(Schedulers.computation()).subscribe(ts2); + + ts2.awaitTerminalEvent(2, TimeUnit.SECONDS); + ts2.assertNoErrors(); + ts2.assertTerminalEvent(); + assertEquals(10000, ts2.getOnNextEvents().size()); + } + } + @Test + public void testAsyncComeAndGo() { + Observable source = Observable.interval(1, 1, TimeUnit.MILLISECONDS) + .take(1000) + .subscribeOn(Schedulers.io()); + Observable cached = source.replay().autoConnect(); + + Observable output = cached.observeOn(Schedulers.computation()); + + List> list = new ArrayList>(100); + for (int i = 0; i < 100; i++) { + TestSubscriber ts = new TestSubscriber(); + list.add(ts); + output.skip(i * 10).take(10).subscribe(ts); + } + + List expected = new ArrayList(); + for (int i = 0; i < 10; i++) { + expected.add((long)(i - 10)); + } + int j = 0; + for (TestSubscriber ts : list) { + ts.awaitTerminalEvent(3, TimeUnit.SECONDS); + ts.assertNoErrors(); + ts.assertTerminalEvent(); + + for (int i = j * 10; i < j * 10 + 10; i++) { + expected.set(i - j * 10, (long)i); + } + + ts.assertReceivedOnNext(expected); + + j++; + } + } + + @Test + public void testNoMissingBackpressureException() { + final int m = 4 * 1000 * 1000; + Observable firehose = Observable.create(new OnSubscribe() { + @Override + public void call(Subscriber t) { + for (int i = 0; i < m; i++) { + t.onNext(i); + } + t.onCompleted(); + } + }); + + TestSubscriber ts = new TestSubscriber(); + firehose.replay().autoConnect().observeOn(Schedulers.computation()).takeLast(100).subscribe(ts); + + ts.awaitTerminalEvent(3, TimeUnit.SECONDS); + ts.assertNoErrors(); + ts.assertTerminalEvent(); + + assertEquals(100, ts.getOnNextEvents().size()); + } + + @Test + public void testValuesAndThenError() { + Observable source = Observable.range(1, 10) + .concatWith(Observable.error(new TestException())) + .replay().autoConnect(); + + + TestSubscriber ts = new TestSubscriber(); + source.subscribe(ts); + + ts.assertReceivedOnNext(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)); + Assert.assertTrue(ts.getOnCompletedEvents().isEmpty()); + Assert.assertEquals(1, ts.getOnErrorEvents().size()); + + TestSubscriber ts2 = new TestSubscriber(); + source.subscribe(ts2); + + ts2.assertReceivedOnNext(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)); + Assert.assertTrue(ts2.getOnCompletedEvents().isEmpty()); + Assert.assertEquals(1, ts2.getOnErrorEvents().size()); + } + + @Test + public void unsafeChildThrows() { + final AtomicInteger count = new AtomicInteger(); + + Observable source = Observable.range(1, 100) + .doOnNext(new Action1() { + @Override + public void call(Integer t) { + count.getAndIncrement(); + } + }) + .replay().autoConnect(); + + TestSubscriber ts = new TestSubscriber() { + @Override + public void onNext(Integer t) { + throw new TestException(); + } + }; + + source.unsafeSubscribe(ts); + + Assert.assertEquals(100, count.get()); + + ts.assertNoValues(); + ts.assertNotCompleted(); + ts.assertError(TestException.class); + } } \ No newline at end of file