From bcc779daa092b250ce8c396da068087740d76d10 Mon Sep 17 00:00:00 2001 From: akarnokd Date: Mon, 9 Dec 2013 19:07:16 +0100 Subject: [PATCH] Fix for buffer not stopping when unsubscribed. --- .../java/rx/operators/ChunkedOperation.java | 18 +++++- .../java/rx/operators/OperationBuffer.java | 60 ++++++++++++++++--- .../rx/operators/OperationBufferTest.java | 24 ++++++++ 3 files changed, 91 insertions(+), 11 deletions(-) diff --git a/rxjava-core/src/main/java/rx/operators/ChunkedOperation.java b/rxjava-core/src/main/java/rx/operators/ChunkedOperation.java index 1d3b7628912..58f908feef1 100644 --- a/rxjava-core/src/main/java/rx/operators/ChunkedOperation.java +++ b/rxjava-core/src/main/java/rx/operators/ChunkedOperation.java @@ -148,7 +148,7 @@ public OverlappingChunks(Observer observer, Func0 The type of object being tracked by the {@link Chunk} */ - protected static class TimeAndSizeBasedChunks extends Chunks { + protected static class TimeAndSizeBasedChunks extends Chunks implements Subscription { private final ConcurrentMap, Subscription> subscriptions = new ConcurrentHashMap, Subscription>(); @@ -207,6 +207,12 @@ public void pushValue(T value) { } } } + @Override + public void unsubscribe() { + for (Subscription s : subscriptions.values()) { + s.unsubscribe(); + } + } } /** @@ -218,7 +224,7 @@ public void pushValue(T value) { * The type of object all internal {@link rx.operators.ChunkedOperation.Chunk} objects record. * The type of object being tracked by the {@link Chunk} */ - protected static class TimeBasedChunks extends OverlappingChunks { + protected static class TimeBasedChunks extends OverlappingChunks implements Subscription { private final ConcurrentMap, Subscription> subscriptions = new ConcurrentHashMap, Subscription>(); @@ -250,6 +256,14 @@ public void emitChunk(Chunk chunk) { subscriptions.remove(chunk); super.emitChunk(chunk); } + + @Override + public void unsubscribe() { + for (Subscription s : subscriptions.values()) { + s.unsubscribe(); + } + } + } /** diff --git a/rxjava-core/src/main/java/rx/operators/OperationBuffer.java b/rxjava-core/src/main/java/rx/operators/OperationBuffer.java index 02d0ce4573c..72cc1df2113 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationBuffer.java +++ b/rxjava-core/src/main/java/rx/operators/OperationBuffer.java @@ -17,6 +17,7 @@ import java.util.List; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import rx.Observable; import rx.Observable.OnSubscribeFunc; @@ -24,6 +25,7 @@ import rx.Scheduler; import rx.Subscription; import rx.concurrency.Schedulers; +import rx.subscriptions.CompositeSubscription; import rx.util.functions.Func0; import rx.util.functions.Func1; @@ -65,11 +67,14 @@ public static OnSubscribeFunc> buffer(final Observable public Subscription onSubscribe(Observer> observer) { NonOverlappingChunks> buffers = new NonOverlappingChunks>(observer, OperationBuffer. bufferMaker()); ChunkCreator creator = new ObservableBasedSingleChunkCreator, TClosing>(buffers, bufferClosingSelector); - return source.subscribe(new ChunkObserver>(buffers, observer, creator)); + return new CompositeSubscription( + new ChunkToSubscription(creator), + source.subscribe(new ChunkObserver>(buffers, observer, creator)) + ); } }; } - + /** *

This method creates a {@link Func1} object which represents the buffer operation. This operation takes * values from the specified {@link Observable} source and stores them in the currently active chunks. Initially @@ -101,7 +106,10 @@ public static OnSubscribeFunc> buffer(final Obse public Subscription onSubscribe(final Observer> observer) { OverlappingChunks> buffers = new OverlappingChunks>(observer, OperationBuffer. bufferMaker()); ChunkCreator creator = new ObservableBasedMultiChunkCreator, TOpening, TClosing>(buffers, bufferOpenings, bufferClosingSelector); - return source.subscribe(new ChunkObserver>(buffers, observer, creator)); + return new CompositeSubscription( + new ChunkToSubscription(creator), + source.subscribe(new ChunkObserver>(buffers, observer, creator)) + ); } }; } @@ -156,7 +164,10 @@ public static OnSubscribeFunc> buffer(final Observable source, fi public Subscription onSubscribe(final Observer> observer) { Chunks> chunks = new SizeBasedChunks>(observer, OperationBuffer. bufferMaker(), count); ChunkCreator creator = new SkippingChunkCreator>(chunks, skip); - return source.subscribe(new ChunkObserver>(chunks, observer, creator)); + return new CompositeSubscription( + new ChunkToSubscription(creator), + source.subscribe(new ChunkObserver>(chunks, observer, creator)) + ); } }; } @@ -211,7 +222,10 @@ public static OnSubscribeFunc> buffer(final Observable source, fi public Subscription onSubscribe(final Observer> observer) { NonOverlappingChunks> buffers = new NonOverlappingChunks>(observer, OperationBuffer. bufferMaker()); ChunkCreator creator = new TimeBasedChunkCreator>(buffers, timespan, unit, scheduler); - return source.subscribe(new ChunkObserver>(buffers, observer, creator)); + return new CompositeSubscription( + new ChunkToSubscription(creator), + source.subscribe(new ChunkObserver>(buffers, observer, creator)) + ); } }; } @@ -270,9 +284,13 @@ public static OnSubscribeFunc> buffer(final Observable source, fi return new OnSubscribeFunc>() { @Override public Subscription onSubscribe(final Observer> observer) { - Chunks> chunks = new TimeAndSizeBasedChunks>(observer, OperationBuffer. bufferMaker(), count, timespan, unit, scheduler); + TimeAndSizeBasedChunks> chunks = new TimeAndSizeBasedChunks>(observer, OperationBuffer. bufferMaker(), count, timespan, unit, scheduler); ChunkCreator creator = new SingleChunkCreator>(chunks); - return source.subscribe(new ChunkObserver>(chunks, observer, creator)); + return new CompositeSubscription( + chunks, + new ChunkToSubscription(creator), + source.subscribe(new ChunkObserver>(chunks, observer, creator)) + ); } }; } @@ -331,9 +349,13 @@ public static OnSubscribeFunc> buffer(final Observable source, fi return new OnSubscribeFunc>() { @Override public Subscription onSubscribe(final Observer> observer) { - OverlappingChunks> buffers = new TimeBasedChunks>(observer, OperationBuffer. bufferMaker(), timespan, unit, scheduler); + TimeBasedChunks> buffers = new TimeBasedChunks>(observer, OperationBuffer. bufferMaker(), timespan, unit, scheduler); ChunkCreator creator = new TimeBasedChunkCreator>(buffers, timeshift, unit, scheduler); - return source.subscribe(new ChunkObserver>(buffers, observer, creator)); + return new CompositeSubscription( + buffers, + new ChunkToSubscription(creator), + source.subscribe(new ChunkObserver>(buffers, observer, creator)) + ); } }; } @@ -355,4 +377,24 @@ public List getContents() { return contents; } } + + /** + * Converts a chunk creator into a subscription which stops the chunk. + */ + private static class ChunkToSubscription implements Subscription { + private ChunkCreator cc; + private final AtomicBoolean done; + public ChunkToSubscription(ChunkCreator cc) { + this.cc = cc; + this.done = new AtomicBoolean(); + } + @Override + public void unsubscribe() { + if (done.compareAndSet(false, true)) { + ChunkCreator cc0 = cc; + cc = null; + cc0.stop(); + } + } + } } diff --git a/rxjava-core/src/test/java/rx/operators/OperationBufferTest.java b/rxjava-core/src/test/java/rx/operators/OperationBufferTest.java index 475326a6225..a2de69ed63e 100644 --- a/rxjava-core/src/test/java/rx/operators/OperationBufferTest.java +++ b/rxjava-core/src/test/java/rx/operators/OperationBufferTest.java @@ -19,6 +19,7 @@ import static rx.operators.OperationBuffer.*; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -27,6 +28,8 @@ import org.junit.Test; import org.mockito.InOrder; import org.mockito.Mockito; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; import rx.Observable; import rx.Observer; @@ -359,4 +362,25 @@ public void call() { } }, delay, TimeUnit.MILLISECONDS); } + + @Test + public void testBufferStopsWhenUnsubscribed1() { + Observable source = Observable.never(); + + Observer> o = mock(Observer.class); + + Subscription s = source.buffer(100, 200, TimeUnit.MILLISECONDS, scheduler).subscribe(o); + + InOrder inOrder = Mockito.inOrder(o); + + scheduler.advanceTimeBy(1001, TimeUnit.MILLISECONDS); + + inOrder.verify(o, times(5)).onNext(Arrays.asList()); + + s.unsubscribe(); + + scheduler.advanceTimeBy(999, TimeUnit.MILLISECONDS); + + inOrder.verifyNoMoreInteractions(); + } }