From 96f4b664bdd0a42222bcacd73f7ee618ca006508 Mon Sep 17 00:00:00 2001 From: akarnokd Date: Fri, 25 Apr 2014 23:42:45 +0200 Subject: [PATCH] OperatorMulticastAndReplay --- rxjava-core/src/main/java/rx/Observable.java | 65 ++++---- .../java/rx/operators/OperationMulticast.java | 148 ------------------ .../java/rx/operators/OperatorMulticast.java | 86 ++++++++++ .../operators/OperatorMulticastSelector.java | 76 +++++++++ ...erationReplay.java => OperatorReplay.java} | 112 ++++++------- ...stTest.java => OperatorMulticastTest.java} | 8 +- ...eplayTest.java => OperatorReplayTest.java} | 18 ++- 7 files changed, 259 insertions(+), 254 deletions(-) delete mode 100644 rxjava-core/src/main/java/rx/operators/OperationMulticast.java create mode 100644 rxjava-core/src/main/java/rx/operators/OperatorMulticast.java create mode 100644 rxjava-core/src/main/java/rx/operators/OperatorMulticastSelector.java rename rxjava-core/src/main/java/rx/operators/{OperationReplay.java => OperatorReplay.java} (90%) rename rxjava-core/src/test/java/rx/operators/{OperationMulticastTest.java => OperatorMulticastTest.java} (91%) rename rxjava-core/src/test/java/rx/operators/{OperationReplayTest.java => OperatorReplayTest.java} (95%) diff --git a/rxjava-core/src/main/java/rx/Observable.java b/rxjava-core/src/main/java/rx/Observable.java index 746362ca78..4a40fb3909 100644 --- a/rxjava-core/src/main/java/rx/Observable.java +++ b/rxjava-core/src/main/java/rx/Observable.java @@ -56,12 +56,11 @@ import rx.operators.OperationJoin; import rx.operators.OperationMergeDelayError; import rx.operators.OperationMergeMaxConcurrent; -import rx.operators.OperationMulticast; import rx.operators.OperationOnErrorResumeNextViaObservable; import rx.operators.OperationOnErrorReturn; import rx.operators.OperationOnExceptionResumeNextViaObservable; import rx.operators.OperationParallelMerge; -import rx.operators.OperationReplay; +import rx.operators.OperatorReplay; import rx.operators.OperationSample; import rx.operators.OperationSequenceEqual; import rx.operators.OperationSkip; @@ -105,6 +104,8 @@ import rx.operators.OperatorMerge; import rx.operators.OperatorMergeMapPair; import rx.operators.OperatorMergeMapTransform; +import rx.operators.OperatorMulticast; +import rx.operators.OperatorMulticastSelector; import rx.operators.OperatorObserveOn; import rx.operators.OperatorOnErrorFlatMap; import rx.operators.OperatorOnErrorResumeNextViaFunction; @@ -4419,7 +4420,7 @@ public final Observable mergeMapIterable(Func1 Observable multicast( final Func0> subjectFactory, final Func1, ? extends Observable> selector) { - return OperationMulticast.multicast(this, subjectFactory, selector); + return create(new OperatorMulticastSelector(this, subjectFactory, selector)); } /** @@ -4436,7 +4437,7 @@ public final Observable multicast( * Observable.multicast() */ public final ConnectableObservable multicast(Subject subject) { - return OperationMulticast.multicast(this, subject); + return new OperatorMulticast(this, subject); } /** @@ -4641,7 +4642,7 @@ public final Observable parallel(final Func1, Observable * @see RxJava Wiki: publish() */ public final ConnectableObservable publish() { - return OperationMulticast.multicast(this, PublishSubject. create()); + return new OperatorMulticast(this, PublishSubject. create()); } /** @@ -4704,7 +4705,7 @@ public final Subject call() { * and starts with {@code initialValue} */ public final ConnectableObservable publish(T initialValue) { - return OperationMulticast.multicast(this, BehaviorSubject. create(initialValue)); + return new OperatorMulticast(this, BehaviorSubject. create(initialValue)); } /** @@ -4716,7 +4717,7 @@ public final ConnectableObservable publish(T initialValue) { * @see RxJava Wiki: publishLast() */ public final ConnectableObservable publishLast() { - return OperationMulticast.multicast(this, AsyncSubject. create()); + return new OperatorMulticast(this, AsyncSubject. create()); } /** @@ -4879,7 +4880,7 @@ public final Observable repeat(long count, Scheduler scheduler) { * @see RxJava Wiki: replay() */ public final ConnectableObservable replay() { - return OperationMulticast.multicast(this, ReplaySubject. create()); + return new OperatorMulticast(this, ReplaySubject. create()); } /** @@ -4898,12 +4899,12 @@ public final ConnectableObservable replay() { * @see MSDN: Observable.Replay */ public final Observable replay(Func1, ? extends Observable> selector) { - return OperationMulticast.multicast(this, new Func0>() { + return create(new OperatorMulticastSelector(this, new Func0>() { @Override public final Subject call() { return ReplaySubject.create(); } - }, selector); + }, selector)); } /** @@ -4927,12 +4928,12 @@ public final Subject call() { * @see MSDN: Observable.Replay */ public final Observable replay(Func1, ? extends Observable> selector, final int bufferSize) { - return OperationMulticast.multicast(this, new Func0>() { + return create(new OperatorMulticastSelector(this, new Func0>() { @Override public final Subject call() { - return OperationReplay.replayBuffered(bufferSize); + return OperatorReplay.replayBuffered(bufferSize); } - }, selector); + }, selector)); } /** @@ -4995,12 +4996,12 @@ public final Observable replay(Func1, ? extends Obs if (bufferSize < 0) { throw new IllegalArgumentException("bufferSize < 0"); } - return OperationMulticast.multicast(this, new Func0>() { + return create(new OperatorMulticastSelector(this, new Func0>() { @Override public final Subject call() { - return OperationReplay.replayWindowed(time, unit, bufferSize, scheduler); + return OperatorReplay.replayWindowed(time, unit, bufferSize, scheduler); } - }, selector); + }, selector)); } /** @@ -5026,12 +5027,12 @@ public final Subject call() { * @see MSDN: Observable.Replay */ public final Observable replay(Func1, ? extends Observable> selector, final int bufferSize, final Scheduler scheduler) { - return OperationMulticast.multicast(this, new Func0>() { + return create(new OperatorMulticastSelector(this, new Func0>() { @Override public final Subject call() { - return OperationReplay. createScheduledSubject(OperationReplay. replayBuffered(bufferSize), scheduler); + return OperatorReplay. createScheduledSubject(OperatorReplay. replayBuffered(bufferSize), scheduler); } - }, selector); + }, selector)); } /** @@ -5085,12 +5086,12 @@ public final Observable replay(Func1, ? extends Obs * @see MSDN: Observable.Replay */ public final Observable replay(Func1, ? extends Observable> selector, final long time, final TimeUnit unit, final Scheduler scheduler) { - return OperationMulticast.multicast(this, new Func0>() { + return create(new OperatorMulticastSelector(this, new Func0>() { @Override public final Subject call() { - return OperationReplay.replayWindowed(time, unit, -1, scheduler); + return OperatorReplay.replayWindowed(time, unit, -1, scheduler); } - }, selector); + }, selector)); } /** @@ -5113,12 +5114,12 @@ public final Subject call() { * @see MSDN: Observable.Replay */ public final Observable replay(Func1, ? extends Observable> selector, final Scheduler scheduler) { - return OperationMulticast.multicast(this, new Func0>() { + return create(new OperatorMulticastSelector(this, new Func0>() { @Override public final Subject call() { - return OperationReplay.createScheduledSubject(ReplaySubject. create(), scheduler); + return OperatorReplay.createScheduledSubject(ReplaySubject. create(), scheduler); } - }, selector); + }, selector)); } /** @@ -5135,7 +5136,7 @@ public final Subject call() { * @see MSDN: Observable.Replay */ public final ConnectableObservable replay(int bufferSize) { - return OperationMulticast.multicast(this, OperationReplay. replayBuffered(bufferSize)); + return new OperatorMulticast(this, OperatorReplay. replayBuffered(bufferSize)); } /** @@ -5184,7 +5185,7 @@ public final ConnectableObservable replay(int bufferSize, long time, TimeUnit if (bufferSize < 0) { throw new IllegalArgumentException("bufferSize < 0"); } - return OperationMulticast.multicast(this, OperationReplay. replayWindowed(time, unit, bufferSize, scheduler)); + return new OperatorMulticast(this, OperatorReplay. replayWindowed(time, unit, bufferSize, scheduler)); } /** @@ -5203,9 +5204,9 @@ public final ConnectableObservable replay(int bufferSize, long time, TimeUnit * @see MSDN: Observable.Replay */ public final ConnectableObservable replay(int bufferSize, Scheduler scheduler) { - return OperationMulticast.multicast(this, - OperationReplay.createScheduledSubject( - OperationReplay. replayBuffered(bufferSize), scheduler)); + return new OperatorMulticast(this, + OperatorReplay.createScheduledSubject( + OperatorReplay. replayBuffered(bufferSize), scheduler)); } /** @@ -5245,7 +5246,7 @@ public final ConnectableObservable replay(long time, TimeUnit unit) { * @see MSDN: Observable.Replay */ public final ConnectableObservable replay(long time, TimeUnit unit, Scheduler scheduler) { - return OperationMulticast.multicast(this, OperationReplay. replayWindowed(time, unit, -1, scheduler)); + return new OperatorMulticast(this, OperatorReplay. replayWindowed(time, unit, -1, scheduler)); } /** @@ -5262,7 +5263,7 @@ public final ConnectableObservable replay(long time, TimeUnit unit, Scheduler * @see MSDN: Observable.Replay */ public final ConnectableObservable replay(Scheduler scheduler) { - return OperationMulticast.multicast(this, OperationReplay.createScheduledSubject(ReplaySubject. create(), scheduler)); + return new OperatorMulticast(this, OperatorReplay.createScheduledSubject(ReplaySubject. create(), scheduler)); } /** diff --git a/rxjava-core/src/main/java/rx/operators/OperationMulticast.java b/rxjava-core/src/main/java/rx/operators/OperationMulticast.java deleted file mode 100644 index f6760ee052..0000000000 --- a/rxjava-core/src/main/java/rx/operators/OperationMulticast.java +++ /dev/null @@ -1,148 +0,0 @@ -/** - * Copyright 2014 Netflix, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package rx.operators; - -import rx.Observable; -import rx.Observable.OnSubscribeFunc; -import rx.Observer; -import rx.Subscriber; -import rx.Subscription; -import rx.functions.Action0; -import rx.functions.Func0; -import rx.functions.Func1; -import rx.observables.ConnectableObservable; -import rx.subjects.Subject; -import rx.subscriptions.CompositeSubscription; -import rx.subscriptions.Subscriptions; - -public class OperationMulticast { - public static ConnectableObservable multicast(Observable source, final Subject subject) { - return new MulticastConnectableObservable(source, subject); - } - - private static class MulticastConnectableObservable extends ConnectableObservable { - private final Object lock = new Object(); - - private final Observable source; - private final Subject subject; - - private Subscription subscription; - - public MulticastConnectableObservable(Observable source, final Subject subject) { - super(new OnSubscribe() { - @Override - public void call(Subscriber observer) { - subject.unsafeSubscribe(observer); - } - }); - this.source = source; - this.subject = subject; - } - - public Subscription connect() { - synchronized (lock) { - if (subscription == null) { - subscription = source.unsafeSubscribe(new Subscriber() { - @Override - public void onCompleted() { - subject.onCompleted(); - } - - @Override - public void onError(Throwable e) { - subject.onError(e); - } - - @Override - public void onNext(T args) { - subject.onNext(args); - } - }); - } - } - - return Subscriptions.create(new Action0() { - @Override - public void call() { - synchronized (lock) { - if (subscription != null) { - subscription.unsubscribe(); - subscription = null; - } - } - } - }); - } - - } - - /** - * Returns an observable sequence that contains the elements of a sequence - * produced by multicasting the source sequence within a selector function. - * - * @param source - * @param subjectFactory - * @param selector - * @return - * - * @see MSDN: Observable.Multicast - */ - public static Observable multicast( - final Observable source, - final Func0> subjectFactory, - final Func1, ? extends Observable> selector) { - return Observable.create(new MulticastSubscribeFunc(source, subjectFactory, selector)); - } - - /** The multicast subscription function. */ - private static final class MulticastSubscribeFunc implements OnSubscribeFunc { - final Observable source; - final Func0> subjectFactory; - final Func1, ? extends Observable> resultSelector; - - public MulticastSubscribeFunc(Observable source, - Func0> subjectFactory, - Func1, ? extends Observable> resultSelector) { - this.source = source; - this.subjectFactory = subjectFactory; - this.resultSelector = resultSelector; - } - - @Override - public Subscription onSubscribe(Observer t1) { - Observable observable; - ConnectableObservable connectable; - try { - Subject subject = subjectFactory.call(); - - connectable = new MulticastConnectableObservable(source, subject); - - observable = resultSelector.call(connectable); - } catch (Throwable t) { - t1.onError(t); - return Subscriptions.empty(); - } - - CompositeSubscription csub = new CompositeSubscription(); - - csub.add(observable.unsafeSubscribe(new SafeObserver( - new SafeObservableSubscription(csub), t1))); - csub.add(connectable.connect()); - - return csub; - } - } -} diff --git a/rxjava-core/src/main/java/rx/operators/OperatorMulticast.java b/rxjava-core/src/main/java/rx/operators/OperatorMulticast.java new file mode 100644 index 0000000000..4a5016b717 --- /dev/null +++ b/rxjava-core/src/main/java/rx/operators/OperatorMulticast.java @@ -0,0 +1,86 @@ +/** + * Copyright 2014 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package rx.operators; + +import rx.Observable; +import rx.Subscriber; +import rx.Subscription; +import rx.functions.Action0; +import rx.observables.ConnectableObservable; +import rx.subjects.Subject; +import rx.subscriptions.Subscriptions; + +/** + * Shares a single subscription to a source through a Subject. + * + * @param the source value type + * @param the result value type + */ +public final class OperatorMulticast extends ConnectableObservable { + final Observable source; + final Subject subject; + final Object guard = new Object(); + /** Guarded by guard. */ + Subscription subscription; + + public OperatorMulticast(Observable source, final Subject subject) { + super(new OnSubscribe() { + @Override + public void call(Subscriber subscriber) { + subject.unsafeSubscribe(subscriber); + } + }); + this.source = source; + this.subject = subject; + } + + @Override + public Subscription connect() { + synchronized (guard) { + if (subscription == null) { + subscription = source.unsafeSubscribe(new Subscriber() { + @Override + public void onCompleted() { + subject.onCompleted(); + } + + @Override + public void onError(Throwable e) { + subject.onError(e); + } + + @Override + public void onNext(T args) { + subject.onNext(args); + } + }); + } + } + + return Subscriptions.create(new Action0() { + @Override + public void call() { + synchronized (guard) { + if (subscription != null) { + subscription.unsubscribe(); + subscription = null; + } + } + } + }); + } + +} diff --git a/rxjava-core/src/main/java/rx/operators/OperatorMulticastSelector.java b/rxjava-core/src/main/java/rx/operators/OperatorMulticastSelector.java new file mode 100644 index 0000000000..401d8a78ef --- /dev/null +++ b/rxjava-core/src/main/java/rx/operators/OperatorMulticastSelector.java @@ -0,0 +1,76 @@ + /** + * Copyright 2014 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package rx.operators; + +import rx.Observable; +import rx.Observable.OnSubscribe; +import rx.Subscriber; +import rx.functions.Func0; +import rx.functions.Func1; +import rx.observables.ConnectableObservable; +import rx.observers.SafeSubscriber; +import rx.subjects.Subject; +import rx.subscriptions.CompositeSubscription; + +/** + * Returns an observable sequence that contains the elements of a sequence + * produced by multicasting the source sequence within a selector function. + * + * @see MSDN: Observable.Multicast + * + * @param the input value type + * @param the intermediate type + * @param the result type + */ +public final class OperatorMulticastSelector implements OnSubscribe { + final Observable source; + final Func0> subjectFactory; + final Func1, ? extends Observable> resultSelector; + + public OperatorMulticastSelector(Observable source, + Func0> subjectFactory, + Func1, ? extends Observable> resultSelector) { + this.source = source; + this.subjectFactory = subjectFactory; + this.resultSelector = resultSelector; + } + + @Override + public void call(Subscriber child) { + Observable observable; + ConnectableObservable connectable; + try { + Subject subject = subjectFactory.call(); + + connectable = new OperatorMulticast(source, subject); + + observable = resultSelector.call(connectable); + } catch (Throwable t) { + child.onError(t); + return; + } + + CompositeSubscription csub = new CompositeSubscription(); + child.add(csub); + + SafeSubscriber s = new SafeSubscriber(child); + + observable.unsafeSubscribe(s); + + csub.add(connectable.connect()); + } + +} diff --git a/rxjava-core/src/main/java/rx/operators/OperationReplay.java b/rxjava-core/src/main/java/rx/operators/OperatorReplay.java similarity index 90% rename from rxjava-core/src/main/java/rx/operators/OperationReplay.java rename to rxjava-core/src/main/java/rx/operators/OperatorReplay.java index e683feb5b2..f07eae0f0c 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationReplay.java +++ b/rxjava-core/src/main/java/rx/operators/OperatorReplay.java @@ -27,18 +27,14 @@ import rx.Observable; import rx.Observable.OnSubscribe; -import rx.Observable.OnSubscribeFunc; -import rx.Observer; import rx.Scheduler; import rx.Subscriber; import rx.Subscription; import rx.functions.Action0; import rx.functions.Func1; import rx.functions.Functions; -import rx.observers.Subscribers; import rx.schedulers.Timestamped; import rx.subjects.Subject; -import rx.subscriptions.Subscriptions; /** * Replay with limited buffer and/or time constraints. @@ -46,14 +42,18 @@ * * @see MSDN: Observable.Replay overloads */ -public final class OperationReplay { +public final class OperatorReplay { /** Utility class. */ - private OperationReplay() { + private OperatorReplay() { throw new IllegalStateException("No instances!"); } /** - * Create a BoundedReplaySubject with the given buffer size. + * Create a CustomReplaySubject with the given buffer size. + * + * @param the element type + * @param bufferSize the maximum number of items to keep in the buffer + * @return the created subject */ public static Subject replayBuffered(int bufferSize) { return CustomReplaySubject.create(bufferSize); @@ -62,6 +62,10 @@ public static Subject replayBuffered(int bufferSize) { /** * Creates a subject whose client observers will observe events * propagated through the given wrapped subject. + * @param the element type + * @param subject the subject to wrap + * @param scheduler the target scheduler + * @return the created subject */ public static Subject createScheduledSubject(Subject subject, Scheduler scheduler) { final Observable observedOn = subject.observeOn(scheduler); @@ -69,8 +73,7 @@ public static Subject createScheduledSubject(Subject subject, Sc @Override public void call(Subscriber o) { - // TODO HACK between OnSubscribeFunc and Action1 - subscriberOf(observedOn).onSubscribe(o); + subscriberOf(observedOn).call(o); } }, subject); @@ -147,55 +150,23 @@ public void call() { /** * Return an OnSubscribeFunc which delegates the subscription to the given observable. + * + * @param the value type + * @param target the target observable + * @return the function that delegates the subscription to the target */ - public static OnSubscribeFunc subscriberOf(final Observable target) { - return new OnSubscribeFunc() { + public static OnSubscribe subscriberOf(final Observable target) { + return new OnSubscribe() { @Override - public Subscription onSubscribe(Observer t1) { - return target.unsafeSubscribe(Subscribers.from(t1)); + public void call(Subscriber t1) { + target.unsafeSubscribe(t1); } }; } -// /** -// * Subject that wraps another subject and uses a mapping function -// * to transform the received values. -// */ -// public static final class MappingSubject extends Subject { -// private final Subject subject; -// private final Func1 selector; -// private final OnSubscribe func; -// -// public MappingSubject(OnSubscribe func, Subject subject, Func1 selector) { -// this.func = func; -// this.subject = subject; -// this.selector = selector; -// } -// -// @Override -// public Observable toObservable() { -// return Observable.create(func); -// } -// -// @Override -// public void onNext(T args) { -// subject.onNext(selector.call(args)); -// } -// -// @Override -// public void onError(Throwable e) { -// subject.onError(e); -// } -// -// @Override -// public void onCompleted() { -// subject.onCompleted(); -// } -// -// } - /** * A subject that wraps another subject. + * @param the value type */ public static final class SubjectWrapper extends Subject { /** The wrapped subject. */ @@ -262,7 +233,7 @@ public interface VirtualList { * Retrieve an element at the specified logical index. * * @param index - * @return + * @return the element */ T get(int index); @@ -286,14 +257,14 @@ public interface VirtualList { /** * Returns the current head index of this list. * - * @return + * @return the head index */ int start(); /** * Returns the current tail index of this list (where the next value would appear). * - * @return + * @return the tail index */ int end(); @@ -312,6 +283,7 @@ public interface VirtualList { /** * Behaves like a normal, unbounded ArrayList but with virtual index. + * @param the element type */ public static final class VirtualArrayList implements VirtualList { /** The backing list . */ @@ -375,6 +347,8 @@ public List toList() { /** * A bounded list which increases its size up to a maximum capacity, then * behaves like a circular buffer with virtual indexes. + * + * @param the element type */ public static final class VirtualBoundedList implements VirtualList { /** A list that grows up to maxSize. */ @@ -584,7 +558,7 @@ Collection replayers() { * @param obs * @return */ - Subscription addReplayer(Observer obs) { + Subscription addReplayer(Subscriber obs) { Subscription s = new Subscription() { final AtomicBoolean once = new AtomicBoolean(); @@ -601,6 +575,7 @@ public boolean isUnsubscribed() { } }; + obs.add(s); Replayer rp = new Replayer(obs, s); replayers.put(s, rp); rp.replayTill(values.start() + values.size()); @@ -609,13 +584,13 @@ public boolean isUnsubscribed() { /** The replayer that holds a value where the given observer is currently at. */ final class Replayer { - protected final Observer wrapped; + protected final Subscriber wrapped; /** Where this replayer was in reading the list. */ protected int index; /** To cancel and unsubscribe this replayer and observer. */ protected final Subscription cancel; - protected Replayer(Observer wrapped, Subscription cancel) { + protected Replayer(Subscriber wrapped, Subscription cancel) { this.wrapped = wrapped; this.cancel = cancel; } @@ -699,9 +674,9 @@ void clearValues() { */ public static final class CustomReplaySubject extends Subject { /** - * Return a subject that retains all events and will replay them to an {@link Observer} that subscribes. - * - * @return a subject that retains all events and will replay them to an {@link Observer} that subscribes. + * Return a subject that retains all events and will replay them to an {@link Subscriber} that subscribes. + * @param the common value type + * @return a subject that retains all events and will replay them to an {@link Subscriber} that subscribes. */ public static CustomReplaySubject create() { ReplayState state = new ReplayState(new VirtualArrayList(), Functions. identity()); @@ -713,9 +688,10 @@ public static CustomReplaySubject create() { /** * Create a bounded replay subject with the given maximum buffer size. * + * @param the common value type * @param maxSize * the maximum size in number of onNext notifications - * @return + * @return the custom replay subject */ public static CustomReplaySubject create(int maxSize) { ReplayState state = new ReplayState(new VirtualBoundedList(maxSize), Functions. identity()); @@ -730,14 +706,14 @@ public static CustomReplaySubject create(int maxSize) { protected final Func1 intermediateSelector; private CustomReplaySubject( - final OnSubscribeFunc onSubscribe, + final OnSubscribe onSubscribe, ReplayState state, Func1 intermediateSelector) { super(new OnSubscribe() { @Override public void call(Subscriber sub) { - onSubscribe.onSubscribe(sub); + onSubscribe.call(sub); } }); @@ -796,7 +772,7 @@ public void onNext(TInput args) { */ protected void replayValues() { int s = state.values.start() + state.values.size(); - for (ReplayState.Replayer rp : state.replayers()) { + for (ReplayState.Replayer rp : state.replayers()) { rp.replayTill(s); } } @@ -811,7 +787,7 @@ protected void replayValues() { * the value type of the observers subscribing to this subject */ protected static final class CustomReplaySubjectSubscribeFunc - implements Observable.OnSubscribeFunc { + implements Observable.OnSubscribe { private final ReplayState state; @@ -820,14 +796,15 @@ protected CustomReplaySubjectSubscribeFunc(ReplayState s } @Override - public Subscription onSubscribe(Observer t1) { + public void call(Subscriber t1) { VirtualList values; Throwable error; state.lock(); try { if (!state.done) { state.onSubscription.call(); - return state.addReplayer(t1); + state.addReplayer(t1); + return; } values = state.values; error = state.error; @@ -840,7 +817,7 @@ public Subscription onSubscribe(Observer t1) { t1.onNext(state.resultSelector.call(values.get(i))); } catch (Throwable t) { t1.onError(t); - return Subscriptions.empty(); + return; } } if (error != null) { @@ -848,7 +825,6 @@ public Subscription onSubscribe(Observer t1) { } else { t1.onCompleted(); } - return Subscriptions.empty(); } } } diff --git a/rxjava-core/src/test/java/rx/operators/OperationMulticastTest.java b/rxjava-core/src/test/java/rx/operators/OperatorMulticastTest.java similarity index 91% rename from rxjava-core/src/test/java/rx/operators/OperationMulticastTest.java rename to rxjava-core/src/test/java/rx/operators/OperatorMulticastTest.java index 5440d1f282..74cfd01c77 100644 --- a/rxjava-core/src/test/java/rx/operators/OperationMulticastTest.java +++ b/rxjava-core/src/test/java/rx/operators/OperatorMulticastTest.java @@ -28,13 +28,13 @@ import rx.subjects.PublishSubject; import rx.subjects.Subject; -public class OperationMulticastTest { +public class OperatorMulticastTest { @Test public void testMulticast() { Subject source = PublishSubject.create(); - ConnectableObservable multicasted = OperationMulticast.multicast(source, + ConnectableObservable multicasted = source.multicast( PublishSubject. create()); @SuppressWarnings("unchecked") @@ -62,7 +62,7 @@ public void testMulticast() { public void testMulticastConnectTwice() { Subject source = PublishSubject.create(); - ConnectableObservable multicasted = OperationMulticast.multicast(source, + ConnectableObservable multicasted = source.multicast( PublishSubject. create()); @SuppressWarnings("unchecked") @@ -87,7 +87,7 @@ public void testMulticastConnectTwice() { public void testMulticastDisconnect() { Subject source = PublishSubject.create(); - ConnectableObservable multicasted = OperationMulticast.multicast(source, + ConnectableObservable multicasted = source.multicast( PublishSubject. create()); @SuppressWarnings("unchecked") diff --git a/rxjava-core/src/test/java/rx/operators/OperationReplayTest.java b/rxjava-core/src/test/java/rx/operators/OperatorReplayTest.java similarity index 95% rename from rxjava-core/src/test/java/rx/operators/OperationReplayTest.java rename to rxjava-core/src/test/java/rx/operators/OperatorReplayTest.java index 10ebf8cc5f..96a8645fe9 100644 --- a/rxjava-core/src/test/java/rx/operators/OperationReplayTest.java +++ b/rxjava-core/src/test/java/rx/operators/OperatorReplayTest.java @@ -33,11 +33,11 @@ import rx.Observer; import rx.functions.Func1; import rx.observables.ConnectableObservable; -import rx.operators.OperationReplay.VirtualBoundedList; +import rx.operators.OperatorReplay.VirtualBoundedList; import rx.schedulers.TestScheduler; import rx.subjects.PublishSubject; -public class OperationReplayTest { +public class OperatorReplayTest { @Test public void testBoundedList() { VirtualBoundedList list = new VirtualBoundedList(3); @@ -98,6 +98,7 @@ public void testBufferedReplay() { co.connect(); { + @SuppressWarnings("unchecked") Observer observer1 = mock(Observer.class); InOrder inOrder = inOrder(observer1); @@ -121,6 +122,7 @@ public void testBufferedReplay() { } { + @SuppressWarnings("unchecked") Observer observer1 = mock(Observer.class); InOrder inOrder = inOrder(observer1); @@ -145,6 +147,7 @@ public void testWindowedReplay() { co.connect(); { + @SuppressWarnings("unchecked") Observer observer1 = mock(Observer.class); InOrder inOrder = inOrder(observer1); @@ -169,6 +172,7 @@ public void testWindowedReplay() { } { + @SuppressWarnings("unchecked") Observer observer1 = mock(Observer.class); InOrder inOrder = inOrder(observer1); @@ -206,6 +210,7 @@ public Observable call(Observable t1) { Observable co = source.replay(selector); { + @SuppressWarnings("unchecked") Observer observer1 = mock(Observer.class); InOrder inOrder = inOrder(observer1); @@ -229,6 +234,7 @@ public Observable call(Observable t1) { } { + @SuppressWarnings("unchecked") Observer observer1 = mock(Observer.class); InOrder inOrder = inOrder(observer1); @@ -268,6 +274,7 @@ public Observable call(Observable t1) { Observable co = source.replay(selector, 3); { + @SuppressWarnings("unchecked") Observer observer1 = mock(Observer.class); InOrder inOrder = inOrder(observer1); @@ -291,6 +298,7 @@ public Observable call(Observable t1) { } { + @SuppressWarnings("unchecked") Observer observer1 = mock(Observer.class); InOrder inOrder = inOrder(observer1); @@ -330,6 +338,7 @@ public Observable call(Observable t1) { Observable co = source.replay(selector, 100, TimeUnit.MILLISECONDS, scheduler); { + @SuppressWarnings("unchecked") Observer observer1 = mock(Observer.class); InOrder inOrder = inOrder(observer1); @@ -354,6 +363,7 @@ public Observable call(Observable t1) { } { + @SuppressWarnings("unchecked") Observer observer1 = mock(Observer.class); InOrder inOrder = inOrder(observer1); @@ -373,6 +383,7 @@ public void testBufferedReplayError() { co.connect(); { + @SuppressWarnings("unchecked") Observer observer1 = mock(Observer.class); InOrder inOrder = inOrder(observer1); @@ -397,6 +408,7 @@ public void testBufferedReplayError() { } { + @SuppressWarnings("unchecked") Observer observer1 = mock(Observer.class); InOrder inOrder = inOrder(observer1); @@ -421,6 +433,7 @@ public void testWindowedReplayError() { co.connect(); { + @SuppressWarnings("unchecked") Observer observer1 = mock(Observer.class); InOrder inOrder = inOrder(observer1); @@ -445,6 +458,7 @@ public void testWindowedReplayError() { } { + @SuppressWarnings("unchecked") Observer observer1 = mock(Observer.class); InOrder inOrder = inOrder(observer1);