From 21578d5d06523542516b027e6927faa569a33ffd Mon Sep 17 00:00:00 2001 From: akarnokd Date: Wed, 30 Apr 2014 15:16:32 +0200 Subject: [PATCH 1/2] Operator Window and other changes --- rxjava-core/src/main/java/rx/Observable.java | 22 +- .../rx/operators/BufferUntilSubscriber.java | 193 +++-- .../java/rx/operators/ChunkedOperation.java | 675 ------------------ .../java/rx/operators/OperationWindow.java | 527 -------------- .../rx/operators/OperatorBufferWithTime.java | 2 + .../OperatorWindowWithObservable.java | 303 ++++++++ .../rx/operators/OperatorWindowWithSize.java | 165 +++++ .../OperatorWindowWithStartEndObservable.java | 220 ++++++ .../rx/operators/OperatorWindowWithTime.java | 470 ++++++++++++ ...indowTest.java => OperatorWindowTest.java} | 64 +- 10 files changed, 1336 insertions(+), 1305 deletions(-) delete mode 100644 rxjava-core/src/main/java/rx/operators/ChunkedOperation.java delete mode 100644 rxjava-core/src/main/java/rx/operators/OperationWindow.java create mode 100644 rxjava-core/src/main/java/rx/operators/OperatorWindowWithObservable.java create mode 100644 rxjava-core/src/main/java/rx/operators/OperatorWindowWithSize.java create mode 100644 rxjava-core/src/main/java/rx/operators/OperatorWindowWithStartEndObservable.java create mode 100644 rxjava-core/src/main/java/rx/operators/OperatorWindowWithTime.java rename rxjava-core/src/test/java/rx/operators/{OperationWindowTest.java => OperatorWindowTest.java} (88%) diff --git a/rxjava-core/src/main/java/rx/Observable.java b/rxjava-core/src/main/java/rx/Observable.java index 52d9cd5e09..fef1c6dc3e 100644 --- a/rxjava-core/src/main/java/rx/Observable.java +++ b/rxjava-core/src/main/java/rx/Observable.java @@ -7238,7 +7238,7 @@ public final Observable unsubscribeOn(Scheduler scheduler) { * @see RxJava Wiki: window() */ public final Observable> window(Func0> closingSelector) { - return create(OperationWindow.window(this, closingSelector)); + return lift(new OperatorWindowWithObservable(closingSelector)); } /** @@ -7255,7 +7255,7 @@ public final Observable> window(Func0RxJava Wiki: window() */ public final Observable> window(int count) { - return create(OperationWindow.window(this, count)); + return lift(new OperatorWindowWithSize(count, count)); } /** @@ -7275,7 +7275,7 @@ public final Observable> window(int count) { * @see RxJava Wiki: window() */ public final Observable> window(int count, int skip) { - return create(OperationWindow.window(this, count, skip)); + return lift(new OperatorWindowWithSize(count, skip)); } /** @@ -7297,7 +7297,7 @@ public final Observable> window(int count, int skip) { * @see RxJava Wiki: window() */ public final Observable> window(long timespan, long timeshift, TimeUnit unit) { - return create(OperationWindow.window(this, timespan, timeshift, unit)); + return lift(new OperatorWindowWithTime(timespan, timeshift, unit, Integer.MAX_VALUE, Schedulers.computation())); } /** @@ -7321,7 +7321,7 @@ public final Observable> window(long timespan, long timeshift, Tim * @see RxJava Wiki: window() */ public final Observable> window(long timespan, long timeshift, TimeUnit unit, Scheduler scheduler) { - return create(OperationWindow.window(this, timespan, timeshift, unit, scheduler)); + return lift(new OperatorWindowWithTime(timespan, timeshift, unit, Integer.MAX_VALUE, scheduler)); } /** @@ -7342,7 +7342,7 @@ public final Observable> window(long timespan, long timeshift, Tim * @see RxJava Wiki: window() */ public final Observable> window(long timespan, TimeUnit unit) { - return create(OperationWindow.window(this, timespan, unit)); + return lift(new OperatorWindowWithTime(timespan, timespan, unit, Integer.MAX_VALUE, Schedulers.computation())); } /** @@ -7367,7 +7367,7 @@ public final Observable> window(long timespan, TimeUnit unit) { * @see RxJava Wiki: window() */ public final Observable> window(long timespan, TimeUnit unit, int count) { - return create(OperationWindow.window(this, timespan, unit, count)); + return lift(new OperatorWindowWithTime(timespan, timespan, unit, count, Schedulers.computation())); } /** @@ -7394,7 +7394,7 @@ public final Observable> window(long timespan, TimeUnit unit, int * @see RxJava Wiki: window() */ public final Observable> window(long timespan, TimeUnit unit, int count, Scheduler scheduler) { - return create(OperationWindow.window(this, timespan, unit, count, scheduler)); + return lift(new OperatorWindowWithTime(timespan, timespan, unit, count, scheduler)); } /** @@ -7417,7 +7417,7 @@ public final Observable> window(long timespan, TimeUnit unit, int * @see RxJava Wiki: window() */ public final Observable> window(long timespan, TimeUnit unit, Scheduler scheduler) { - return create(OperationWindow.window(this, timespan, unit, scheduler)); + return lift(new OperatorWindowWithTime(timespan, timespan, unit, Integer.MAX_VALUE, scheduler)); } /** @@ -7437,7 +7437,7 @@ public final Observable> window(long timespan, TimeUnit unit, Sche * @see RxJava Wiki: window() */ public final Observable> window(Observable windowOpenings, Func1> closingSelector) { - return create(OperationWindow.window(this, windowOpenings, closingSelector)); + return lift(new OperatorWindowWithStartEndObservable(windowOpenings, closingSelector)); } /** @@ -7455,7 +7455,7 @@ public final Observable> window(Observable Observable> window(Observable boundary) { - return create(OperationWindow.window(this, boundary)); + return lift(new OperatorWindowWithObservable(boundary)); } /** diff --git a/rxjava-core/src/main/java/rx/operators/BufferUntilSubscriber.java b/rxjava-core/src/main/java/rx/operators/BufferUntilSubscriber.java index f5679959d2..7687d8bb04 100644 --- a/rxjava-core/src/main/java/rx/operators/BufferUntilSubscriber.java +++ b/rxjava-core/src/main/java/rx/operators/BufferUntilSubscriber.java @@ -15,12 +15,18 @@ */ package rx.operators; +import java.util.LinkedHashSet; +import java.util.Set; import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import rx.Observable; import rx.Observer; import rx.Subscriber; +import rx.functions.Action0; +import rx.observers.Subscribers; +import rx.subscriptions.Subscriptions; /** * A solution to the "time gap" problem that occurs with `groupBy` and `pivot` => https://github.com/Netflix/RxJava/issues/844 @@ -43,44 +49,151 @@ public class BufferUntilSubscriber extends Observable implements Observer { public static BufferUntilSubscriber create() { - return new BufferUntilSubscriber(new AtomicReference>(new BufferedObserver())); + State state = new State(); + return new BufferUntilSubscriber(state); } - private final AtomicReference> observerRef; + /** The common state. */ + static final class State { + /** Lite notifications of type T. */ + final NotificationLite nl = NotificationLite.instance(); + /** The first observer or the one which buffers until the first arrives. */ + final AtomicReference> observerRef = new AtomicReference>(new BufferedObserver()); + /** How many subscribers. */ + final AtomicBoolean first = new AtomicBoolean(); + /** The rest of the subscribers without buffering. Guarded by this. */ + final Set> subscribers = new LinkedHashSet>(); + /** Guarded by this. */ + boolean done; + /** Guarded by this. */ + Throwable exception; + } + + static final class OnSubscribeAction implements OnSubscribe { + final State state; - private BufferUntilSubscriber(final AtomicReference> observerRef) { - super(new OnSubscribe() { + public OnSubscribeAction(State state) { + this.state = state; + } - @Override - public void call(Subscriber s) { + @Override + public void call(final Subscriber s) { + if (state.first.compareAndSet(false, true)) { // drain queued notifications before subscription // we do this here before PassThruObserver so the consuming thread can do this before putting itself in the line of the producer - BufferedObserver buffered = (BufferedObserver) observerRef.get(); - Object o = null; + BufferedObserver buffered = (BufferedObserver)state.observerRef.get(); + Object o; while ((o = buffered.buffer.poll()) != null) { - emit(s, o); + state.nl.accept(s, o); } // register real observer for pass-thru ... and drain any further events received on first notification - observerRef.set(new PassThruObserver(s, buffered.buffer, observerRef)); - } + state.observerRef.set(new PassThruObserver(s, buffered.buffer, state.observerRef)); + s.add(Subscriptions.create(new Action0() { + @Override + public void call() { + state.observerRef.set(Subscribers.empty()); + } + })); + } else { + Throwable e = null; + boolean done; + synchronized (state) { + done = state.done; + if (!done) { + state.subscribers.add(s); + } else { + e = state.exception; + } + } + if (done) { + if (e != null) { + s.onError(e); + } else { + s.onCompleted(); + } + return; + } + s.add(Subscriptions.create(new Action0() { - }); - this.observerRef = observerRef; + @Override + public void call() { + synchronized (state) { + state.subscribers.remove(s); + } + } + })); + } + } + + } + final State state; + + private BufferUntilSubscriber(State state) { + super(new OnSubscribeAction(state)); + this.state = state; } @Override public void onCompleted() { - observerRef.get().onCompleted(); + state.observerRef.get().onCompleted(); + // notify the rest + Subscriber[] list; + synchronized (state) { + if (!state.done) { + return; + } + state.done = true; + if (state.subscribers.isEmpty()) { + return; + } + list = state.subscribers.toArray(new Subscriber[state.subscribers.size()]); + state.subscribers.clear(); + } + for (Subscriber s : list) { + s.onCompleted(); + } } @Override public void onError(Throwable e) { - observerRef.get().onError(e); + state.observerRef.get().onError(e); + // notify the rest + Subscriber[] list; + synchronized (state) { + if (!state.done) { + return; + } + state.done = true; + state.exception = e; + if (state.subscribers.isEmpty()) { + return; + } + list = state.subscribers.toArray(new Subscriber[state.subscribers.size()]); + state.subscribers.clear(); + } + for (Subscriber s : list) { + s.onError(e); + } } @Override + @SuppressWarnings({ "unchecked", "rawtypes" }) public void onNext(T t) { - observerRef.get().onNext(t); + state.observerRef.get().onNext(t); + // notify the rest + Subscriber[] list; + synchronized (state) { + if (state.done) { + return; + } + if (state.subscribers.isEmpty()) { + return; + } + list = state.subscribers.toArray(new Subscriber[state.subscribers.size()]); + } + for (Subscriber s : list) { + s.onNext(t); + } } /** @@ -97,6 +210,7 @@ private static class PassThruObserver extends Subscriber { // this assumes single threaded synchronous notifications (the Rx contract for a single Observer) private final ConcurrentLinkedQueue buffer; private final AtomicReference> observerRef; + private final NotificationLite nl = NotificationLite.instance(); PassThruObserver(Observer actual, ConcurrentLinkedQueue buffer, AtomicReference> observerRef) { this.actual = actual; @@ -123,67 +237,34 @@ public void onNext(T t) { } private void drainIfNeededAndSwitchToActual() { - Object o = null; + Object o; while ((o = buffer.poll()) != null) { - emit(this, o); + nl.accept(this, o); } // now we can safely change over to the actual and get rid of the pass-thru - observerRef.set(actual); + observerRef.compareAndSet(this, actual); } } private static class BufferedObserver extends Subscriber { private final ConcurrentLinkedQueue buffer = new ConcurrentLinkedQueue(); + private final NotificationLite nl = NotificationLite.instance(); @Override public void onCompleted() { - buffer.add(COMPLETE_SENTINEL); + buffer.add(nl.completed()); } @Override public void onError(Throwable e) { - buffer.add(new ErrorSentinel(e)); + buffer.add(nl.error(e)); } @Override public void onNext(T t) { - if (t == null) { - buffer.add(NULL_SENTINEL); - } else { - buffer.add(t); - } - } - - } - - private final static void emit(Observer s, Object v) { - if (v instanceof Sentinel) { - if (v == NULL_SENTINEL) { - s.onNext(null); - } else if (v == COMPLETE_SENTINEL) { - s.onCompleted(); - } else if (v instanceof ErrorSentinel) { - s.onError(((ErrorSentinel) v).e); - } - } else { - s.onNext((T) v); + buffer.add(nl.next(t)); } - } - - private static class Sentinel { } - - private static Sentinel NULL_SENTINEL = new Sentinel(); - private static Sentinel COMPLETE_SENTINEL = new Sentinel(); - - private static class ErrorSentinel extends Sentinel { - final Throwable e; - - ErrorSentinel(Throwable e) { - this.e = e; - } - } - } diff --git a/rxjava-core/src/main/java/rx/operators/ChunkedOperation.java b/rxjava-core/src/main/java/rx/operators/ChunkedOperation.java deleted file mode 100644 index e8c5c80f15..0000000000 --- a/rxjava-core/src/main/java/rx/operators/ChunkedOperation.java +++ /dev/null @@ -1,675 +0,0 @@ -/** - * Copyright 2014 Netflix, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package rx.operators; - -import java.util.ArrayList; -import java.util.List; -import java.util.Queue; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; - -import rx.Observable; -import rx.Observer; -import rx.Scheduler; -import rx.Scheduler.Worker; -import rx.Subscriber; -import rx.Subscription; -import rx.functions.Action0; -import rx.functions.Func0; -import rx.functions.Func1; -import rx.subscriptions.CompositeSubscription; -import rx.subscriptions.MultipleAssignmentSubscription; - -/** - * The base class for operations that break observables into "chunks". Currently buffers and windows. - */ -public class ChunkedOperation { - /** - * This interface defines a way which specifies when to create a new internal {@link rx.operators.ChunkedOperation.Chunk} object. - * - */ - protected interface ChunkCreator { - /** - * Signifies a onNext event. - */ - void onValuePushed(); - - /** - * Signifies a onCompleted or onError event. Should be used to clean up open - * subscriptions and other still running background tasks. - */ - void stop(); - } - - /** - * This class represents a single chunk: A sequence of recorded values. - * - * @param - * The type of objects which this {@link Chunk} can hold. - * @param - * The type of object being tracked by the {@link Chunk} - */ - protected abstract static class Chunk { - protected final List contents = new ArrayList(); - - /** - * Appends a specified value to the {@link Chunk}. - * - * @param value - * The value to append to the {@link Chunk}. - */ - public void pushValue(T value) { - contents.add(value); - } - - /** - * @return - * The mutable underlying {@code C} which contains all the - * recorded values in this {@link Chunk} object. - */ - abstract public C getContents(); - - /** - * @return - * The size of the underlying {@link List} which contains all the - * recorded values in this {@link Chunk} object. - */ - public int size() { - return contents.size(); - } - } - - /** - * This class is an extension on the {@link rx.operators.ChunkedOperation.Chunks} class which only supports one - * active (not yet emitted) internal {@link rx.operators.ChunkedOperation.Chunks} object. - * - * @param - * The type of object all internal {@link rx.operators.ChunkedOperation.Chunks} objects record. - * The type of object being tracked by the {@link Chunk} - */ - protected static class NonOverlappingChunks extends Chunks { - - private final Object lock = new Object(); - - public NonOverlappingChunks(Observer observer, Func0> chunkMaker) { - super(observer, chunkMaker); - } - - public Chunk emitAndReplaceChunk() { - synchronized (lock) { - emitChunk(getChunk()); - return createChunk(); - } - } - - @Override - public void pushValue(T value) { - synchronized (lock) { - super.pushValue(value); - } - } - } - - /** - * This class is an extension on the {@link rx.operators.ChunkedOperation.Chunks} class which actually has no additional - * behavior than its super class. Classes extending this class, are expected to support - * two or more active (not yet emitted) internal {@link rx.operators.ChunkedOperation.Chunks} objects. - * - * @param - * The type of object all internal {@link rx.operators.ChunkedOperation.Chunks} objects record. - * The type of object being tracked by the {@link rx.operators.ChunkedOperation.Chunk} - */ - protected static class OverlappingChunks extends Chunks { - public OverlappingChunks(Observer observer, Func0> chunkMaker) { - super(observer, chunkMaker); - } - } - - /** - * This class is an extension on the {@link rx.operators.ChunkedOperation.Chunks} class. Every internal chunk has - * a has a maximum time to live and a maximum internal capacity. When the chunk has - * reached the end of its life, or reached its maximum internal capacity it is - * automatically emitted. - * - * @param - * The type of object all internal {@link rx.operators.ChunkedOperation.Chunk} objects record. - * The type of object being tracked by the {@link Chunk} - */ - protected static class TimeAndSizeBasedChunks extends Chunks implements Subscription { - - private final ConcurrentMap, Subscription> subscriptions = new ConcurrentHashMap, Subscription>(); - - private final Scheduler.Worker scheduler; - private final long maxTime; - private final TimeUnit unit; - private final int maxSize; - private volatile boolean unsubscribed = false; - - public TimeAndSizeBasedChunks(Observer observer, Func0> chunkMaker, int maxSize, long maxTime, TimeUnit unit, Scheduler scheduler) { - super(observer, chunkMaker); - this.maxSize = maxSize; - this.maxTime = maxTime; - this.unit = unit; - this.scheduler = scheduler.createWorker(); - } - - @Override - public Chunk createChunk() { - final Chunk chunk = super.createChunk(); - subscriptions.put(chunk, scheduler.schedule(new Action0() { - @Override - public void call() { - emitChunk(chunk); - } - }, maxTime, unit)); - return chunk; - } - - @Override - public void emitChunk(Chunk chunk) { - Subscription subscription = subscriptions.remove(chunk); - if (subscription == null) { - // Chunk was already emitted. - return; - } - - // Fixed issue 428. - // As unsubscribe will cancel the Future, and the currrent thread's interrupt status - // will be set. So we need to emit the chunk before unsubscribe. - super.emitChunk(chunk); - subscription.unsubscribe(); - createChunk(); - } - - @Override - public void pushValue(T value) { - super.pushValue(value); - - Chunk chunk; - while ((chunk = getChunk()) != null) { - if (chunk.size() >= maxSize) { - emitChunk(chunk); - } else { - // Chunk is not at full capacity yet, and neither will remaining chunks be so we can terminate. - break; - } - } - } - - @Override - public void unsubscribe() { - unsubscribed = true; - for (Subscription s : subscriptions.values()) { - s.unsubscribe(); - } - scheduler.unsubscribe(); - } - - @Override - public boolean isUnsubscribed() { - return unsubscribed; - } - } - - /** - * This class is an extension on the {@link rx.operators.ChunkedOperation.Chunks} class. Every internal chunk has - * a has a maximum time to live. When the chunk has reached the end of its life it is - * automatically emitted. - * - * @param - * The type of object all internal {@link rx.operators.ChunkedOperation.Chunk} objects record. - * The type of object being tracked by the {@link Chunk} - */ - protected static class TimeBasedChunks extends OverlappingChunks implements Subscription { - - private final ConcurrentMap, Subscription> subscriptions = new ConcurrentHashMap, Subscription>(); - - private final Scheduler.Worker scheduler; - private final long time; - private final TimeUnit unit; - private volatile boolean unsubscribed = false; - - public TimeBasedChunks(Observer observer, Func0> chunkMaker, long time, TimeUnit unit, Scheduler scheduler) { - super(observer, chunkMaker); - this.time = time; - this.unit = unit; - this.scheduler = scheduler.createWorker(); - } - - @Override - public Chunk createChunk() { - final Chunk chunk = super.createChunk(); - subscriptions.put(chunk, scheduler.schedule(new Action0() { - @Override - public void call() { - emitChunk(chunk); - } - }, time, unit)); - return chunk; - } - - @Override - public void emitChunk(Chunk chunk) { - subscriptions.remove(chunk); - super.emitChunk(chunk); - } - - @Override - public void unsubscribe() { - unsubscribed = true; - for (Subscription s : subscriptions.values()) { - s.unsubscribe(); - } - scheduler.unsubscribe(); - } - - @Override - public boolean isUnsubscribed() { - return unsubscribed; - } - - } - - /** - * This class is an extension on the {@link rx.operators.ChunkedOperation.Chunks} class. Every internal chunk has - * a fixed maximum capacity. When the chunk has reached its maximum capacity it is - * automatically emitted. - * - * @param - * The type of object all internal {@link rx.operators.ChunkedOperation.Chunk} objects record. - * The type of object being tracked by the {@link Chunk} - */ - protected static class SizeBasedChunks extends Chunks { - - private final int size; - - public SizeBasedChunks(Observer observer, Func0> chunkMaker, int size) { - super(observer, chunkMaker); - this.size = size; - } - - @Override - public void pushValue(T value) { - super.pushValue(value); - - Chunk chunk; - while ((chunk = getChunk()) != null) { - if (chunk.size() >= size) { - emitChunk(chunk); - } else { - // Chunk is not at full capacity yet, and neither will remaining chunks be so we can terminate. - break; - } - } - } - } - - /** - * This class represents an object which contains and manages multiple {@link rx.operators.ChunkedOperation.Chunk} objects. - * - * @param - * The type of objects which the internal {@link rx.operators.ChunkedOperation.Chunk} objects record. - * The type of object being tracked by the {@link Chunk} - */ - protected static class Chunks { - - private final Queue> chunks = new ConcurrentLinkedQueue>(); - private final Observer observer; - private final Func0> chunkMaker; - - /** - * Constructs a new {@link ChunkedOperation.Chunks} object for the specified {@link rx.Observer}. - * - * @param observer - * The {@link rx.Observer} to which this object will emit its internal {@link rx.operators.ChunkedOperation.Chunk} objects to when requested. - */ - public Chunks(Observer observer, Func0> chunkMaker) { - this.observer = observer; - this.chunkMaker = chunkMaker; - } - - /** - * This method will instantiate a new {@link rx.operators.ChunkedOperation.Chunk} object and register it internally. - * - * @return - * The constructed empty {@link rx.operators.ChunkedOperation.Chunk} object. - */ - public Chunk createChunk() { - Chunk chunk = chunkMaker.call(); - chunks.add(chunk); - return chunk; - } - - /** - * This method emits all not yet emitted {@link rx.operators.ChunkedOperation.Chunk} objects. - */ - public void emitAllChunks() { - Chunk chunk; - while ((chunk = chunks.poll()) != null) { - observer.onNext(chunk.getContents()); - } - } - - /** - * This method emits the specified {@link rx.operators.ChunkedOperation.Chunk} object. - * - * @param chunk - * The {@link rx.operators.ChunkedOperation.Chunk} to emit. - */ - public void emitChunk(Chunk chunk) { - if (!chunks.remove(chunk)) { - // Concurrency issue: Chunk is already emitted! - return; - } - observer.onNext(chunk.getContents()); - } - - /** - * @return - * The oldest (in case there are multiple) {@link rx.operators.ChunkedOperation.Chunk} object. - */ - public Chunk getChunk() { - return chunks.peek(); - } - - /** - * This method pushes a value to all not yet emitted {@link rx.operators.ChunkedOperation.Chunk} objects. - * - * @param value - * The value to push to all not yet emitted {@link rx.operators.ChunkedOperation.Chunk} objects. - */ - public void pushValue(T value) { - List> copy = new ArrayList>(chunks); - for (Chunk chunk : copy) { - chunk.pushValue(value); - } - } - } - - /** - * This {@link rx.operators.ChunkedOperation.ChunkObserver} object can be constructed using a {@link rx.operators.ChunkedOperation.Chunks} object, - * a {@link rx.Observer} object, and a {@link rx.operators.ChunkedOperation.ChunkCreator} object. The {@link rx.operators.ChunkedOperation.ChunkCreator} will manage the creation, and in some rare - * cases emission of internal {@link rx.operators.ChunkedOperation.Chunk} objects - * in the specified {@link rx.operators.ChunkedOperation.Chunks} object. Under normal circumstances the {@link rx.operators.ChunkedOperation.Chunks} object specifies when a created - * {@link rx.operators.ChunkedOperation.Chunk} is emitted. - * - * @param - * The type of object all internal {@link rx.operators.ChunkedOperation.Chunk} objects record. - * The type of object being tracked by the {@link Chunk} - */ - protected static class ChunkObserver extends Subscriber { - - private final Chunks chunks; - private final Observer observer; - private final ChunkCreator creator; - - public ChunkObserver(Chunks chunks, Observer observer, ChunkCreator creator) { - this.observer = observer; - this.creator = creator; - this.chunks = chunks; - } - - @Override - public void onCompleted() { - creator.stop(); - chunks.emitAllChunks(); - observer.onCompleted(); - } - - @Override - public void onError(Throwable e) { - creator.stop(); - chunks.emitAllChunks(); - observer.onError(e); - } - - @Override - public void onNext(T args) { - creator.onValuePushed(); - chunks.pushValue(args); - } - } - - /** - * This {@link rx.operators.ChunkedOperation.ChunkCreator} creates a new {@link rx.operators.ChunkedOperation.Chunk} when it is initialized, but - * provides no additional functionality. This class should primarily be used when the - * internal {@link rx.operators.ChunkedOperation.Chunk} is closed externally. - * - * @param - * The type of object all internal {@link rx.operators.ChunkedOperation.Chunk} objects record. - * The type of object being tracked by the {@link Chunk} - */ - protected static class SingleChunkCreator implements ChunkCreator { - - public SingleChunkCreator(Chunks chunks) { - chunks.createChunk(); - } - - @Override - public void onValuePushed() { - // Do nothing. - } - - @Override - public void stop() { - // Do nothing. - } - } - - /** - * This {@link rx.operators.ChunkedOperation.ChunkCreator} creates a new {@link rx.operators.ChunkedOperation.Chunk} whenever it receives an - * object from the provided {@link rx.Observable} created with the - * chunkClosingSelector {@link rx.functions.Func0}. - * - * @param - * The type of object all internal {@link rx.operators.ChunkedOperation.Chunk} objects record. - * The type of object being tracked by the {@link Chunk} - */ - protected static class ObservableBasedSingleChunkCreator implements ChunkCreator { - - private final SafeObservableSubscription subscription = new SafeObservableSubscription(); - private final Func0> chunkClosingSelector; - private final NonOverlappingChunks chunks; - - public ObservableBasedSingleChunkCreator(NonOverlappingChunks chunks, Func0> chunkClosingSelector) { - this.chunks = chunks; - this.chunkClosingSelector = chunkClosingSelector; - - chunks.createChunk(); - listenForChunkEnd(); - } - - private void listenForChunkEnd() { - Observable closingObservable = chunkClosingSelector.call(); - closingObservable.unsafeSubscribe(new Subscriber() { - - @Override - public void onCompleted() { - - } - - @Override - public void onError(Throwable e) { - - } - - @Override - public void onNext(TClosing t) { - chunks.emitAndReplaceChunk(); - listenForChunkEnd(); - } - - }); - } - - @Override - public void onValuePushed() { - // Ignore value pushes. - } - - @Override - public void stop() { - subscription.unsubscribe(); - } - } - - /** - * This {@link rx.operators.ChunkedOperation.ChunkCreator} creates a new {@link rx.operators.ChunkedOperation.Chunk} whenever it receives - * an object from the provided chunkOpenings {@link rx.Observable}, and closes the corresponding {@link rx.operators.ChunkedOperation.Chunk} object when it receives an object from the provided - * {@link rx.Observable} created - * with the chunkClosingSelector {@link rx.functions.Func1}. - * - * @param - * The type of object all internal {@link rx.operators.ChunkedOperation.Chunk} objects record. - * The type of object being tracked by the {@link Chunk} - */ - protected static class ObservableBasedMultiChunkCreator implements ChunkCreator { - - private final CompositeSubscription subscription = new CompositeSubscription(); - - public ObservableBasedMultiChunkCreator(final OverlappingChunks chunks, Observable openings, final Func1> chunkClosingSelector) { - openings.unsafeSubscribe(new Subscriber(subscription) { - - @Override - public void onCompleted() { - - } - - @Override - public void onError(Throwable e) { - - } - - @Override - public void onNext(TOpening opening) { - final Chunk chunk = chunks.createChunk(); - Observable closingObservable = chunkClosingSelector.call(opening); - - closingObservable.unsafeSubscribe(new Subscriber() { - - @Override - public void onCompleted() { - - } - - @Override - public void onError(Throwable e) { - - } - - @Override - public void onNext(TClosing t) { - chunks.emitChunk(chunk); - } - - }); - } - - }); - } - - @Override - public void onValuePushed() { - // Ignore value pushes. - } - - @Override - public void stop() { - subscription.unsubscribe(); - } - } - - /** - * This {@link rx.operators.ChunkedOperation.ChunkCreator} creates a new {@link rx.operators.ChunkedOperation.Chunk} every time after a fixed - * period of time has elapsed. - * - * @param - * The type of object all internal {@link rx.operators.ChunkedOperation.Chunk} objects record. - * The type of object being tracked by the {@link Chunk} - */ - protected static class TimeBasedChunkCreator implements ChunkCreator { - - private final MultipleAssignmentSubscription subscription = new MultipleAssignmentSubscription(); - - public TimeBasedChunkCreator(final NonOverlappingChunks chunks, long time, TimeUnit unit, Scheduler scheduler) { - Worker inner = scheduler.createWorker(); - this.subscription.set(inner); - inner.schedulePeriodically(new Action0() { - @Override - public void call() { - chunks.emitAndReplaceChunk(); - } - }, 0, time, unit); - } - - public TimeBasedChunkCreator(final OverlappingChunks chunks, long time, TimeUnit unit, Scheduler scheduler) { - Worker inner = scheduler.createWorker(); - this.subscription.set(inner); - inner.schedulePeriodically(new Action0() { - @Override - public void call() { - chunks.createChunk(); - } - }, 0, time, unit); - } - - @Override - public void onValuePushed() { - // Do nothing: chunks are created periodically. - } - - @Override - public void stop() { - subscription.unsubscribe(); - } - } - - /** - * This {@link rx.operators.ChunkedOperation.ChunkCreator} creates a new {@link rx.operators.ChunkedOperation.Chunk} every time after it has - * seen a certain amount of elements. - * - * @param - * The type of object all internal {@link rx.operators.ChunkedOperation.Chunk} objects record. - * The type of object being tracked by the {@link Chunk} - */ - protected static class SkippingChunkCreator implements ChunkCreator { - - private final AtomicInteger skipped = new AtomicInteger(1); - private final Chunks chunks; - private final int skip; - - public SkippingChunkCreator(Chunks chunks, int skip) { - this.chunks = chunks; - this.skip = skip; - } - - @Override - public void onValuePushed() { - if (skipped.decrementAndGet() == 0) { - skipped.set(skip); - chunks.createChunk(); - } - } - - @Override - public void stop() { - // Nothing to stop: we're not using a Scheduler. - } - } -} diff --git a/rxjava-core/src/main/java/rx/operators/OperationWindow.java b/rxjava-core/src/main/java/rx/operators/OperationWindow.java deleted file mode 100644 index 054be1f29b..0000000000 --- a/rxjava-core/src/main/java/rx/operators/OperationWindow.java +++ /dev/null @@ -1,527 +0,0 @@ -/** - * Copyright 2014 Netflix, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package rx.operators; - -import java.util.concurrent.TimeUnit; - -import rx.Observable; -import rx.Observable.OnSubscribeFunc; -import rx.Observer; -import rx.Scheduler; -import rx.Subscriber; -import rx.Subscription; -import rx.functions.Func0; -import rx.functions.Func1; -import rx.schedulers.Schedulers; -import rx.subjects.PublishSubject; -import rx.subjects.Subject; -import rx.subscriptions.CompositeSubscription; -import rx.subscriptions.Subscriptions; - -public final class OperationWindow extends ChunkedOperation { - - public static Func0> windowMaker() { - return new Func0>() { - @Override - public Window call() { - return new Window(); - } - }; - } - - /** - * This method creates a {@link rx.functions.Func1} object which represents the window operation. This - * operation takes values from the specified {@link rx.Observable} source and stores them in a window until - * the {@link rx.Observable} constructed using the {@link rx.functions.Func0} argument, produces a value. - * The window is then emitted, and a new window is created to replace it. A new {@link rx.Observable} will - * be constructed using the provided {@link rx.functions.Func0} object, which will determine when this new - * window is emitted. When the source {@link rx.Observable} completes or produces an error, the current - * window is emitted, and the event is propagated to all subscribed {@link rx.Observer}s. - *

- * Note that this operation only produces non-overlapping windows. At all times there is - * exactly one window actively storing values. - *

- * - * @param source - * the {@link rx.Observable} which produces values - * @param windowClosingSelector - * a {@link rx.functions.Func0} object that produces {@link rx.Observable}s. These - * {@link rx.Observable}s determine when a window is emitted and replaced by simply - * producing an object. - * @return - * the {@link rx.functions.Func1} object representing the specified window operation - */ - public static OnSubscribeFunc> window(final Observable source, final Func0> windowClosingSelector) { - return new OnSubscribeFunc>() { - @Override - public Subscription onSubscribe(final Observer> observer) { - NonOverlappingChunks> windows = new NonOverlappingChunks>(observer, OperationWindow. windowMaker()); - ChunkCreator creator = new ObservableBasedSingleChunkCreator, TClosing>(windows, windowClosingSelector); - return source.unsafeSubscribe(new ChunkObserver>(windows, observer, creator)); - } - - }; - } - - /** - * This method creates a {@link rx.functions.Func1} object which represents the window operation. This - * operation takes values from the specified {@link rx.Observable} source and stores them in the currently - * active window. Initially there are no windows active. - *

- * Windows can be created by pushing a {@link rx.util.TOpening} value to the {@code windowOpenings} - * {@link rx.Observable}. This creates a new window which will then start recording values which are - * produced by the {@code source} {@link rx.Observable}. Additionally the {@code windowClosingSelector} - * will be used to construct an {@link rx.Observable} which can produce values. When it does so it will - * close this (and only this) newly created window. When the source {@link rx.Observable} completes or - * produces an error, all windows are emitted, and the event is propagated to all subscribed - * {@link rx.Observer}s. - *

- * Note that when using this operation multiple overlapping windows could be active at any - * one point. - *

- * - * @param source - * the {@link rx.Observable} which produces values - * @param windowOpenings - * an {@link rx.Observable} which when it produces a {@link rx.util.TOpening} value will create a - * new window which instantly starts recording the {@code source} {@link rx.Observable} - * @param windowClosingSelector - * a {@link rx.functions.Func0} object that produces {@link rx.Observable}s. These - * {@link rx.Observable}s determine when a window is emitted and replaced by simply producing an - * object. - * @return - * the {@link rx.functions.Func1} object representing the specified window operation - */ - public static OnSubscribeFunc> window(final Observable source, final Observable windowOpenings, final Func1> windowClosingSelector) { - return new OnSubscribeFunc>() { - @Override - public Subscription onSubscribe(final Observer> observer) { - OverlappingChunks> windows = new OverlappingChunks>(observer, OperationWindow. windowMaker()); - ChunkCreator creator = new ObservableBasedMultiChunkCreator, TOpening, TClosing>(windows, windowOpenings, windowClosingSelector); - return source.unsafeSubscribe(new ChunkObserver>(windows, observer, creator)); - } - }; - } - - /** - * This method creates a {@link rx.functions.Func1} object which represents the window operation. This - * operation takes values from the specified {@link rx.Observable} source and stores them in a window until - * the window contains a specified number of elements. The window is then emitted, and a new window is - * created to replace it. When the source {@link rx.Observable} completes or produces an error, the current - * window is emitted, and the event is propagated to all subscribed {@link rx.Observer}s. - *

- * Note that this operation only produces non-overlapping windows. At all times there is - * exactly one window actively storing values. - *

- * - * @param source - * the {@link rx.Observable} which produces values - * @param count - * the number of elements a window should have before being emitted and replaced - * @return - * the {@link rx.functions.Func1} object representing the specified window operation - */ - public static OnSubscribeFunc> window(Observable source, int count) { - return window(source, count, count); - } - - /** - * This method creates a {@link rx.functions.Func1} object which represents the window operation. This - * operation takes values from the specified {@link rx.Observable} source and stores them in all active - * windows until the window contains a specified number of elements. The window is then emitted. Windows are - * created after a certain amount of values have been received. When the source {@link rx.Observable} - * completes or produces an error, the currently active windows are emitted, and the event is propagated to - * all subscribed {@link rx.Observer}s. - *

- * Note that this operation can produce non-connected, connected non-overlapping, or overlapping - * windows depending on the input parameters. - *

- * - * @param source - * the {@link rx.Observable} which produces values - * @param count - * the number of elements a window should have before being emitted - * @param skip - * the interval with which windows have to be created. Note that when {@code skip == count} that - * this is the same as calling {@link rx.operators.OperationWindow#window(rx.Observable, int)}. - * If {@code skip < count}, this window operation will produce overlapping windows and if - * {@code skip > count} non-overlapping windows will be created and some values will not be - * pushed into a window at all! - * @return - * the {@link rx.functions.Func1} object representing the specified window operation - */ - public static OnSubscribeFunc> window(final Observable source, final int count, final int skip) { - return new OnSubscribeFunc>() { - @Override - public Subscription onSubscribe(final Observer> observer) { - Chunks> chunks = new SizeBasedChunks>(observer, OperationWindow. windowMaker(), count); - ChunkCreator creator = new SkippingChunkCreator>(chunks, skip); - return source.unsafeSubscribe(new ChunkObserver>(chunks, observer, creator)); - } - }; - } - - /** - * This method creates a {@link rx.functions.Func1} object which represents the window operation. This - * operation takes values from the specified {@link rx.Observable} source and stores them in a window. - * Periodically the window is emitted and replaced with a new window. How often this is done depends on the - * specified timespan. When the source {@link rx.Observable} completes or produces an error, the current - * window is emitted, and the event is propagated to all subscribed {@link rx.Observer}s. - *

- * Note that this operation only produces non-overlapping windows. At all times there is - * exactly one window actively storing values. - *

- * - * @param source - * the {@link rx.Observable} which produces values - * @param timespan - * the amount of time all windows must be actively collect values before being emitted - * @param unit - * the {@link java.util.concurrent.TimeUnit} defining the unit of time for the timespan - * @return - * the {@link rx.functions.Func1} object representing the specified window operation - */ - public static OnSubscribeFunc> window(Observable source, long timespan, TimeUnit unit) { - return window(source, timespan, unit, Schedulers.computation()); - } - - /** - * This method creates a {@link rx.functions.Func1} object which represents the window operation. This - * operation takes values from the specified {@link rx.Observable} source and stores them in a window. - * Periodically the window is emitted and replaced with a new window. How often this is done depends on the - * specified timespan. When the source {@link rx.Observable} completes or produces an error, the current - * window is emitted, and the event is propagated to all subscribed {@link rx.Observer}s. - *

- * Note that this operation only produces non-overlapping windows. At all times there is - * exactly one window actively storing values. - *

- * - * @param source - * the {@link rx.Observable} which produces values - * @param timespan - * the amount of time all windows must be actively collect values before being emitted - * @param unit - * the {@link java.util.concurrent.TimeUnit} defining the unit of time for the timespan - * @param scheduler - * the {@link rx.Scheduler} to use for timing windows - * @return - * the {@link rx.functions.Func1} object representing the specified window operation - */ - public static OnSubscribeFunc> window(final Observable source, final long timespan, final TimeUnit unit, final Scheduler scheduler) { - return new OnSubscribeFunc>() { - @Override - public Subscription onSubscribe(final Observer> observer) { - NonOverlappingChunks> windows = new NonOverlappingChunks>(observer, OperationWindow. windowMaker()); - ChunkCreator creator = new TimeBasedChunkCreator>(windows, timespan, unit, scheduler); - return source.unsafeSubscribe(new ChunkObserver>(windows, observer, creator)); - } - }; - } - - /** - * This method creates a {@link rx.functions.Func1} object which represents the window operation. This - * operation takes values from the specified {@link rx.Observable} source and stores them in a window. - * Periodically the window is emitted and replaced with a new window. How often this is done depends on the - * specified timespan. Additionally the window is automatically emitted once it reaches a specified number - * of elements. When the source {@link rx.Observable} completes or produces an error, the current window is - * emitted, and the event is propagated to all subscribed {@link rx.Observer}s. - *

- * Note that this operation only produces non-overlapping windows. At all times there is - * exactly one window actively storing values. - *

- * - * @param source - * the {@link rx.Observable} which produces values - * @param timespan - * the amount of time all windows must be actively collect values before being emitted - * @param unit - * the {@link java.util.concurrent.TimeUnit} defining the unit of time for the timespan - * @param count - * the maximum size of the window. Once a window reaches this size, it is emitted - * @return - * the {@link rx.functions.Func1} object representing the specified window operation - */ - public static OnSubscribeFunc> window(Observable source, long timespan, TimeUnit unit, int count) { - return window(source, timespan, unit, count, Schedulers.computation()); - } - - /** - * This method creates a {@link rx.functions.Func1} object which represents the window operation. This - * operation takes values from the specified {@link rx.Observable} source and stores them in a window. - * Periodically the window is emitted and replaced with a new window. How often this is done depends on the - * specified timespan. Additionally the window is automatically emitted once it reaches a specified number - * of elements. When the source {@link rx.Observable} completes or produces an error, the current window is - * emitted, and the event is propagated to all subscribed {@link rx.Observer}s. - *

- * Note that this operation only produces non-overlapping windows. At all times there is - * exactly one window actively storing values. - *

- * - * @param source - * the {@link rx.Observable} which produces values - * @param timespan - * the amount of time all windows must be actively collect values before being emitted - * @param unit - * the {@link java.util.concurrent.TimeUnit} defining the unit of time for the timespan - * @param count - * the maximum size of the window. Once a window reaches this size, it is emitted - * @param scheduler - * the {@link rx.Scheduler} to use for timing windows - * @return - * the {@link rx.functions.Func1} object representing the specified window operation - */ - public static OnSubscribeFunc> window(final Observable source, final long timespan, final TimeUnit unit, final int count, final Scheduler scheduler) { - return new OnSubscribeFunc>() { - @Override - public Subscription onSubscribe(final Observer> observer) { - Chunks> chunks = new TimeAndSizeBasedChunks>(observer, OperationWindow. windowMaker(), count, timespan, unit, scheduler); - ChunkCreator creator = new SingleChunkCreator>(chunks); - return source.unsafeSubscribe(new ChunkObserver>(chunks, observer, creator)); - } - }; - } - - /** - * This method creates a {@link rx.functions.Func1} object which represents the window operation. This - * operation takes values from the specified {@link rx.Observable} source and stores them in a window. - * Periodically the window is emitted and replaced with a new window. How often this is done depends on the - * specified timespan. The creation of windows is also periodical. How often this is done depends on the - * specified timeshift. When the source {@link rx.Observable} completes or produces an error, the current - * window is emitted, and the event is propagated to all subscribed {@link rx.Observer}s. - *

- * Note that this operation can produce non-connected, or overlapping windows depending on - * the input parameters. - *

- * - * @param source - * the {@link rx.Observable} which produces values - * @param timespan - * the amount of time all windows must be actively collect values before being emitted - * @param timeshift - * the amount of time between creating windows - * @param unit - * the {@link java.util.concurrent.TimeUnit} defining the unit of time for the timespan - * @return - * the {@link rx.functions.Func1} object representing the specified window operation - */ - public static OnSubscribeFunc> window(Observable source, long timespan, long timeshift, TimeUnit unit) { - return window(source, timespan, timeshift, unit, Schedulers.computation()); - } - - /** - * This method creates a {@link rx.functions.Func1} object which represents the window operation. This - * operation takes values from the specified {@link rx.Observable} source and stores them in a window. - * Periodically the window is emitted and replaced with a new window. How often this is done depends on the - * specified timespan. The creation of windows is also periodical. How often this is done depends on the - * specified timeshift. When the source {@link rx.Observable} completes or produces an error, the current - * window is emitted, and the event is propagated to all subscribed {@link rx.Observer}s. - *

- * Note that this operation can produce non-connected, or overlapping windows depending on - * the input parameters. - *

- * - * @param source - * the {@link rx.Observable} which produces values - * @param timespan - * the amount of time all windows must be actively collect values before being emitted - * @param timeshift - * the amount of time between creating windows - * @param unit - * the {@link java.util.concurrent.TimeUnit} defining the unit of time for the timespan - * @param scheduler - * the {@link rx.Scheduler} to use for timing windows - * @return - * the {@link rx.functions.Func1} object representing the specified window operation - */ - public static OnSubscribeFunc> window(final Observable source, final long timespan, final long timeshift, final TimeUnit unit, final Scheduler scheduler) { - return new OnSubscribeFunc>() { - @Override - public Subscription onSubscribe(final Observer> observer) { - OverlappingChunks> windows = new TimeBasedChunks>(observer, OperationWindow. windowMaker(), timespan, unit, scheduler); - ChunkCreator creator = new TimeBasedChunkCreator>(windows, timeshift, unit, scheduler); - return source.unsafeSubscribe(new ChunkObserver>(windows, observer, creator)); - } - }; - } - - /** - * This class represents a single window: A sequence of recorded values. - * - * @param - * the type of objects which this {@link Window} can hold - */ - protected static class Window extends Chunk> { - /** - * @return - * the mutable underlying {@link Observable} which contains all the recorded values in this - * {@link Window} object - */ - @Override - public Observable getContents() { - return Observable.from(contents); - } - } - - /** - * Emits windows of values of the source Observable where the window boundary is determined by the items of - * the boundary Observable. - * - * @param source - * @param boundary - * @return - */ - public static OnSubscribeFunc> window(Observable source, Observable boundary) { - return new WindowViaObservable(source, boundary); - } - - /** - * Create non-overlapping windows from the source values by using another observable's values as to when to - * replace a window. - */ - private static final class WindowViaObservable implements OnSubscribeFunc> { - final Observable source; - final Observable boundary; - - public WindowViaObservable(Observable source, Observable boundary) { - this.source = source; - this.boundary = boundary; - } - - @Override - public Subscription onSubscribe(Observer> t1) { - CompositeSubscription csub = new CompositeSubscription(); - - final SourceObserver so = new SourceObserver(t1, csub); - try { - t1.onNext(so.subject); - } catch (Throwable t) { - t1.onError(t); - return Subscriptions.empty(); - } - csub.add(source.unsafeSubscribe(so)); - - if (!csub.isUnsubscribed()) { - csub.add(boundary.unsafeSubscribe(new BoundaryObserver(so))); - } - - return csub; - } - - /** - * Observe the source and emit the values into the current window. - */ - private static final class SourceObserver extends Subscriber { - final Observer> observer; - final Subscription cancel; - final Object guard; - Subject subject; - - public SourceObserver(Observer> observer, Subscription cancel) { - this.observer = observer; - this.cancel = cancel; - this.guard = new Object(); - this.subject = create(); - } - - Subject create() { - return PublishSubject.create(); - } - - @Override - public void onNext(T args) { - synchronized (guard) { - if (subject == null) { - return; - } - subject.onNext(args); - } - } - - @Override - public void onError(Throwable e) { - synchronized (guard) { - if (subject == null) { - return; - } - Subject s = subject; - subject = null; - - s.onError(e); - observer.onError(e); - } - cancel.unsubscribe(); - } - - @Override - public void onCompleted() { - synchronized (guard) { - if (subject == null) { - return; - } - Subject s = subject; - subject = null; - - s.onCompleted(); - observer.onCompleted(); - } - cancel.unsubscribe(); - } - - public void replace() { - try { - synchronized (guard) { - if (subject == null) { - return; - } - Subject s = subject; - s.onCompleted(); - - subject = create(); - observer.onNext(subject); - } - } catch (Throwable t) { - onError(t); - } - } - } - - /** - * Observe the boundary and replace the window on each item. - */ - private static final class BoundaryObserver extends Subscriber { - final SourceObserver so; - - public BoundaryObserver(SourceObserver so) { - this.so = so; - } - - @Override - public void onNext(U args) { - so.replace(); - } - - @Override - public void onError(Throwable e) { - so.onError(e); - } - - @Override - public void onCompleted() { - so.onCompleted(); - } - } - } -} diff --git a/rxjava-core/src/main/java/rx/operators/OperatorBufferWithTime.java b/rxjava-core/src/main/java/rx/operators/OperatorBufferWithTime.java index 98a3efd654..bed97c3c7c 100644 --- a/rxjava-core/src/main/java/rx/operators/OperatorBufferWithTime.java +++ b/rxjava-core/src/main/java/rx/operators/OperatorBufferWithTime.java @@ -253,6 +253,7 @@ public void onError(Throwable e) { chunk = null; } child.onError(e); + unsubscribe(); } @Override @@ -274,6 +275,7 @@ public void onCompleted() { return; } child.onCompleted(); + unsubscribe(); } void scheduleExact() { inner.schedulePeriodically(new Action0() { diff --git a/rxjava-core/src/main/java/rx/operators/OperatorWindowWithObservable.java b/rxjava-core/src/main/java/rx/operators/OperatorWindowWithObservable.java new file mode 100644 index 0000000000..bcf5e2b538 --- /dev/null +++ b/rxjava-core/src/main/java/rx/operators/OperatorWindowWithObservable.java @@ -0,0 +1,303 @@ +/** + * Copyright 2014 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package rx.operators; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import rx.Observable; +import rx.Observable.Operator; +import rx.Observer; +import rx.Subscriber; +import rx.functions.Func0; +import rx.observers.SerializedSubscriber; +import rx.observers.Subscribers; + +/** + * Creates non-overlapping windows of items where each window is terminated by + * an event from a secondary observable and a new window is started immediately. + * + * @param the value type + * @param the boundary value type + */ +public final class OperatorWindowWithObservable implements Operator, T> { + final Func0> otherFactory; + + public OperatorWindowWithObservable(Func0> otherFactory) { + this.otherFactory = otherFactory; + } + public OperatorWindowWithObservable(final Observable other) { + this.otherFactory = new Func0>() { + + @Override + public Observable call() { + return other; + } + + }; + } + + @Override + public Subscriber call(Subscriber> child) { + + Observable other; + try { + other = otherFactory.call(); + } catch (Throwable e) { + child.onError(e); + return Subscribers.empty(); + } + + SourceSubscriber sub = new SourceSubscriber(child); + BoundarySubscriber bs = new BoundarySubscriber(child, sub); + + sub.replaceWindow(); + + other.unsafeSubscribe(bs); + + return sub; + } + /** Indicate the current subject should complete and a new subject be emitted. */ + static final Object NEXT_SUBJECT = new Object(); + /** For error and completion indication. */ + static final NotificationLite nl = NotificationLite.instance(); + /** Observes the source. */ + static final class SourceSubscriber extends Subscriber { + final Subscriber> child; + final Object guard; + /** Accessed from the serialized part. */ + Observer consumer; + /** Accessed from the serialized part. */ + Observable producer; + /** Guarded by guard. */ + boolean emitting; + /** Guarded by guard. */ + List queue; + + public SourceSubscriber(Subscriber> child) { + super(child); + this.child = new SerializedSubscriber>(child); + this.guard = new Object(); + } + @Override + public void onNext(T t) { + List localQueue; + synchronized (guard) { + if (emitting) { + if (queue == null) { + queue = new ArrayList(); + } + queue.add(t); + return; + } + localQueue = queue; + queue = null; + emitting = true; + } + boolean once = true; + boolean skipFinal = false; + try { + do { + drain(localQueue); + if (once) { + once = true; + emitValue(t); + } + + synchronized (guard) { + localQueue = queue; + queue = null; + if (localQueue == null) { + emitting = false; + skipFinal = true; + return; + } + } + } while (!child.isUnsubscribed()); + } finally { + if (!skipFinal) { + synchronized (guard) { + emitting = false; + } + } + } + } + + void drain(List queue) { + if (queue == null) { + return; + } + for (Object o : queue) { + if (o == NEXT_SUBJECT) { + replaceSubject(); + } else + if (nl.isError(o)) { + error(nl.getError(o)); + break; + } else + if (nl.isCompleted(o)) { + complete(); + break; + } else { + @SuppressWarnings("unchecked") + T t = (T)o; + emitValue(t); + } + } + } + void replaceSubject() { + Observer s = consumer; + if (s != null) { + s.onCompleted(); + } + createNewWindow(); + child.onNext(producer); + } + void createNewWindow() { + BufferUntilSubscriber bus = BufferUntilSubscriber.create(); + consumer = bus; + producer = bus; + } + void emitValue(T t) { + Observer s = consumer; + if (s != null) { + s.onNext(t); + } + } + + @Override + public void onError(Throwable e) { + synchronized (guard) { + if (emitting) { + queue = Collections.singletonList(nl.error(e)); + return; + } + queue = null; + emitting = true; + } + error(e); + } + + @Override + public void onCompleted() { + List localQueue; + synchronized (guard) { + if (emitting) { + if (queue == null) { + queue = new ArrayList(); + } + queue.add(nl.completed()); + return; + } + localQueue = queue; + queue = null; + emitting = true; + } + try { + drain(localQueue); + } catch (Throwable e) { + error(e); + return; + } + complete(); + } + void replaceWindow() { + List localQueue; + synchronized (guard) { + if (emitting) { + if (queue == null) { + queue = new ArrayList(); + } + queue.add(NEXT_SUBJECT); + return; + } + localQueue = queue; + queue = null; + emitting = true; + } + boolean once = true; + boolean skipFinal = false; + try { + do { + drain(localQueue); + if (once) { + once = false; + replaceSubject(); + } + synchronized (guard) { + localQueue = queue; + queue = null; + if (localQueue == null) { + emitting = false; + skipFinal = true; + return; + } + } + } while (!child.isUnsubscribed()); + } finally { + if (!skipFinal) { + synchronized (guard) { + emitting = false; + } + } + } + } + void complete() { + Observer s = consumer; + consumer = null; + producer = null; + + if (s != null) { + s.onCompleted(); + } + child.onCompleted(); + unsubscribe(); + } + void error(Throwable e) { + Observer s = consumer; + consumer = null; + producer = null; + + if (s != null) { + s.onError(e); + } + child.onError(e); + unsubscribe(); + } + } + /** Observes the boundary. */ + static final class BoundarySubscriber extends Subscriber { + final SourceSubscriber sub; + public BoundarySubscriber(Subscriber child, SourceSubscriber sub) { + super(child); + this.sub = sub; + } + @Override + public void onNext(U t) { + sub.replaceWindow(); + } + + @Override + public void onError(Throwable e) { + sub.onError(e); + } + + @Override + public void onCompleted() { + sub.onCompleted(); + } + } +} diff --git a/rxjava-core/src/main/java/rx/operators/OperatorWindowWithSize.java b/rxjava-core/src/main/java/rx/operators/OperatorWindowWithSize.java new file mode 100644 index 0000000000..2f68e9c10f --- /dev/null +++ b/rxjava-core/src/main/java/rx/operators/OperatorWindowWithSize.java @@ -0,0 +1,165 @@ +/** + * Copyright 2014 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package rx.operators; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import rx.Observable; +import rx.Observable.Operator; +import rx.Observer; +import rx.Subscriber; + +/** + * Creates windows of values into the source sequence with skip frequency and size bounds. + * + * If skip == size then the windows are non-overlapping, otherwise, windows may overlap + * or can be discontinuous. The returned Observable sequence is cold and need to be + * consumed while the window operation is in progress. + * + *

Note that this conforms the Rx.NET behavior, but does not match former RxJava + * behavior, which operated as a regular buffer and mapped its lists to Observables.

+ * + * @param the value type + */ +public final class OperatorWindowWithSize implements Operator, T> { + final int size; + final int skip; + + public OperatorWindowWithSize(int size, int skip) { + this.size = size; + this.skip = skip; + } + + @Override + public Subscriber call(Subscriber> child) { + if (skip == size) { + return new ExactSubscriber(child); + } + return new InexactSubscriber(child); + } + /** Subscriber with exact, non-overlapping window bounds. */ + final class ExactSubscriber extends Subscriber { + final Subscriber> child; + int count; + Observer consumer; + Observable producer; + public ExactSubscriber(Subscriber> child) { + super(child); + this.child = child; + } + + @Override + public void onNext(T t) { + if (count++ % size == 0) { + if (consumer != null) { + consumer.onCompleted(); + } + createNewWindow(); + child.onNext(producer); + } + consumer.onNext(t); + } + + @Override + public void onError(Throwable e) { + if (consumer != null) { + consumer.onError(e); + } + child.onError(e); + } + + @Override + public void onCompleted() { + if (consumer != null) { + consumer.onCompleted(); + } + child.onCompleted(); + } + void createNewWindow() { + final BufferUntilSubscriber bus = BufferUntilSubscriber.create(); + consumer = bus; + producer = bus; + } + } + /** Subscriber with inexact, possibly overlapping or skipping windows. */ + final class InexactSubscriber extends Subscriber { + final Subscriber> child; + int count; + final List> chunks; + public InexactSubscriber(Subscriber> child) { + this.child = child; + this.chunks = new LinkedList>(); + } + + @Override + public void onNext(T t) { + if (count++ % skip == 0) { + CountedSubject cs = createCountedSubject(); + chunks.add(cs); + child.onNext(cs.producer); + } + Iterator> it = chunks.iterator(); + while (it.hasNext()) { + CountedSubject cs = it.next(); + cs.consumer.onNext(t); + if (++cs.count == size) { + it.remove(); + cs.consumer.onCompleted(); + } + } + } + + @Override + public void onError(Throwable e) { + List> list = new ArrayList>(chunks); + chunks.clear(); + for (CountedSubject cs : list) { + cs.consumer.onError(e); + } + child.onError(e); + } + + @Override + public void onCompleted() { + List> list = new ArrayList>(chunks); + chunks.clear(); + for (CountedSubject cs : list) { + cs.consumer.onCompleted(); + } + child.onCompleted(); + } + CountedSubject createCountedSubject() { + final BufferUntilSubscriber bus = BufferUntilSubscriber.create(); + return new CountedSubject(bus, bus); + } + } + /** + * Record to store the subject and the emission count. + * @param the subject's in-out type + */ + static final class CountedSubject { + final Observer consumer; + final Observable producer; + int count; + + public CountedSubject(Observer consumer, Observable producer) { + this.consumer = consumer; + this.producer = producer; + } + } +} diff --git a/rxjava-core/src/main/java/rx/operators/OperatorWindowWithStartEndObservable.java b/rxjava-core/src/main/java/rx/operators/OperatorWindowWithStartEndObservable.java new file mode 100644 index 0000000000..24fb55d805 --- /dev/null +++ b/rxjava-core/src/main/java/rx/operators/OperatorWindowWithStartEndObservable.java @@ -0,0 +1,220 @@ +/** + * Copyright 2014 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package rx.operators; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import rx.Observable; +import rx.Observable.Operator; +import rx.Observer; +import rx.Subscriber; +import rx.functions.Func1; +import rx.observers.SerializedObserver; +import rx.observers.SerializedSubscriber; +import rx.subscriptions.CompositeSubscription; + +/** + * Creates potentially overlapping windows of the source items where each window is + * started by a value emitted by an observable and closed when an associated Observable emits + * a value or completes. + * + * @param the value type + * @param the type of the window opening event + * @param the type of the window closing event + */ +public final class OperatorWindowWithStartEndObservable implements Operator, T> { + final Observable windowOpenings; + final Func1> windowClosingSelector; + + public OperatorWindowWithStartEndObservable(Observable windowOpenings, + Func1> windowClosingSelector) { + this.windowOpenings = windowOpenings; + this.windowClosingSelector = windowClosingSelector; + } + + @Override + public Subscriber call(Subscriber> child) { + final SourceSubscriber sub = new SourceSubscriber(child); + + Subscriber open = new Subscriber(child) { + + @Override + public void onNext(U t) { + sub.beginWindow(t); + } + + @Override + public void onError(Throwable e) { + sub.onError(e); + } + + @Override + public void onCompleted() { + sub.onCompleted(); + } + }; + + windowOpenings.unsafeSubscribe(open); + + return sub; + } + /** Serialized access to the subject. */ + static final class SerializedSubject { + final Observer consumer; + final Observable producer; + + public SerializedSubject(Observer consumer, Observable producer) { + this.consumer = new SerializedObserver(consumer); + this.producer = producer; + } + + } + final class SourceSubscriber extends Subscriber { + final Subscriber> child; + final CompositeSubscription csub; + final Object guard; + /** Guarded by guard. */ + final List> chunks; + /** Guarded by guard. */ + boolean done; + public SourceSubscriber(Subscriber> child) { + super(child); + this.child = new SerializedSubscriber>(child); + this.guard = new Object(); + this.chunks = new LinkedList>(); + this.csub = new CompositeSubscription(); + child.add(csub); + } + @Override + public void onNext(T t) { + List> list; + synchronized (guard) { + if (done) { + return; + } + list = new ArrayList>(chunks); + } + for (SerializedSubject cs : list) { + cs.consumer.onNext(t); + } + } + + @Override + public void onError(Throwable e) { + List> list; + synchronized (guard) { + if (done) { + return; + } + done = true; + list = new ArrayList>(chunks); + chunks.clear(); + } + for (SerializedSubject cs : list) { + cs.consumer.onError(e); + } + child.onError(e); + } + + @Override + public void onCompleted() { + List> list; + synchronized (guard) { + if (done) { + return; + } + done = true; + list = new ArrayList>(chunks); + chunks.clear(); + } + for (SerializedSubject cs : list) { + cs.consumer.onCompleted(); + } + child.onCompleted(); + } + + void beginWindow(U token) { + final SerializedSubject s = createSerializedSubject(); + synchronized (guard) { + if (done) { + return; + } + chunks.add(s); + } + child.onNext(s.producer); + + Observable end; + try { + end = windowClosingSelector.call(token); + } catch (Throwable e) { + onError(e); + return; + } + + Subscriber v = new Subscriber() { + boolean once = true; + @Override + public void onNext(V t) { + onCompleted(); + } + + @Override + public void onError(Throwable e) { + + } + + @Override + public void onCompleted() { + if (once) { + once = false; + endWindow(s); + csub.remove(this); + } + } + + }; + csub.add(v); + + end.unsafeSubscribe(v); + } + void endWindow(SerializedSubject window) { + boolean terminate = false; + synchronized (guard) { + if (done) { + return; + } + Iterator> it = chunks.iterator(); + while (it.hasNext()) { + SerializedSubject s = it.next(); + if (s == window) { + terminate = true; + it.remove(); + break; + } + } + } + if (terminate) { + window.consumer.onCompleted(); + } + } + SerializedSubject createSerializedSubject() { + BufferUntilSubscriber bus = BufferUntilSubscriber.create(); + return new SerializedSubject(bus, bus); + } + } +} diff --git a/rxjava-core/src/main/java/rx/operators/OperatorWindowWithTime.java b/rxjava-core/src/main/java/rx/operators/OperatorWindowWithTime.java new file mode 100644 index 0000000000..528aa65373 --- /dev/null +++ b/rxjava-core/src/main/java/rx/operators/OperatorWindowWithTime.java @@ -0,0 +1,470 @@ + /** + * Copyright 2014 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package rx.operators; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.concurrent.TimeUnit; +import rx.Observable; +import rx.Observable.Operator; +import rx.Observer; +import rx.Scheduler; +import rx.Scheduler.Worker; +import rx.Subscriber; +import rx.functions.Action0; +import rx.observers.SerializedObserver; +import rx.observers.SerializedSubscriber; + +/** + * Creates windows of values into the source sequence with timed window creation, length and size bounds. + * If timespan == timeshift, windows are non-overlapping but may not be continuous if size number of items were already + * emitted. If more items arrive after the window has reached its size bound, those items are dropped. + * + *

Note that this conforms the Rx.NET behavior, but does not match former RxJava + * behavior, which operated as a regular buffer and mapped its lists to Observables.

+ * + * @param the value type + */ +public final class OperatorWindowWithTime implements Operator, T> { + /** Length of each window. */ + final long timespan; + /** Period of creating new windows. */ + final long timeshift; + final TimeUnit unit; + final Scheduler scheduler; + final int size; + + public OperatorWindowWithTime(long timespan, long timeshift, TimeUnit unit, int size, Scheduler scheduler) { + this.timespan = timespan; + this.timeshift = timeshift; + this.unit = unit; + this.size = size; + this.scheduler = scheduler; + } + + + @Override + public Subscriber call(Subscriber> child) { + Worker worker = scheduler.createWorker(); + child.add(worker); + + if (timespan == timeshift) { + ExactSubscriber s = new ExactSubscriber(child, worker); + s.scheduleExact(); + return s; + } + + InexactSubscriber s = new InexactSubscriber(child, worker); + s.startNewChunk(); + s.scheduleChunk(); + return s; + } + /** Indicate the current subject should complete and a new subject be emitted. */ + static final Object NEXT_SUBJECT = new Object(); + /** For error and completion indication. */ + static final NotificationLite nl = NotificationLite.instance(); + + /** The immutable windowing state with one subject. */ + static final class State { + final Observer consumer; + final Observable producer; + final int count; + static final State EMPTY = new State(null, null, 0); + + public State(Observer consumer, Observable producer, int count) { + this.consumer = consumer; + this.producer = producer; + this.count = count; + } + public State next() { + return new State(consumer, producer, count + 1); + } + public State create(Observer consumer, Observable producer) { + return new State(consumer, producer, 0); + } + public State clear() { + return empty(); + } + @SuppressWarnings("unchecked") + public static State empty() { + return (State)EMPTY; + } + } + /** Subscriber with exact, non-overlapping windows. */ + final class ExactSubscriber extends Subscriber { + final Subscriber> child; + final Worker worker; + final Object guard; + /** Guarded by guard. */ + List queue; + /** Guarded by guard. */ + boolean emitting; + volatile State state; + + public ExactSubscriber(Subscriber> child, Worker worker) { + super(child); + this.child = new SerializedSubscriber>(child); + this.worker = worker; + this.guard = new Object(); + this.state = State.empty(); + } + @Override + public void onNext(T t) { + List localQueue; + synchronized (guard) { + if (emitting) { + if (queue == null) { + queue = new ArrayList(); + } + queue.add(t); + return; + } + localQueue = queue; + queue = null; + emitting = true; + } + boolean once = true; + boolean skipFinal = false; + try { + do { + drain(localQueue); + if (once) { + once = false; + emitValue(t); + } + synchronized (guard) { + localQueue = queue; + queue = null; + if (localQueue == null) { + emitting = false; + skipFinal = true; + return; + } + } + } while (!child.isUnsubscribed()); + } finally { + if (!skipFinal) { + synchronized (guard) { + emitting = false; + } + } + } + } + void drain(List queue) { + if (queue == null) { + return; + } + for (Object o : queue) { + if (o == NEXT_SUBJECT) { + replaceSubject(); + } else + if (nl.isError(o)) { + error(nl.getError(o)); + break; + } else + if (nl.isCompleted(o)) { + complete(); + break; + } else { + @SuppressWarnings("unchecked") + T t = (T)o; + emitValue(t); + } + } + } + void replaceSubject() { + Observer s = state.consumer; + if (s != null) { + s.onCompleted(); + } + BufferUntilSubscriber bus = BufferUntilSubscriber.create(); + state = state.create(bus, bus); + child.onNext(bus); + } + void emitValue(T t) { + State s = state; + + if (s.consumer != null) { + s.consumer.onNext(t); + if (s.count == size) { + s.consumer.onCompleted(); + s = s.clear(); + } else { + s = s.next(); + } + } + + state = s; + } + + @Override + public void onError(Throwable e) { + synchronized (guard) { + if (emitting) { + // drop any queued action and terminate asap + queue = Collections.singletonList(nl.error(e)); + return; + } + queue = null; + emitting = true; + } + error(e); + } + void error(Throwable e) { + Observer s = state.consumer; + state = state.clear(); + if (s != null) { + s.onError(e); + } + child.onError(e); + unsubscribe(); + } + void complete() { + Observer s = state.consumer; + state = state.clear(); + if (s != null) { + s.onCompleted(); + } + child.onCompleted(); + unsubscribe(); + } + @Override + public void onCompleted() { + List localQueue; + synchronized (guard) { + if (emitting) { + if (queue == null) { + queue = new ArrayList(); + } + queue.add(nl.completed()); + return; + } + localQueue = queue; + queue = null; + emitting = true; + } + try { + drain(localQueue); + } catch (Throwable e) { + error(e); + return; + } + complete(); + } + + void scheduleExact() { + worker.schedulePeriodically(new Action0() { + + @Override + public void call() { + nextWindow(); + } + + }, 0, timespan, unit); + } + void nextWindow() { + List localQueue; + synchronized (guard) { + if (emitting) { + if (queue == null) { + queue = new ArrayList(); + } + queue.add(NEXT_SUBJECT); + return; + } + localQueue = queue; + queue = null; + emitting = true; + } + boolean once = true; + boolean skipFinal = false; + try { + do { + drain(localQueue); + if (once) { + once = false; + replaceSubject(); + } + synchronized (guard) { + localQueue = queue; + queue = null; + if (localQueue == null) { + emitting = false; + skipFinal = true; + return; + } + } + } while (!child.isUnsubscribed()); + } finally { + if (!skipFinal) { + synchronized (guard) { + emitting = false; + } + } + } + } + } + /** + * Record to store the subject and the emission count. + * @param the subject's in-out type + */ + static final class CountedSerializedSubject { + final Observer consumer; + final Observable producer; + int count; + + public CountedSerializedSubject(Observer consumer, Observable producer) { + this.consumer = new SerializedObserver(consumer); + this.producer = producer; + } + } + /** Subscriber with inexact, potentially overlapping or discontinuous windows. */ + final class InexactSubscriber extends Subscriber { + final Subscriber> child; + final Worker worker; + final Object guard; + /** Guarded by this. */ + final List> chunks; + /** Guarded by this. */ + boolean done; + public InexactSubscriber(Subscriber> child, Worker worker) { + super(child); + this.child = child; + this.worker = worker; + this.guard = new Object(); + this.chunks = new LinkedList>(); + } + + @Override + public void onNext(T t) { + List> list; + synchronized (guard) { + if (done) { + return; + } + list = new ArrayList>(chunks); + Iterator> it = chunks.iterator(); + while (it.hasNext()) { + CountedSerializedSubject cs = it.next(); + if (++cs.count == size) { + it.remove(); + } + } + } + for (CountedSerializedSubject cs : list) { + cs.consumer.onNext(t); + if (cs.count == size) { + cs.consumer.onCompleted(); + } + } + } + + @Override + public void onError(Throwable e) { + List> list; + synchronized (guard) { + if (done) { + return; + } + done = true; + list = new ArrayList>(chunks); + chunks.clear(); + } + for (CountedSerializedSubject cs : list) { + cs.consumer.onError(e); + } + child.onError(e); + } + + @Override + public void onCompleted() { + List> list; + synchronized (guard) { + if (done) { + return; + } + done = true; + list = new ArrayList>(chunks); + chunks.clear(); + } + for (CountedSerializedSubject cs : list) { + cs.consumer.onCompleted(); + } + child.onCompleted(); + } + void scheduleChunk() { + worker.schedulePeriodically(new Action0() { + + @Override + public void call() { + startNewChunk(); + } + + }, timeshift, timeshift, unit); + } + void startNewChunk() { + final CountedSerializedSubject chunk = createCountedSerializedSubject(); + synchronized (guard) { + if (done) { + return; + } + chunks.add(chunk); + } + try { + child.onNext(chunk.producer); + } catch (Throwable e) { + onError(e); + return; + } + + worker.schedule(new Action0() { + + @Override + public void call() { + terminateChunk(chunk); + } + + }, timespan, unit); + } + void terminateChunk(CountedSerializedSubject chunk) { + boolean terminate = false; + synchronized (guard) { + if (done) { + return; + } + Iterator> it = chunks.iterator(); + while (it.hasNext()) { + CountedSerializedSubject cs = it.next(); + if (cs == chunk) { + terminate = true; + it.remove(); + break; + } + } + } + if (terminate) { + chunk.consumer.onCompleted(); + } + } + CountedSerializedSubject createCountedSerializedSubject() { + BufferUntilSubscriber bus = BufferUntilSubscriber.create(); + return new CountedSerializedSubject(bus, bus); + } + } +} diff --git a/rxjava-core/src/test/java/rx/operators/OperationWindowTest.java b/rxjava-core/src/test/java/rx/operators/OperatorWindowTest.java similarity index 88% rename from rxjava-core/src/test/java/rx/operators/OperationWindowTest.java rename to rxjava-core/src/test/java/rx/operators/OperatorWindowTest.java index 38376a06c9..34f20933fb 100644 --- a/rxjava-core/src/test/java/rx/operators/OperationWindowTest.java +++ b/rxjava-core/src/test/java/rx/operators/OperatorWindowTest.java @@ -21,7 +21,6 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.verify; -import static rx.operators.OperationWindow.window; import java.util.ArrayList; import java.util.List; @@ -33,7 +32,7 @@ import rx.Observable; import rx.Observer; import rx.Scheduler; -import rx.Subscription; +import rx.Subscriber; import rx.functions.Action0; import rx.functions.Action1; import rx.functions.Func0; @@ -41,9 +40,8 @@ import rx.schedulers.Schedulers; import rx.schedulers.TestScheduler; import rx.subjects.PublishSubject; -import rx.subscriptions.Subscriptions; -public class OperationWindowTest { +public class OperatorWindowTest { private TestScheduler scheduler; private Scheduler.Worker innerScheduler; @@ -76,7 +74,7 @@ public void call(List xs) { @Test public void testNonOverlappingWindows() { Observable subject = Observable.from("one", "two", "three", "four", "five"); - Observable> windowed = Observable.create(window(subject, 3)); + Observable> windowed = subject.window(3); List> windows = toLists(windowed); @@ -88,7 +86,7 @@ public void testNonOverlappingWindows() { @Test public void testSkipAndCountGaplessWindows() { Observable subject = Observable.from("one", "two", "three", "four", "five"); - Observable> windowed = Observable.create(window(subject, 3, 3)); + Observable> windowed = subject.window(3, 3); List> windows = toLists(windowed); @@ -100,7 +98,7 @@ public void testSkipAndCountGaplessWindows() { @Test public void testOverlappingWindows() { Observable subject = Observable.from(new String[] { "zero", "one", "two", "three", "four", "five" }, Schedulers.trampoline()); - Observable> windowed = Observable.create(window(subject, 3, 1)); + Observable> windowed = subject.window(3, 1); List> windows = toLists(windowed); @@ -116,7 +114,7 @@ public void testOverlappingWindows() { @Test public void testSkipAndCountWindowsWithGaps() { Observable subject = Observable.from("one", "two", "three", "four", "five"); - Observable> windowed = Observable.create(window(subject, 2, 3)); + Observable> windowed = subject.window(2, 3); List> windows = toLists(windowed); @@ -130,20 +128,19 @@ public void testTimedAndCount() { final List list = new ArrayList(); final List> lists = new ArrayList>(); - Observable source = Observable.create(new Observable.OnSubscribeFunc() { + Observable source = Observable.create(new Observable.OnSubscribe() { @Override - public Subscription onSubscribe(Observer observer) { + public void call(Subscriber observer) { push(observer, "one", 10); push(observer, "two", 90); push(observer, "three", 110); push(observer, "four", 190); push(observer, "five", 210); complete(observer, 250); - return Subscriptions.empty(); } }); - Observable> windowed = Observable.create(window(source, 100, TimeUnit.MILLISECONDS, 2, scheduler)); + Observable> windowed = source.window(100, TimeUnit.MILLISECONDS, 2, scheduler); windowed.subscribe(observeWindow(list, lists)); scheduler.advanceTimeTo(100, TimeUnit.MILLISECONDS); @@ -164,20 +161,19 @@ public void testTimed() { final List list = new ArrayList(); final List> lists = new ArrayList>(); - Observable source = Observable.create(new Observable.OnSubscribeFunc() { + Observable source = Observable.create(new Observable.OnSubscribe() { @Override - public Subscription onSubscribe(Observer observer) { + public void call(Subscriber observer) { push(observer, "one", 98); push(observer, "two", 99); push(observer, "three", 100); push(observer, "four", 101); push(observer, "five", 102); complete(observer, 150); - return Subscriptions.empty(); } }); - Observable> windowed = Observable.create(window(source, 100, TimeUnit.MILLISECONDS, scheduler)); + Observable> windowed = source.window(100, TimeUnit.MILLISECONDS, scheduler); windowed.subscribe(observeWindow(list, lists)); scheduler.advanceTimeTo(101, TimeUnit.MILLISECONDS); @@ -194,44 +190,41 @@ public void testObservableBasedOpenerAndCloser() { final List list = new ArrayList(); final List> lists = new ArrayList>(); - Observable source = Observable.create(new Observable.OnSubscribeFunc() { + Observable source = Observable.create(new Observable.OnSubscribe() { @Override - public Subscription onSubscribe(Observer observer) { + public void call(Subscriber observer) { push(observer, "one", 10); push(observer, "two", 60); push(observer, "three", 110); push(observer, "four", 160); push(observer, "five", 210); complete(observer, 500); - return Subscriptions.empty(); } }); - Observable openings = Observable.create(new Observable.OnSubscribeFunc() { + Observable openings = Observable.create(new Observable.OnSubscribe() { @Override - public Subscription onSubscribe(Observer observer) { + public void call(Subscriber observer) { push(observer, new Object(), 50); push(observer, new Object(), 200); complete(observer, 250); - return Subscriptions.empty(); } }); Func1> closer = new Func1>() { @Override public Observable call(Object opening) { - return Observable.create(new Observable.OnSubscribeFunc() { + return Observable.create(new Observable.OnSubscribe() { @Override - public Subscription onSubscribe(Observer observer) { + public void call(Subscriber observer) { push(observer, new Object(), 100); complete(observer, 101); - return Subscriptions.empty(); } }); } }; - Observable> windowed = Observable.create(window(source, openings, closer)); + Observable> windowed = source.window(openings, closer); windowed.subscribe(observeWindow(list, lists)); scheduler.advanceTimeTo(500, TimeUnit.MILLISECONDS); @@ -245,34 +238,33 @@ public void testObservableBasedCloser() { final List list = new ArrayList(); final List> lists = new ArrayList>(); - Observable source = Observable.create(new Observable.OnSubscribeFunc() { + Observable source = Observable.create(new Observable.OnSubscribe() { @Override - public Subscription onSubscribe(Observer observer) { + public void call(Subscriber observer) { push(observer, "one", 10); push(observer, "two", 60); push(observer, "three", 110); push(observer, "four", 160); push(observer, "five", 210); complete(observer, 250); - return Subscriptions.empty(); } }); Func0> closer = new Func0>() { @Override public Observable call() { - return Observable.create(new Observable.OnSubscribeFunc() { + return Observable.create(new Observable.OnSubscribe() { @Override - public Subscription onSubscribe(Observer observer) { + public void call(Subscriber observer) { push(observer, new Object(), 100); - complete(observer, 101); - return Subscriptions.empty(); + push(observer, new Object(), 200); + complete(observer, 301); } }); } }; - Observable> windowed = Observable.create(window(source, closer)); + Observable> windowed = source.window(closer); windowed.subscribe(observeWindow(list, lists)); scheduler.advanceTimeTo(500, TimeUnit.MILLISECONDS); @@ -502,7 +494,7 @@ public void onCompleted() { } @Test - public void testWindowViaObservableourceThrows() { + public void testWindowViaObservableSourceThrows() { PublishSubject source = PublishSubject.create(); PublishSubject boundary = PublishSubject.create(); @@ -552,4 +544,4 @@ public void onCompleted() { verify(o, never()).onCompleted(); verify(o).onError(any(OperationReduceTest.CustomException.class)); } -} +} \ No newline at end of file From b1effc786b6281c2adfa704228c07a31ea6c64b0 Mon Sep 17 00:00:00 2001 From: akarnokd Date: Mon, 5 May 2014 07:26:22 +0200 Subject: [PATCH 2/2] Modified BufferUntilSubscriber to onError for any subscriber but the first --- .../rx/operators/BufferUntilSubscriber.java | 86 +------------------ 1 file changed, 3 insertions(+), 83 deletions(-) diff --git a/rxjava-core/src/main/java/rx/operators/BufferUntilSubscriber.java b/rxjava-core/src/main/java/rx/operators/BufferUntilSubscriber.java index 7687d8bb04..1392bb6de6 100644 --- a/rxjava-core/src/main/java/rx/operators/BufferUntilSubscriber.java +++ b/rxjava-core/src/main/java/rx/operators/BufferUntilSubscriber.java @@ -15,8 +15,6 @@ */ package rx.operators; -import java.util.LinkedHashSet; -import java.util.Set; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; @@ -59,14 +57,8 @@ static final class State { final NotificationLite nl = NotificationLite.instance(); /** The first observer or the one which buffers until the first arrives. */ final AtomicReference> observerRef = new AtomicReference>(new BufferedObserver()); - /** How many subscribers. */ + /** Allow a single subscriber only. */ final AtomicBoolean first = new AtomicBoolean(); - /** The rest of the subscribers without buffering. Guarded by this. */ - final Set> subscribers = new LinkedHashSet>(); - /** Guarded by this. */ - boolean done; - /** Guarded by this. */ - Throwable exception; } static final class OnSubscribeAction implements OnSubscribe { @@ -95,33 +87,7 @@ public void call() { } })); } else { - Throwable e = null; - boolean done; - synchronized (state) { - done = state.done; - if (!done) { - state.subscribers.add(s); - } else { - e = state.exception; - } - } - if (done) { - if (e != null) { - s.onError(e); - } else { - s.onCompleted(); - } - return; - } - s.add(Subscriptions.create(new Action0() { - - @Override - public void call() { - synchronized (state) { - state.subscribers.remove(s); - } - } - })); + s.onError(new IllegalStateException("Only one subscriber allowed!")); } } @@ -136,64 +102,17 @@ private BufferUntilSubscriber(State state) { @Override public void onCompleted() { state.observerRef.get().onCompleted(); - // notify the rest - Subscriber[] list; - synchronized (state) { - if (!state.done) { - return; - } - state.done = true; - if (state.subscribers.isEmpty()) { - return; - } - list = state.subscribers.toArray(new Subscriber[state.subscribers.size()]); - state.subscribers.clear(); - } - for (Subscriber s : list) { - s.onCompleted(); - } } @Override public void onError(Throwable e) { state.observerRef.get().onError(e); - // notify the rest - Subscriber[] list; - synchronized (state) { - if (!state.done) { - return; - } - state.done = true; - state.exception = e; - if (state.subscribers.isEmpty()) { - return; - } - list = state.subscribers.toArray(new Subscriber[state.subscribers.size()]); - state.subscribers.clear(); - } - for (Subscriber s : list) { - s.onError(e); - } } @Override @SuppressWarnings({ "unchecked", "rawtypes" }) public void onNext(T t) { state.observerRef.get().onNext(t); - // notify the rest - Subscriber[] list; - synchronized (state) { - if (state.done) { - return; - } - if (state.subscribers.isEmpty()) { - return; - } - list = state.subscribers.toArray(new Subscriber[state.subscribers.size()]); - } - for (Subscriber s : list) { - s.onNext(t); - } } /** @@ -242,6 +161,7 @@ private void drainIfNeededAndSwitchToActual() { nl.accept(this, o); } // now we can safely change over to the actual and get rid of the pass-thru + // but only if not unsubscribed observerRef.compareAndSet(this, actual); }