diff --git a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala index 9999ed9751..895e2ec672 100644 --- a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala +++ b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala @@ -75,7 +75,6 @@ trait Observable[+T] import scala.collection.Seq import scala.concurrent.duration.{Duration, TimeUnit} import rx.util.functions._ - import rx.lang.scala.util._ import rx.lang.scala.observables.BlockingObservable import ImplicitFunctionConversions._ import JavaConversions._ @@ -302,45 +301,44 @@ trait Observable[+T] * Creates an Observable which produces buffers of collected values. * * This Observable produces connected non-overlapping buffers. The current buffer is - * emitted and replaced with a new buffer when the Observable produced by the specified function produces a [[rx.lang.scala.util.Closing]] object. The function will then + * emitted and replaced with a new buffer when the Observable produced by the specified function produces an object. The function will then * be used to create a new Observable to listen for the end of the next buffer. * * @param closings * The function which is used to produce an [[rx.lang.scala.Observable]] for every buffer created. - * When this [[rx.lang.scala.Observable]] produces a [[rx.lang.scala.util.Closing]] object, the associated buffer + * When this [[rx.lang.scala.Observable]] produces an object, the associated buffer * is emitted and replaced with a new one. * @return * An [[rx.lang.scala.Observable]] which produces connected non-overlapping buffers, which are emitted - * when the current [[rx.lang.scala.Observable]] created with the function argument produces a [[rx.lang.scala.util.Closing]] object. + * when the current [[rx.lang.scala.Observable]] created with the function argument produces an object. */ - def buffer(closings: () => Observable[Closing]) : Observable[Seq[T]] = { + def buffer[Closing](closings: () => Observable[_ <: Closing]) : Observable[Seq[T]] = { val f: Func0[_ <: rx.Observable[_ <: Closing]] = closings().asJavaObservable - val jObs: rx.Observable[_ <: java.util.List[_]] = asJavaObservable.buffer(f) + val jObs: rx.Observable[_ <: java.util.List[_]] = asJavaObservable.buffer[Closing](f) Observable.jObsOfListToScObsOfSeq(jObs.asInstanceOf[rx.Observable[_ <: java.util.List[T]]]) } - /** * Creates an Observable which produces buffers of collected values. * * This Observable produces buffers. Buffers are created when the specified `openings` - * Observable produces a [[rx.lang.scala.util.Opening]] object. Additionally the function argument + * Observable produces an object. Additionally the function argument * is used to create an Observable which produces [[rx.lang.scala.util.Closing]] objects. When this * Observable produces such an object, the associated buffer is emitted. * * @param openings - * The [[rx.lang.scala.Observable]] which, when it produces a [[rx.lang.scala.util.Opening]] object, will cause + * The [[rx.lang.scala.Observable]] which, when it produces an object, will cause * another buffer to be created. * @param closings * The function which is used to produce an [[rx.lang.scala.Observable]] for every buffer created. - * When this [[rx.lang.scala.Observable]] produces a [[rx.lang.scala.util.Closing]] object, the associated buffer + * When this [[rx.lang.scala.Observable]] produces an object, the associated buffer * is emitted. * @return * An [[rx.lang.scala.Observable]] which produces buffers which are created and emitted when the specified [[rx.lang.scala.Observable]]s publish certain objects. */ - def buffer(openings: Observable[Opening], closings: Opening => Observable[Closing]): Observable[Seq[T]] = { + def buffer[Opening, Closing](openings: Observable[Opening], closings: Opening => Observable[Closing]): Observable[Seq[T]] = { val opening: rx.Observable[_ <: Opening] = openings.asJavaObservable - val closing: Func1[Opening, _ <: rx.Observable[_ <: Closing]] = (o: Opening) => closings(o).asJavaObservable - val jObs: rx.Observable[_ <: java.util.List[_]] = asJavaObservable.buffer(opening, closing) + val closing: Func1[_ >: Opening, _ <: rx.Observable[_ <: Closing]] = (o: Opening) => closings(o).asJavaObservable + val jObs: rx.Observable[_ <: java.util.List[_]] = asJavaObservable.buffer[Opening, Closing](opening, closing) Observable.jObsOfListToScObsOfSeq(jObs.asInstanceOf[rx.Observable[_ <: java.util.List[T]]]) } @@ -512,22 +510,22 @@ trait Observable[+T] /** * Creates an Observable which produces windows of collected values. This Observable produces connected * non-overlapping windows. The current window is emitted and replaced with a new window when the - * Observable produced by the specified function produces a [[rx.lang.scala.util.Closing]] object. + * Observable produced by the specified function produces an object. * The function will then be used to create a new Observable to listen for the end of the next * window. * * @param closings * The function which is used to produce an [[rx.lang.scala.Observable]] for every window created. - * When this [[rx.lang.scala.Observable]] produces a [[rx.lang.scala.util.Closing]] object, the associated window + * When this [[rx.lang.scala.Observable]] produces an object, the associated window * is emitted and replaced with a new one. * @return * An [[rx.lang.scala.Observable]] which produces connected non-overlapping windows, which are emitted - * when the current [[rx.lang.scala.Observable]] created with the function argument produces a [[rx.lang.scala.util.Closing]] object. + * when the current [[rx.lang.scala.Observable]] created with the function argument produces an object. */ - def window(closings: () => Observable[Closing]): Observable[Observable[T]] = { + def window[Closing](closings: () => Observable[Closing]): Observable[Observable[T]] = { val func : Func0[_ <: rx.Observable[_ <: Closing]] = closings().asJavaObservable - val o1: rx.Observable[_ <: rx.Observable[_]] = asJavaObservable.window(func) - val o2 = toScalaObservable[rx.Observable[_]](o1).map((x: rx.Observable[_]) => { + val o1: rx.Observable[_ <: rx.Observable[_]] = asJavaObservable.window[Closing](func) + val o2 = Observable[rx.Observable[_]](o1).map((x: rx.Observable[_]) => { val x2 = x.asInstanceOf[rx.Observable[_ <: T]] toScalaObservable[T](x2) }) @@ -536,23 +534,23 @@ trait Observable[+T] /** * Creates an Observable which produces windows of collected values. This Observable produces windows. - * Chunks are created when the specified `openings` Observable produces a [[rx.lang.scala.util.Opening]] object. + * Chunks are created when the specified `openings` Observable produces an object. * Additionally the `closings` argument is used to create an Observable which produces [[rx.lang.scala.util.Closing]] objects. * When this Observable produces such an object, the associated window is emitted. * * @param openings - * The [[rx.lang.scala.Observable]] which when it produces a [[rx.lang.scala.util.Opening]] object, will cause + * The [[rx.lang.scala.Observable]] which when it produces an object, will cause * another window to be created. * @param closings * The function which is used to produce an [[rx.lang.scala.Observable]] for every window created. - * When this [[rx.lang.scala.Observable]] produces a [[rx.lang.scala.util.Closing]] object, the associated window + * When this [[rx.lang.scala.Observable]] produces an object, the associated window * is emitted. * @return * An [[rx.lang.scala.Observable]] which produces windows which are created and emitted when the specified [[rx.lang.scala.Observable]]s publish certain objects. */ - def window(openings: Observable[Opening], closings: Opening => Observable[Closing]) = { + def window[Opening, Closing](openings: Observable[Opening], closings: Opening => Observable[Closing]) = { Observable.jObsOfJObsToScObsOfScObs( - asJavaObservable.window(openings.asJavaObservable, (op: Opening) => closings(op).asJavaObservable)) + asJavaObservable.window[Opening, Closing](openings.asJavaObservable, (op: Opening) => closings(op).asJavaObservable)) : Observable[Observable[T]] // SI-7818 } diff --git a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/util/package.scala b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/util/package.scala deleted file mode 100644 index ed19d849ab..0000000000 --- a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/util/package.scala +++ /dev/null @@ -1,49 +0,0 @@ -/** - * Copyright 2013 Netflix, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package rx.lang.scala - -/** - * Provides [[Opening]]s, [[Closing]]s, and [[Timestamped]]. - */ -package object util { - - /** - * Tagging interface for objects which can open buffers. - * @see [[Observable `Observable.buffer(Observable[Opening], Opening => Observable[Closing])`]] - */ - type Opening = rx.util.Opening - - /** - * Creates an object which can open buffers. - * @see [[Observable `Observable.buffer(Observable[Opening], Opening => Observable[Closing])`]] - */ - def Opening() = rx.util.Openings.create() - - /** - * Tagging interface for objects which can close buffers. - * @see [[Observable `Observable.buffer(Observable[Opening], Opening => Observable[Closing])`]] - */ - type Closing = rx.util.Closing - - /** - * Creates an object which can close buffers. - * @see [[Observable `Observable.buffer(Observable[Opening], Opening => Observable[Closing])`]] - */ - def Closing() = rx.util.Closings.create() - - // rx.util.Range not needed because there's a standard Scala Range - -} diff --git a/rxjava-core/src/main/java/rx/Observable.java b/rxjava-core/src/main/java/rx/Observable.java index 9482a5f09f..c682e1bc00 100644 --- a/rxjava-core/src/main/java/rx/Observable.java +++ b/rxjava-core/src/main/java/rx/Observable.java @@ -111,9 +111,7 @@ import rx.subjects.ReplaySubject; import rx.subjects.Subject; import rx.subscriptions.Subscriptions; -import rx.util.Closing; import rx.util.OnErrorNotImplementedException; -import rx.util.Opening; import rx.util.Range; import rx.util.TimeInterval; import rx.util.Timestamped; @@ -2812,31 +2810,31 @@ public static Observable combineLates return create(OperationCombineLatest.combineLatest(o1, o2, o3, o4, o5, o6, o7, o8, o9, combineFunction)); } - /** +/** * Creates an Observable that produces buffers of collected items. *

* *

* This Observable produces connected, non-overlapping buffers. The current * buffer is emitted and replaced with a new buffer when the Observable - * produced by the specified bufferClosingSelector produces a - * {@link rx.util.Closing} object. The bufferClosingSelector + * produced by the specified bufferClosingSelector produces an + * object. The bufferClosingSelector * will then be used to create a new Observable to listen for the end of * the next buffer. * * @param bufferClosingSelector the {@link Func0} which is used to produce * an {@link Observable} for every buffer * created. When this {@link Observable} - * produces a {@link rx.util.Closing} object, + * produces an object, * the associated buffer is emitted and * replaced with a new one. * @return an {@link Observable} which produces connected, non-overlapping * buffers, which are emitted when the current {@link Observable} - * created with the {@link Func0} argument produces a - * {@link rx.util.Closing} object + * created with the {@link Func0} argument produces an + * object * @see RxJava Wiki: buffer() */ - public Observable> buffer(Func0> bufferClosingSelector) { + public Observable> buffer(Func0> bufferClosingSelector) { return create(OperationBuffer.buffer(this, bufferClosingSelector)); } @@ -2846,26 +2844,26 @@ public Observable> buffer(Func0> * *

* This Observable produces buffers. Buffers are created when the specified - * bufferOpenings Observable produces a {@link rx.util.Opening} + * bufferOpenings Observable produces an * object. Additionally the bufferClosingSelector argument is - * used to create an Observable which produces {@link rx.util.Closing} + * used to create an Observable which produces * objects. When this Observable produces such an object, the associated * buffer is emitted. * - * @param bufferOpenings the {@link Observable} that, when it produces a - * {@link rx.util.Opening} object, will cause another + * @param bufferOpenings the {@link Observable} that, when it produces an + * object, will cause another * buffer to be created * @param bufferClosingSelector the {@link Func1} that is used to produce * an {@link Observable} for every buffer * created. When this {@link Observable} - * produces a {@link rx.util.Closing} object, + * produces an object, * the associated buffer is emitted. * @return an {@link Observable} that produces buffers that are created and * emitted when the specified {@link Observable}s publish certain * objects * @see RxJava Wiki: buffer() */ - public Observable> buffer(Observable bufferOpenings, Func1> bufferClosingSelector) { + public Observable> buffer(Observable bufferOpenings, Func1> bufferClosingSelector) { return create(OperationBuffer.buffer(this, bufferOpenings, bufferClosingSelector)); } @@ -3062,8 +3060,8 @@ public Observable> buffer(long timespan, long timeshift, TimeUnit unit, * Creates an Observable that produces windows of collected items. This * Observable produces connected, non-overlapping windows. The current * window is emitted and replaced with a new window when the Observable - * produced by the specified closingSelector produces a - * {@link rx.util.Closing} object. The closingSelector will + * produced by the specified closingSelector produces an + * object. The closingSelector will * then be used to create a new Observable to listen for the end of the next * window. *

@@ -3071,45 +3069,45 @@ public Observable> buffer(long timespan, long timeshift, TimeUnit unit, * * @param closingSelector the {@link Func0} used to produce an * {@link Observable} for every window created. When this - * {@link Observable} emits a {@link rx.util.Closing} object, the + * {@link Observable} emits an object, the * associated window is emitted and replaced with a new one. * @return an {@link Observable} that produces connected, non-overlapping * windows, which are emitted when the current {@link Observable} - * created with the closingSelector argument emits a - * {@link rx.util.Closing} object. + * created with the closingSelector argument emits an + * object. * @see RxJava Wiki: window() */ - public Observable> window(Func0> closingSelector) { + public Observable> window(Func0> closingSelector) { return create(OperationWindow.window(this, closingSelector)); } /** * Creates an Observable that produces windows of collected items. This * Observable produces windows. Chunks are created when the - * windowOpenings Observable produces a {@link rx.util.Opening} + * windowOpenings Observable produces an * object. Additionally the closingSelector argument creates an - * Observable that produces {@link rx.util.Closing} objects. When this + * Observable that produces objects. When this * Observable produces such an object, the associated window is emitted. *

* * - * @param windowOpenings the {@link Observable} that, when it produces a - * {@link rx.util.Opening} object, causes another + * @param windowOpenings the {@link Observable} that, when it produces an + * object, causes another * window to be created * @param closingSelector the {@link Func1} that produces an * {@link Observable} for every window created. When - * this {@link Observable} produces a - * {@link rx.util.Closing} object, the associated + * this {@link Observable} produces an + * object, the associated * window is emitted. * @return an {@link Observable} that produces windows that are created and * emitted when the specified {@link Observable}s publish certain * objects * @see RxJava Wiki: window() */ - public Observable> window(Observable windowOpenings, Func1> closingSelector) { + public Observable> window(Observable windowOpenings, Func1> closingSelector) { return create(OperationWindow.window(this, windowOpenings, closingSelector)); } - + /** * Creates an Observable that produces windows of collected items. This * Observable produces connected, non-overlapping windows, each containing diff --git a/rxjava-core/src/main/java/rx/operators/ChunkedOperation.java b/rxjava-core/src/main/java/rx/operators/ChunkedOperation.java index cabc1309e6..1d3b762891 100644 --- a/rxjava-core/src/main/java/rx/operators/ChunkedOperation.java +++ b/rxjava-core/src/main/java/rx/operators/ChunkedOperation.java @@ -28,8 +28,6 @@ import rx.Observer; import rx.Scheduler; import rx.Subscription; -import rx.util.Closing; -import rx.util.Opening; import rx.util.functions.Action0; import rx.util.functions.Action1; import rx.util.functions.Func0; @@ -449,13 +447,13 @@ public void stop() { * The type of object all internal {@link rx.operators.ChunkedOperation.Chunk} objects record. * The type of object being tracked by the {@link Chunk} */ - protected static class ObservableBasedSingleChunkCreator implements ChunkCreator { + protected static class ObservableBasedSingleChunkCreator implements ChunkCreator { private final SafeObservableSubscription subscription = new SafeObservableSubscription(); - private final Func0> chunkClosingSelector; + private final Func0> chunkClosingSelector; private final NonOverlappingChunks chunks; - public ObservableBasedSingleChunkCreator(NonOverlappingChunks chunks, Func0> chunkClosingSelector) { + public ObservableBasedSingleChunkCreator(NonOverlappingChunks chunks, Func0> chunkClosingSelector) { this.chunks = chunks; this.chunkClosingSelector = chunkClosingSelector; @@ -464,10 +462,10 @@ public ObservableBasedSingleChunkCreator(NonOverlappingChunks chunks, Func } private void listenForChunkEnd() { - Observable closingObservable = chunkClosingSelector.call(); - closingObservable.subscribe(new Action1() { + Observable closingObservable = chunkClosingSelector.call(); + closingObservable.subscribe(new Action1() { @Override - public void call(Closing closing) { + public void call(TClosing closing) { chunks.emitAndReplaceChunk(); listenForChunkEnd(); } @@ -495,20 +493,20 @@ public void stop() { * The type of object all internal {@link rx.operators.ChunkedOperation.Chunk} objects record. * The type of object being tracked by the {@link Chunk} */ - protected static class ObservableBasedMultiChunkCreator implements ChunkCreator { + protected static class ObservableBasedMultiChunkCreator implements ChunkCreator { private final SafeObservableSubscription subscription = new SafeObservableSubscription(); - public ObservableBasedMultiChunkCreator(final OverlappingChunks chunks, Observable openings, final Func1> chunkClosingSelector) { - subscription.wrap(openings.subscribe(new Action1() { + public ObservableBasedMultiChunkCreator(final OverlappingChunks chunks, Observable openings, final Func1> chunkClosingSelector) { + subscription.wrap(openings.subscribe(new Action1() { @Override - public void call(Opening opening) { + public void call(TOpening opening) { final Chunk chunk = chunks.createChunk(); - Observable closingObservable = chunkClosingSelector.call(opening); + Observable closingObservable = chunkClosingSelector.call(opening); - closingObservable.subscribe(new Action1() { + closingObservable.subscribe(new Action1() { @Override - public void call(Closing closing) { + public void call(TClosing closing) { chunks.emitChunk(chunk); } }); diff --git a/rxjava-core/src/main/java/rx/operators/OperationBuffer.java b/rxjava-core/src/main/java/rx/operators/OperationBuffer.java index c9aba14411..02d0ce4573 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationBuffer.java +++ b/rxjava-core/src/main/java/rx/operators/OperationBuffer.java @@ -24,8 +24,6 @@ import rx.Scheduler; import rx.Subscription; import rx.concurrency.Schedulers; -import rx.util.Closing; -import rx.util.Opening; import rx.util.functions.Func0; import rx.util.functions.Func1; @@ -42,7 +40,7 @@ public Buffer call() { /** *

This method creates a {@link Func1} object which represents the buffer operation. This operation takes - * values from the specified {@link Observable} source and stores them in a buffer until the {@link Observable} constructed using the {@link Func0} argument, produces a {@link rx.util.Closing} + * values from the specified {@link Observable} source and stores them in a buffer until the {@link Observable} constructed using the {@link Func0} argument, produces a * value. The buffer is then * emitted, and a new buffer is created to replace it. A new {@link Observable} will be constructed using the * provided {@link Func0} object, which will determine when this new buffer is emitted. When the source {@link Observable} completes or produces an error, the current buffer is emitted, and the @@ -56,17 +54,17 @@ public Buffer call() { * The {@link Observable} which produces values. * @param bufferClosingSelector * A {@link Func0} object which produces {@link Observable}s. These {@link Observable}s determine when a buffer is emitted and replaced by simply - * producing an {@link rx.util.Closing} object. + * producing an object. * @return * the {@link Func1} object representing the specified buffer operation. */ - public static OnSubscribeFunc> buffer(final Observable source, final Func0> bufferClosingSelector) { + public static OnSubscribeFunc> buffer(final Observable source, final Func0> bufferClosingSelector) { return new OnSubscribeFunc>() { @Override public Subscription onSubscribe(Observer> observer) { NonOverlappingChunks> buffers = new NonOverlappingChunks>(observer, OperationBuffer. bufferMaker()); - ChunkCreator creator = new ObservableBasedSingleChunkCreator>(buffers, bufferClosingSelector); + ChunkCreator creator = new ObservableBasedSingleChunkCreator, TClosing>(buffers, bufferClosingSelector); return source.subscribe(new ChunkObserver>(buffers, observer, creator)); } }; @@ -79,7 +77,7 @@ public Subscription onSubscribe(Observer> observer) { * *

Chunks can be created by pushing a {@link rx.util.Opening} value to the "bufferOpenings" {@link Observable}. * This creates a new buffer which will then start recording values which are produced by the "source" {@link Observable}. Additionally the "bufferClosingSelector" will be used to construct an - * {@link Observable} which can produce {@link rx.util.Closing} values. When it does so it will close this (and only this) newly created + * {@link Observable} which can produce values. When it does so it will close this (and only this) newly created * buffer. When the source {@link Observable} completes or produces an error, all chunks are emitted, and the * event is propagated to all subscribed {@link Observer}s.

* @@ -93,16 +91,16 @@ public Subscription onSubscribe(Observer> observer) { * create a new buffer which instantly starts recording the "source" {@link Observable}. * @param bufferClosingSelector * A {@link Func0} object which produces {@link Observable}s. These {@link Observable}s determine when a buffer is emitted and replaced by simply - * producing an {@link rx.util.Closing} object. + * producing an object. * @return * the {@link Func1} object representing the specified buffer operation. */ - public static OnSubscribeFunc> buffer(final Observable source, final Observable bufferOpenings, final Func1> bufferClosingSelector) { + public static OnSubscribeFunc> buffer(final Observable source, final Observable bufferOpenings, final Func1> bufferClosingSelector) { return new OnSubscribeFunc>() { @Override public Subscription onSubscribe(final Observer> observer) { OverlappingChunks> buffers = new OverlappingChunks>(observer, OperationBuffer. bufferMaker()); - ChunkCreator creator = new ObservableBasedMultiChunkCreator>(buffers, bufferOpenings, bufferClosingSelector); + ChunkCreator creator = new ObservableBasedMultiChunkCreator, TOpening, TClosing>(buffers, bufferOpenings, bufferClosingSelector); return source.subscribe(new ChunkObserver>(buffers, observer, creator)); } }; diff --git a/rxjava-core/src/main/java/rx/operators/OperationWindow.java b/rxjava-core/src/main/java/rx/operators/OperationWindow.java index d7ff20e061..f18a700bd2 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationWindow.java +++ b/rxjava-core/src/main/java/rx/operators/OperationWindow.java @@ -23,8 +23,6 @@ import rx.Scheduler; import rx.Subscription; import rx.concurrency.Schedulers; -import rx.util.Closing; -import rx.util.Opening; import rx.util.functions.Func0; import rx.util.functions.Func1; @@ -42,7 +40,7 @@ public Window call() { /** *

This method creates a {@link rx.util.functions.Func1} object which represents the window operation. This operation takes * values from the specified {@link rx.Observable} source and stores them in a window until the {@link rx.Observable} constructed using the {@link rx.util.functions.Func0} argument, produces a - * {@link rx.util.Closing} value. The window is then + * value. The window is then * emitted, and a new window is created to replace it. A new {@link rx.Observable} will be constructed using the * provided {@link rx.util.functions.Func0} object, which will determine when this new window is emitted. When the source {@link rx.Observable} completes or produces an error, the current window * is emitted, and the event is propagated @@ -55,16 +53,16 @@ public Window call() { * The {@link rx.Observable} which produces values. * @param windowClosingSelector * A {@link rx.util.functions.Func0} object which produces {@link rx.Observable}s. These {@link rx.Observable}s determine when a window is emitted and replaced by simply - * producing an {@link rx.util.Closing} object. + * producing an object. * @return * the {@link rx.util.functions.Func1} object representing the specified window operation. */ - public static OnSubscribeFunc> window(final Observable source, final Func0> windowClosingSelector) { + public static OnSubscribeFunc> window(final Observable source, final Func0> windowClosingSelector) { return new OnSubscribeFunc>() { @Override public Subscription onSubscribe(final Observer> observer) { NonOverlappingChunks> windows = new NonOverlappingChunks>(observer, OperationWindow. windowMaker()); - ChunkCreator creator = new ObservableBasedSingleChunkCreator>(windows, windowClosingSelector); + ChunkCreator creator = new ObservableBasedSingleChunkCreator, TClosing>(windows, windowClosingSelector); return source.subscribe(new ChunkObserver>(windows, observer, creator)); } @@ -78,7 +76,7 @@ public Subscription onSubscribe(final Observer> observer) * *

Windows can be created by pushing a {@link rx.util.Opening} value to the "windowOpenings" {@link rx.Observable}. * This creates a new window which will then start recording values which are produced by the "source" {@link rx.Observable}. Additionally the "windowClosingSelector" will be used to construct an - * {@link rx.Observable} which can produce {@link rx.util.Closing} values. When it does so it will close this (and only this) newly created + * {@link rx.Observable} which can produce values. When it does so it will close this (and only this) newly created * window. When the source {@link rx.Observable} completes or produces an error, all windows are emitted, and the * event is propagated to all subscribed {@link rx.Observer}s.

* @@ -92,16 +90,16 @@ public Subscription onSubscribe(final Observer> observer) * create a new window which instantly starts recording the "source" {@link rx.Observable}. * @param windowClosingSelector * A {@link rx.util.functions.Func0} object which produces {@link rx.Observable}s. These {@link rx.Observable}s determine when a window is emitted and replaced by simply - * producing an {@link rx.util.Closing} object. + * producing an object. * @return * the {@link rx.util.functions.Func1} object representing the specified window operation. */ - public static OnSubscribeFunc> window(final Observable source, final Observable windowOpenings, final Func1> windowClosingSelector) { + public static OnSubscribeFunc> window(final Observable source, final Observable windowOpenings, final Func1> windowClosingSelector) { return new OnSubscribeFunc>() { @Override public Subscription onSubscribe(final Observer> observer) { OverlappingChunks> windows = new OverlappingChunks>(observer, OperationWindow. windowMaker()); - ChunkCreator creator = new ObservableBasedMultiChunkCreator>(windows, windowOpenings, windowClosingSelector); + ChunkCreator creator = new ObservableBasedMultiChunkCreator, TOpening, TClosing>(windows, windowOpenings, windowClosingSelector); return source.subscribe(new ChunkObserver>(windows, observer, creator)); } }; diff --git a/rxjava-core/src/main/java/rx/util/Closing.java b/rxjava-core/src/main/java/rx/util/Closing.java deleted file mode 100644 index 987d7a9dbe..0000000000 --- a/rxjava-core/src/main/java/rx/util/Closing.java +++ /dev/null @@ -1,20 +0,0 @@ -/** - * Copyright 2013 Netflix, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package rx.util; - -public interface Closing { - // Tagging interface for objects which can close buffers. -} \ No newline at end of file diff --git a/rxjava-core/src/main/java/rx/util/Closings.java b/rxjava-core/src/main/java/rx/util/Closings.java deleted file mode 100644 index 0de43b97e9..0000000000 --- a/rxjava-core/src/main/java/rx/util/Closings.java +++ /dev/null @@ -1,28 +0,0 @@ -/** - * Copyright 2013 Netflix, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package rx.util; - -public class Closings { - - public static Closing create() { - return new Closing() { - }; - } - - private Closings() { - // Prevent instantation. - } -} \ No newline at end of file diff --git a/rxjava-core/src/main/java/rx/util/Opening.java b/rxjava-core/src/main/java/rx/util/Opening.java deleted file mode 100644 index 03a0bbcfb1..0000000000 --- a/rxjava-core/src/main/java/rx/util/Opening.java +++ /dev/null @@ -1,20 +0,0 @@ -/** - * Copyright 2013 Netflix, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package rx.util; - -public interface Opening { - // Tagging interface for objects which can open buffers. -} \ No newline at end of file diff --git a/rxjava-core/src/main/java/rx/util/Openings.java b/rxjava-core/src/main/java/rx/util/Openings.java deleted file mode 100644 index 30e11f72ca..0000000000 --- a/rxjava-core/src/main/java/rx/util/Openings.java +++ /dev/null @@ -1,28 +0,0 @@ -/** - * Copyright 2013 Netflix, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package rx.util; - -public class Openings { - - public static Opening create() { - return new Opening() { - }; - } - - private Openings() { - // Prevent instantation. - } -} \ No newline at end of file diff --git a/rxjava-core/src/test/java/rx/operators/OperationBufferTest.java b/rxjava-core/src/test/java/rx/operators/OperationBufferTest.java index 9318af8fd6..475326a622 100644 --- a/rxjava-core/src/test/java/rx/operators/OperationBufferTest.java +++ b/rxjava-core/src/test/java/rx/operators/OperationBufferTest.java @@ -33,10 +33,6 @@ import rx.Subscription; import rx.concurrency.TestScheduler; import rx.subscriptions.Subscriptions; -import rx.util.Closing; -import rx.util.Closings; -import rx.util.Opening; -import rx.util.Openings; import rx.util.functions.Action0; import rx.util.functions.Action1; import rx.util.functions.Func0; @@ -226,23 +222,23 @@ public Subscription onSubscribe(Observer observer) { } }); - Observable openings = Observable.create(new Observable.OnSubscribeFunc() { + Observable openings = Observable.create(new Observable.OnSubscribeFunc() { @Override - public Subscription onSubscribe(Observer observer) { - push(observer, Openings.create(), 50); - push(observer, Openings.create(), 200); + public Subscription onSubscribe(Observer observer) { + push(observer, new Object(), 50); + push(observer, new Object(), 200); complete(observer, 250); return Subscriptions.empty(); } }); - Func1> closer = new Func1>() { + Func1> closer = new Func1>() { @Override - public Observable call(Opening opening) { - return Observable.create(new Observable.OnSubscribeFunc() { + public Observable call(Object opening) { + return Observable.create(new Observable.OnSubscribeFunc() { @Override - public Subscription onSubscribe(Observer observer) { - push(observer, Closings.create(), 100); + public Subscription onSubscribe(Observer observer) { + push(observer, new Object(), 100); complete(observer, 101); return Subscriptions.empty(); } @@ -277,13 +273,13 @@ public Subscription onSubscribe(Observer observer) { } }); - Func0> closer = new Func0>() { + Func0> closer = new Func0>() { @Override - public Observable call() { - return Observable.create(new Observable.OnSubscribeFunc() { + public Observable call() { + return Observable.create(new Observable.OnSubscribeFunc() { @Override - public Subscription onSubscribe(Observer observer) { - push(observer, Closings.create(), 100); + public Subscription onSubscribe(Observer observer) { + push(observer, new Object(), 100); complete(observer, 101); return Subscriptions.empty(); } diff --git a/rxjava-core/src/test/java/rx/operators/OperationWindowTest.java b/rxjava-core/src/test/java/rx/operators/OperationWindowTest.java index b26cf42cae..6eff582878 100644 --- a/rxjava-core/src/test/java/rx/operators/OperationWindowTest.java +++ b/rxjava-core/src/test/java/rx/operators/OperationWindowTest.java @@ -30,10 +30,6 @@ import rx.Subscription; import rx.concurrency.TestScheduler; import rx.subscriptions.Subscriptions; -import rx.util.Closing; -import rx.util.Closings; -import rx.util.Opening; -import rx.util.Openings; import rx.util.functions.Action0; import rx.util.functions.Action1; import rx.util.functions.Func0; @@ -202,23 +198,23 @@ public Subscription onSubscribe(Observer observer) { } }); - Observable openings = Observable.create(new Observable.OnSubscribeFunc() { + Observable openings = Observable.create(new Observable.OnSubscribeFunc() { @Override - public Subscription onSubscribe(Observer observer) { - push(observer, Openings.create(), 50); - push(observer, Openings.create(), 200); + public Subscription onSubscribe(Observer observer) { + push(observer, new Object(), 50); + push(observer, new Object(), 200); complete(observer, 250); return Subscriptions.empty(); } }); - Func1> closer = new Func1>() { + Func1> closer = new Func1>() { @Override - public Observable call(Opening opening) { - return Observable.create(new Observable.OnSubscribeFunc() { + public Observable call(Object opening) { + return Observable.create(new Observable.OnSubscribeFunc() { @Override - public Subscription onSubscribe(Observer observer) { - push(observer, Closings.create(), 100); + public Subscription onSubscribe(Observer observer) { + push(observer, new Object(), 100); complete(observer, 101); return Subscriptions.empty(); } @@ -253,13 +249,13 @@ public Subscription onSubscribe(Observer observer) { } }); - Func0> closer = new Func0>() { + Func0> closer = new Func0>() { @Override - public Observable call() { - return Observable.create(new Observable.OnSubscribeFunc() { + public Observable call() { + return Observable.create(new Observable.OnSubscribeFunc() { @Override - public Subscription onSubscribe(Observer observer) { - push(observer, Closings.create(), 100); + public Subscription onSubscribe(Observer observer) { + push(observer, new Object(), 100); complete(observer, 101); return Subscriptions.empty(); }