From 39d3bd97b2472de77727b00768abc2767aa30624 Mon Sep 17 00:00:00 2001 From: akarnokd Date: Mon, 22 Jun 2015 10:18:02 +0200 Subject: [PATCH] Window with boundary observable: fixed unsubscription and behavior. --- src/main/java/rx/Observable.java | 2 +- .../OperatorWindowWithObservable.java | 38 +-- .../OperatorWindowWithObservableFactory.java | 320 ++++++++++++++++++ .../OperatorWindowWithStartEndObservable.java | 83 +++-- .../OperatorWindowWithObservableTest.java | 174 +++++++++- ...ratorWindowWithStartEndObservableTest.java | 96 +++++- 6 files changed, 624 insertions(+), 89 deletions(-) create mode 100644 src/main/java/rx/internal/operators/OperatorWindowWithObservableFactory.java diff --git a/src/main/java/rx/Observable.java b/src/main/java/rx/Observable.java index ed665b0345..2ba02dd110 100644 --- a/src/main/java/rx/Observable.java +++ b/src/main/java/rx/Observable.java @@ -9060,7 +9060,7 @@ public final Observable withLatestFrom(Observable other, * @see ReactiveX operators documentation: Window */ public final Observable> window(Func0> closingSelector) { - return lift(new OperatorWindowWithObservable(closingSelector)); + return lift(new OperatorWindowWithObservableFactory(closingSelector)); } /** diff --git a/src/main/java/rx/internal/operators/OperatorWindowWithObservable.java b/src/main/java/rx/internal/operators/OperatorWindowWithObservable.java index c5fec0a13d..3b7e1c1cac 100644 --- a/src/main/java/rx/internal/operators/OperatorWindowWithObservable.java +++ b/src/main/java/rx/internal/operators/OperatorWindowWithObservable.java @@ -15,16 +15,13 @@ */ package rx.internal.operators; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import rx.Observable; +import java.util.*; + +import rx.*; import rx.Observable.Operator; +import rx.Observable; import rx.Observer; -import rx.Subscriber; -import rx.functions.Func0; import rx.observers.SerializedSubscriber; -import rx.observers.Subscribers; /** * Creates non-overlapping windows of items where each window is terminated by @@ -34,36 +31,21 @@ * @param the boundary value type */ public final class OperatorWindowWithObservable implements Operator, T> { - final Func0> otherFactory; + final Observable other; - public OperatorWindowWithObservable(Func0> otherFactory) { - this.otherFactory = otherFactory; - } public OperatorWindowWithObservable(final Observable other) { - this.otherFactory = new Func0>() { - - @Override - public Observable call() { - return other; - } - - }; + this.other = other; } @Override public Subscriber call(Subscriber> child) { - Observable other; - try { - other = otherFactory.call(); - } catch (Throwable e) { - child.onError(e); - return Subscribers.empty(); - } - SourceSubscriber sub = new SourceSubscriber(child); BoundarySubscriber bs = new BoundarySubscriber(child, sub); + child.add(sub); + child.add(bs); + sub.replaceWindow(); other.unsafeSubscribe(bs); @@ -88,7 +70,6 @@ static final class SourceSubscriber extends Subscriber { List queue; public SourceSubscriber(Subscriber> child) { - super(child); this.child = new SerializedSubscriber>(child); this.guard = new Object(); } @@ -288,7 +269,6 @@ void error(Throwable e) { static final class BoundarySubscriber extends Subscriber { final SourceSubscriber sub; public BoundarySubscriber(Subscriber child, SourceSubscriber sub) { - super(child); this.sub = sub; } diff --git a/src/main/java/rx/internal/operators/OperatorWindowWithObservableFactory.java b/src/main/java/rx/internal/operators/OperatorWindowWithObservableFactory.java new file mode 100644 index 0000000000..a764850c79 --- /dev/null +++ b/src/main/java/rx/internal/operators/OperatorWindowWithObservableFactory.java @@ -0,0 +1,320 @@ +/** + * Copyright 2014 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package rx.internal.operators; + +import java.util.*; + +import rx.*; +import rx.Observable.Operator; +import rx.Observable; +import rx.Observer; +import rx.functions.Func0; +import rx.observers.SerializedSubscriber; +import rx.subscriptions.SerialSubscription; + +/** + * Creates non-overlapping windows of items where each window is terminated by + * an event from a secondary observable and a new window is started immediately. + * + * @param the value type + * @param the boundary value type + */ +public final class OperatorWindowWithObservableFactory implements Operator, T> { + final Func0> otherFactory; + + public OperatorWindowWithObservableFactory(Func0> otherFactory) { + this.otherFactory = otherFactory; + } + + @Override + public Subscriber call(Subscriber> child) { + + SourceSubscriber sub = new SourceSubscriber(child, otherFactory); + + child.add(sub); + + sub.replaceWindow(); + + return sub; + } + /** Indicate the current subject should complete and a new subject be emitted. */ + static final Object NEXT_SUBJECT = new Object(); + /** For error and completion indication. */ + static final NotificationLite nl = NotificationLite.instance(); + /** Observes the source. */ + static final class SourceSubscriber extends Subscriber { + final Subscriber> child; + final Object guard; + /** Accessed from the serialized part. */ + Observer consumer; + /** Accessed from the serialized part. */ + Observable producer; + /** Guarded by guard. */ + boolean emitting; + /** Guarded by guard. */ + List queue; + + final SerialSubscription ssub; + + final Func0> otherFactory; + + public SourceSubscriber(Subscriber> child, + Func0> otherFactory) { + this.child = new SerializedSubscriber>(child); + this.guard = new Object(); + this.ssub = new SerialSubscription(); + this.otherFactory = otherFactory; + this.add(ssub); + } + + @Override + public void onStart() { + request(Long.MAX_VALUE); + } + + @Override + public void onNext(T t) { + List localQueue; + synchronized (guard) { + if (emitting) { + if (queue == null) { + queue = new ArrayList(); + } + queue.add(t); + return; + } + localQueue = queue; + queue = null; + emitting = true; + } + boolean once = true; + boolean skipFinal = false; + try { + do { + drain(localQueue); + if (once) { + once = false; + emitValue(t); + } + + synchronized (guard) { + localQueue = queue; + queue = null; + if (localQueue == null) { + emitting = false; + skipFinal = true; + return; + } + } + } while (!child.isUnsubscribed()); + } finally { + if (!skipFinal) { + synchronized (guard) { + emitting = false; + } + } + } + } + + void drain(List queue) { + if (queue == null) { + return; + } + for (Object o : queue) { + if (o == NEXT_SUBJECT) { + replaceSubject(); + } else + if (nl.isError(o)) { + error(nl.getError(o)); + break; + } else + if (nl.isCompleted(o)) { + complete(); + break; + } else { + @SuppressWarnings("unchecked") + T t = (T)o; + emitValue(t); + } + } + } + void replaceSubject() { + Observer s = consumer; + if (s != null) { + s.onCompleted(); + } + createNewWindow(); + child.onNext(producer); + } + void createNewWindow() { + BufferUntilSubscriber bus = BufferUntilSubscriber.create(); + consumer = bus; + producer = bus; + Observable other; + try { + other = otherFactory.call(); + } catch (Throwable e) { + child.onError(e); + unsubscribe(); + return; + } + + BoundarySubscriber bs = new BoundarySubscriber(child, this); + ssub.set(bs); + other.unsafeSubscribe(bs); + } + void emitValue(T t) { + Observer s = consumer; + if (s != null) { + s.onNext(t); + } + } + + @Override + public void onError(Throwable e) { + synchronized (guard) { + if (emitting) { + queue = Collections.singletonList(nl.error(e)); + return; + } + queue = null; + emitting = true; + } + error(e); + } + + @Override + public void onCompleted() { + List localQueue; + synchronized (guard) { + if (emitting) { + if (queue == null) { + queue = new ArrayList(); + } + queue.add(nl.completed()); + return; + } + localQueue = queue; + queue = null; + emitting = true; + } + try { + drain(localQueue); + } catch (Throwable e) { + error(e); + return; + } + complete(); + } + void replaceWindow() { + List localQueue; + synchronized (guard) { + if (emitting) { + if (queue == null) { + queue = new ArrayList(); + } + queue.add(NEXT_SUBJECT); + return; + } + localQueue = queue; + queue = null; + emitting = true; + } + boolean once = true; + boolean skipFinal = false; + try { + do { + drain(localQueue); + if (once) { + once = false; + replaceSubject(); + } + synchronized (guard) { + localQueue = queue; + queue = null; + if (localQueue == null) { + emitting = false; + skipFinal = true; + return; + } + } + } while (!child.isUnsubscribed()); + } finally { + if (!skipFinal) { + synchronized (guard) { + emitting = false; + } + } + } + } + void complete() { + Observer s = consumer; + consumer = null; + producer = null; + + if (s != null) { + s.onCompleted(); + } + child.onCompleted(); + unsubscribe(); + } + void error(Throwable e) { + Observer s = consumer; + consumer = null; + producer = null; + + if (s != null) { + s.onError(e); + } + child.onError(e); + unsubscribe(); + } + } + /** Observes the boundary. */ + static final class BoundarySubscriber extends Subscriber { + final SourceSubscriber sub; + boolean done; + public BoundarySubscriber(Subscriber child, SourceSubscriber sub) { + this.sub = sub; + } + + @Override + public void onStart() { + request(Long.MAX_VALUE); + } + + @Override + public void onNext(U t) { + if (!done) { + done = true; + sub.replaceWindow(); + } + } + + @Override + public void onError(Throwable e) { + sub.onError(e); + } + + @Override + public void onCompleted() { + if (!done) { + done = true; + sub.onCompleted(); + } + } + } +} diff --git a/src/main/java/rx/internal/operators/OperatorWindowWithStartEndObservable.java b/src/main/java/rx/internal/operators/OperatorWindowWithStartEndObservable.java index e884cd01d7..82d1474163 100644 --- a/src/main/java/rx/internal/operators/OperatorWindowWithStartEndObservable.java +++ b/src/main/java/rx/internal/operators/OperatorWindowWithStartEndObservable.java @@ -15,17 +15,14 @@ */ package rx.internal.operators; -import java.util.ArrayList; -import java.util.Iterator; -import java.util.LinkedList; -import java.util.List; -import rx.Observable; +import java.util.*; + +import rx.*; import rx.Observable.Operator; +import rx.Observable; import rx.Observer; -import rx.Subscriber; import rx.functions.Func1; -import rx.observers.SerializedObserver; -import rx.observers.SerializedSubscriber; +import rx.observers.*; import rx.subscriptions.CompositeSubscription; /** @@ -49,9 +46,12 @@ public OperatorWindowWithStartEndObservable(Observable windowOpenin @Override public Subscriber call(Subscriber> child) { - final SourceSubscriber sub = new SourceSubscriber(child); + CompositeSubscription csub = new CompositeSubscription(); + child.add(csub); + + final SourceSubscriber sub = new SourceSubscriber(child, csub); - Subscriber open = new Subscriber(child) { + Subscriber open = new Subscriber() { @Override public void onStart() { @@ -73,7 +73,10 @@ public void onCompleted() { sub.onCompleted(); } }; - + + csub.add(sub); + csub.add(open); + windowOpenings.unsafeSubscribe(open); return sub; @@ -97,13 +100,11 @@ final class SourceSubscriber extends Subscriber { final List> chunks; /** Guarded by guard. */ boolean done; - public SourceSubscriber(Subscriber> child) { - super(child); + public SourceSubscriber(Subscriber> child, CompositeSubscription csub) { this.child = new SerializedSubscriber>(child); this.guard = new Object(); this.chunks = new LinkedList>(); - this.csub = new CompositeSubscription(); - child.add(csub); + this.csub = csub; } @Override @@ -127,36 +128,44 @@ public void onNext(T t) { @Override public void onError(Throwable e) { - List> list; - synchronized (guard) { - if (done) { - return; + try { + List> list; + synchronized (guard) { + if (done) { + return; + } + done = true; + list = new ArrayList>(chunks); + chunks.clear(); } - done = true; - list = new ArrayList>(chunks); - chunks.clear(); - } - for (SerializedSubject cs : list) { - cs.consumer.onError(e); + for (SerializedSubject cs : list) { + cs.consumer.onError(e); + } + child.onError(e); + } finally { + csub.unsubscribe(); } - child.onError(e); } @Override public void onCompleted() { - List> list; - synchronized (guard) { - if (done) { - return; + try { + List> list; + synchronized (guard) { + if (done) { + return; + } + done = true; + list = new ArrayList>(chunks); + chunks.clear(); } - done = true; - list = new ArrayList>(chunks); - chunks.clear(); - } - for (SerializedSubject cs : list) { - cs.consumer.onCompleted(); + for (SerializedSubject cs : list) { + cs.consumer.onCompleted(); + } + child.onCompleted(); + } finally { + csub.unsubscribe(); } - child.onCompleted(); } void beginWindow(U token) { diff --git a/src/test/java/rx/internal/operators/OperatorWindowWithObservableTest.java b/src/test/java/rx/internal/operators/OperatorWindowWithObservableTest.java index fd8a20fed3..05488379c2 100644 --- a/src/test/java/rx/internal/operators/OperatorWindowWithObservableTest.java +++ b/src/test/java/rx/internal/operators/OperatorWindowWithObservableTest.java @@ -15,15 +15,12 @@ */ package rx.internal.operators; -import static org.junit.Assert.assertEquals; +import static org.junit.Assert.*; import static org.mockito.Matchers.any; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.never; -import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.*; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; +import java.util.*; +import java.util.concurrent.atomic.AtomicInteger; import org.junit.Test; @@ -290,4 +287,167 @@ public Observable call() { assertEquals(1, ts.getOnNextEvents().size()); assertEquals(Arrays.asList(1, 2), tsw.getOnNextEvents()); } + + @Test + public void testWindowViaObservableNoUnsubscribe() { + Observable source = Observable.range(1, 10); + Func0> boundary = new Func0>() { + @Override + public Observable call() { + return Observable.empty(); + } + }; + + TestSubscriber> ts = TestSubscriber.create(); + source.window(boundary).unsafeSubscribe(ts); + + assertFalse(ts.isUnsubscribed()); + } + + @Test + public void testBoundaryUnsubscribedOnMainCompletion() { + PublishSubject source = PublishSubject.create(); + final PublishSubject boundary = PublishSubject.create(); + Func0> boundaryFunc = new Func0>() { + @Override + public Observable call() { + return boundary; + } + }; + + TestSubscriber> ts = TestSubscriber.create(); + source.window(boundaryFunc).subscribe(ts); + + assertTrue(source.hasObservers()); + assertTrue(boundary.hasObservers()); + + source.onCompleted(); + + assertFalse(source.hasObservers()); + assertFalse(boundary.hasObservers()); + + ts.assertCompleted(); + ts.assertNoErrors(); + ts.assertValueCount(1); + } + @Test + public void testMainUnsubscribedOnBoundaryCompletion() { + PublishSubject source = PublishSubject.create(); + final PublishSubject boundary = PublishSubject.create(); + Func0> boundaryFunc = new Func0>() { + @Override + public Observable call() { + return boundary; + } + }; + + TestSubscriber> ts = TestSubscriber.create(); + source.window(boundaryFunc).subscribe(ts); + + assertTrue(source.hasObservers()); + assertTrue(boundary.hasObservers()); + + boundary.onCompleted(); + + assertFalse(source.hasObservers()); + assertFalse(boundary.hasObservers()); + + ts.assertCompleted(); + ts.assertNoErrors(); + ts.assertValueCount(1); + } + @Test + public void testChildUnsubscribed() { + PublishSubject source = PublishSubject.create(); + final PublishSubject boundary = PublishSubject.create(); + Func0> boundaryFunc = new Func0>() { + @Override + public Observable call() { + return boundary; + } + }; + + TestSubscriber> ts = TestSubscriber.create(); + source.window(boundaryFunc).subscribe(ts); + + assertTrue(source.hasObservers()); + assertTrue(boundary.hasObservers()); + + ts.unsubscribe(); + + assertFalse(source.hasObservers()); + assertFalse(boundary.hasObservers()); + + ts.assertNotCompleted(); + ts.assertNoErrors(); + ts.assertValueCount(1); + } + @Test + public void testNoBackpressure() { + Observable source = Observable.range(1, 10); + final PublishSubject boundary = PublishSubject.create(); + Func0> boundaryFunc = new Func0>() { + @Override + public Observable call() { + return boundary; + } + }; + + final TestSubscriber ts = TestSubscriber.create(1); + final TestSubscriber> ts1 = new TestSubscriber>(1) { + @Override + public void onNext(Observable t) { + super.onNext(t); + t.subscribe(ts); + } + }; + source.window(boundaryFunc) + .subscribe(ts1); + + ts1.assertNoErrors(); + ts1.assertCompleted(); + ts1.assertValueCount(1); + + ts.assertNoErrors(); + ts.assertValues(1, 2, 3, 4, 5, 6, 7, 8, 9, 10); + ts.assertCompleted(); + } + @Test + public void newBoundaryCalledAfterWindowClosed() { + final AtomicInteger calls = new AtomicInteger(); + PublishSubject source = PublishSubject.create(); + final PublishSubject boundary = PublishSubject.create(); + Func0> boundaryFunc = new Func0>() { + @Override + public Observable call() { + calls.getAndIncrement(); + return boundary; + } + }; + + TestSubscriber> ts = TestSubscriber.create(); + source.window(boundaryFunc).subscribe(ts); + + source.onNext(1); + boundary.onNext(1); + assertTrue(boundary.hasObservers()); + + source.onNext(2); + boundary.onNext(2); + assertTrue(boundary.hasObservers()); + + source.onNext(3); + boundary.onNext(3); + assertTrue(boundary.hasObservers()); + + source.onNext(4); + source.onCompleted(); + + ts.assertNoErrors(); + ts.assertValueCount(4); + ts.assertCompleted(); + + assertFalse(source.hasObservers()); + assertFalse(boundary.hasObservers()); + } } \ No newline at end of file diff --git a/src/test/java/rx/internal/operators/OperatorWindowWithStartEndObservableTest.java b/src/test/java/rx/internal/operators/OperatorWindowWithStartEndObservableTest.java index 492bf3cb49..be3c16e660 100644 --- a/src/test/java/rx/internal/operators/OperatorWindowWithStartEndObservableTest.java +++ b/src/test/java/rx/internal/operators/OperatorWindowWithStartEndObservableTest.java @@ -15,25 +15,20 @@ */ package rx.internal.operators; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.fail; +import static org.junit.Assert.*; -import java.util.ArrayList; -import java.util.List; +import java.util.*; import java.util.concurrent.TimeUnit; -import org.junit.Before; -import org.junit.Test; +import org.junit.*; +import rx.*; import rx.Observable; import rx.Observer; -import rx.Scheduler; -import rx.Subscriber; -import rx.functions.Action0; -import rx.functions.Action1; -import rx.functions.Func0; -import rx.functions.Func1; +import rx.functions.*; +import rx.observers.TestSubscriber; import rx.schedulers.TestScheduler; +import rx.subjects.PublishSubject; public class OperatorWindowWithStartEndObservableTest { @@ -112,14 +107,21 @@ public void call(Subscriber observer) { }); Func0> closer = new Func0>() { + int calls; @Override public Observable call() { return Observable.create(new Observable.OnSubscribe() { @Override public void call(Subscriber observer) { - push(observer, new Object(), 100); - push(observer, new Object(), 200); - complete(observer, 301); + int c = calls++; + if (c == 0) { + push(observer, new Object(), 100); + } else + if (c == 1) { + push(observer, new Object(), 100); + } else { + complete(observer, 101); + } } }); } @@ -185,4 +187,68 @@ public void onNext(String args) { } }; } + + @Test + public void testNoUnsubscribeAndNoLeak() { + PublishSubject source = PublishSubject.create(); + + PublishSubject open = PublishSubject.create(); + final PublishSubject close = PublishSubject.create(); + + TestSubscriber> ts = TestSubscriber.create(); + + source.window(open, new Func1>() { + @Override + public Observable call(Integer t) { + return close; + } + }).unsafeSubscribe(ts); + + open.onNext(1); + source.onNext(1); + + assertTrue(open.hasObservers()); + assertTrue(close.hasObservers()); + + close.onNext(1); + + assertFalse(close.hasObservers()); + + source.onCompleted(); + + ts.assertCompleted(); + ts.assertNoErrors(); + ts.assertValueCount(1); + + assertFalse(ts.isUnsubscribed()); + assertFalse(open.hasObservers()); + assertFalse(close.hasObservers()); + } + + @Test + public void testUnsubscribeAll() { + PublishSubject source = PublishSubject.create(); + + PublishSubject open = PublishSubject.create(); + final PublishSubject close = PublishSubject.create(); + + TestSubscriber> ts = TestSubscriber.create(); + + source.window(open, new Func1>() { + @Override + public Observable call(Integer t) { + return close; + } + }).unsafeSubscribe(ts); + + open.onNext(1); + + assertTrue(open.hasObservers()); + assertTrue(close.hasObservers()); + + ts.unsubscribe(); + + assertFalse(open.hasObservers()); + assertFalse(close.hasObservers()); + } } \ No newline at end of file