diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableRefCount.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableRefCount.java index 565c94221d..c9f03fdf2c 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableRefCount.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableRefCount.java @@ -13,16 +13,19 @@ package io.reactivex.internal.operators.flowable; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.*; -import java.util.concurrent.locks.ReentrantLock; import org.reactivestreams.*; -import io.reactivex.FlowableSubscriber; -import io.reactivex.disposables.*; +import io.reactivex.*; +import io.reactivex.disposables.Disposable; import io.reactivex.flowables.ConnectableFlowable; import io.reactivex.functions.Consumer; +import io.reactivex.internal.disposables.*; import io.reactivex.internal.subscriptions.SubscriptionHelper; +import io.reactivex.plugins.RxJavaPlugins; +import io.reactivex.schedulers.Schedulers; /** * Returns an observable sequence that stays connected to the source as long as @@ -31,199 +34,202 @@ * @param * the value type */ -public final class FlowableRefCount extends AbstractFlowableWithUpstream { +public final class FlowableRefCount extends Flowable { + final ConnectableFlowable source; - volatile CompositeDisposable baseDisposable = new CompositeDisposable(); - final AtomicInteger subscriptionCount = new AtomicInteger(); - - /** - * Use this lock for every subscription and disconnect action. - */ - final ReentrantLock lock = new ReentrantLock(); - - final class ConnectionSubscriber - extends AtomicReference - implements FlowableSubscriber, Subscription { - - private static final long serialVersionUID = 152064694420235350L; - final Subscriber subscriber; - final CompositeDisposable currentBase; - final Disposable resource; - - final AtomicLong requested; - - ConnectionSubscriber(Subscriber subscriber, - CompositeDisposable currentBase, Disposable resource) { - this.subscriber = subscriber; - this.currentBase = currentBase; - this.resource = resource; - this.requested = new AtomicLong(); - } - @Override - public void onSubscribe(Subscription s) { - SubscriptionHelper.deferredSetOnce(this, requested, s); - } + final int n; - @Override - public void onError(Throwable e) { - cleanup(); - subscriber.onError(e); - } + final long timeout; - @Override - public void onNext(T t) { - subscriber.onNext(t); - } + final TimeUnit unit; - @Override - public void onComplete() { - cleanup(); - subscriber.onComplete(); - } - - @Override - public void request(long n) { - SubscriptionHelper.deferredRequest(this, requested, n); - } + final Scheduler scheduler; - @Override - public void cancel() { - SubscriptionHelper.cancel(this); - resource.dispose(); - } + RefConnection connection; - void cleanup() { - // on error or completion we need to dispose the base CompositeDisposable - // and set the subscriptionCount to 0 - lock.lock(); - try { - if (baseDisposable == currentBase) { - if (source instanceof Disposable) { - ((Disposable)source).dispose(); - } - baseDisposable.dispose(); - baseDisposable = new CompositeDisposable(); - subscriptionCount.set(0); - } - } finally { - lock.unlock(); - } - } + public FlowableRefCount(ConnectableFlowable source) { + this(source, 1, 0L, TimeUnit.NANOSECONDS, Schedulers.trampoline()); } - /** - * Constructor. - * - * @param source - * observable to apply ref count to - */ - public FlowableRefCount(ConnectableFlowable source) { - super(source); + public FlowableRefCount(ConnectableFlowable source, int n, long timeout, TimeUnit unit, + Scheduler scheduler) { this.source = source; + this.n = n; + this.timeout = timeout; + this.unit = unit; + this.scheduler = scheduler; } @Override - public void subscribeActual(final Subscriber subscriber) { - - lock.lock(); - if (subscriptionCount.incrementAndGet() == 1) { - - final AtomicBoolean writeLocked = new AtomicBoolean(true); - - try { - // need to use this overload of connect to ensure that - // baseSubscription is set in the case that source is a - // synchronous Observable - source.connect(onSubscribe(subscriber, writeLocked)); - } finally { - // need to cover the case where the source is subscribed to - // outside of this class thus preventing the Consumer passed - // to source.connect above being called - if (writeLocked.get()) { - // Consumer passed to source.connect was not called - lock.unlock(); - } + protected void subscribeActual(Subscriber s) { + + RefConnection conn; + + boolean connect = false; + synchronized (this) { + conn = connection; + if (conn == null) { + conn = new RefConnection(this); + connection = conn; + } + + long c = conn.subscriberCount; + if (c == 0L && conn.timer != null) { + conn.timer.dispose(); } - } else { - try { - // ready to subscribe to source so do it - doSubscribe(subscriber, baseDisposable); - } finally { - // release the read lock - lock.unlock(); + conn.subscriberCount = c + 1; + if (!conn.connected && c + 1 == n) { + connect = true; + conn.connected = true; } } + source.subscribe(new RefCountSubscriber(s, this, conn)); + + if (connect) { + source.connect(conn); + } } - private Consumer onSubscribe(final Subscriber subscriber, - final AtomicBoolean writeLocked) { - return new DisposeConsumer(subscriber, writeLocked); + void cancel(RefConnection rc) { + SequentialDisposable sd; + synchronized (this) { + if (connection == null) { + return; + } + long c = rc.subscriberCount - 1; + rc.subscriberCount = c; + if (c != 0L || !rc.connected) { + return; + } + if (timeout == 0L) { + timeout(rc); + return; + } + sd = new SequentialDisposable(); + rc.timer = sd; + } + + sd.replace(scheduler.scheduleDirect(rc, timeout, unit)); } - void doSubscribe(final Subscriber subscriber, final CompositeDisposable currentBase) { - // handle disposing from the base subscription - Disposable d = disconnect(currentBase); - ConnectionSubscriber connection = new ConnectionSubscriber(subscriber, currentBase, d); - subscriber.onSubscribe(connection); + void terminated(RefConnection rc) { + synchronized (this) { + if (connection != null) { + connection = null; + if (rc.timer != null) { + rc.timer.dispose(); + } + if (source instanceof Disposable) { + ((Disposable)source).dispose(); + } + } + } + } - source.subscribe(connection); + void timeout(RefConnection rc) { + synchronized (this) { + if (rc.subscriberCount == 0 && rc == connection) { + connection = null; + DisposableHelper.dispose(rc); + if (source instanceof Disposable) { + ((Disposable)source).dispose(); + } + } + } } - private Disposable disconnect(final CompositeDisposable current) { - return Disposables.fromRunnable(new DisposeTask(current)); + static final class RefConnection extends AtomicReference + implements Runnable, Consumer { + + private static final long serialVersionUID = -4552101107598366241L; + + final FlowableRefCount parent; + + Disposable timer; + + long subscriberCount; + + boolean connected; + + RefConnection(FlowableRefCount parent) { + this.parent = parent; + } + + @Override + public void run() { + parent.timeout(this); + } + + @Override + public void accept(Disposable t) throws Exception { + DisposableHelper.replace(this, t); + } } - final class DisposeConsumer implements Consumer { - private final Subscriber subscriber; - private final AtomicBoolean writeLocked; + static final class RefCountSubscriber + extends AtomicBoolean implements FlowableSubscriber, Subscription { + + private static final long serialVersionUID = -7419642935409022375L; + + final Subscriber actual; + + final FlowableRefCount parent; + + final RefConnection connection; - DisposeConsumer(Subscriber subscriber, AtomicBoolean writeLocked) { - this.subscriber = subscriber; - this.writeLocked = writeLocked; + Subscription upstream; + + RefCountSubscriber(Subscriber actual, FlowableRefCount parent, RefConnection connection) { + this.actual = actual; + this.parent = parent; + this.connection = connection; + } + + @Override + public void onNext(T t) { + actual.onNext(t); } @Override - public void accept(Disposable subscription) { - try { - baseDisposable.add(subscription); - // ready to subscribe to source so do it - doSubscribe(subscriber, baseDisposable); - } finally { - // release the write lock - lock.unlock(); - writeLocked.set(false); + public void onError(Throwable t) { + if (compareAndSet(false, true)) { + parent.terminated(connection); + actual.onError(t); + } else { + RxJavaPlugins.onError(t); } } - } - final class DisposeTask implements Runnable { - private final CompositeDisposable current; + @Override + public void onComplete() { + if (compareAndSet(false, true)) { + parent.terminated(connection); + actual.onComplete(); + } + } - DisposeTask(CompositeDisposable current) { - this.current = current; + @Override + public void request(long n) { + upstream.request(n); } @Override - public void run() { - lock.lock(); - try { - if (baseDisposable == current) { - if (subscriptionCount.decrementAndGet() == 0) { - if (source instanceof Disposable) { - ((Disposable)source).dispose(); - } - - baseDisposable.dispose(); - // need a new baseDisposable because once - // disposed stays that way - baseDisposable = new CompositeDisposable(); - } - } - } finally { - lock.unlock(); + public void cancel() { + upstream.cancel(); + if (compareAndSet(false, true)) { + parent.cancel(connection); + } + } + + @Override + public void onSubscribe(Subscription s) { + if (SubscriptionHelper.validate(upstream, s)) { + this.upstream = s; + + actual.onSubscribe(this); } } } diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableRefCount.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableRefCount.java index 58a3141b4c..fd62c1ae60 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/ObservableRefCount.java +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableRefCount.java @@ -13,14 +13,16 @@ package io.reactivex.internal.operators.observable; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.*; -import java.util.concurrent.locks.ReentrantLock; import io.reactivex.*; -import io.reactivex.disposables.*; +import io.reactivex.disposables.Disposable; import io.reactivex.functions.Consumer; -import io.reactivex.internal.disposables.DisposableHelper; +import io.reactivex.internal.disposables.*; import io.reactivex.observables.ConnectableObservable; +import io.reactivex.plugins.RxJavaPlugins; +import io.reactivex.schedulers.Schedulers; /** * Returns an observable sequence that stays connected to the source as long as @@ -29,201 +31,201 @@ * @param * the value type */ -public final class ObservableRefCount extends AbstractObservableWithUpstream { +public final class ObservableRefCount extends Observable { - final ConnectableObservable source; + final ConnectableObservable source; - volatile CompositeDisposable baseDisposable = new CompositeDisposable(); + final int n; - final AtomicInteger subscriptionCount = new AtomicInteger(); + final long timeout; - /** - * Use this lock for every subscription and disconnect action. - */ - final ReentrantLock lock = new ReentrantLock(); + final TimeUnit unit; + + final Scheduler scheduler; + + RefConnection connection; - /** - * Constructor. - * - * @param source - * observable to apply ref count to - */ public ObservableRefCount(ConnectableObservable source) { - super(source); + this(source, 1, 0L, TimeUnit.NANOSECONDS, Schedulers.trampoline()); + } + + public ObservableRefCount(ConnectableObservable source, int n, long timeout, TimeUnit unit, + Scheduler scheduler) { this.source = source; + this.n = n; + this.timeout = timeout; + this.unit = unit; + this.scheduler = scheduler; } @Override - public void subscribeActual(final Observer subscriber) { - - lock.lock(); - if (subscriptionCount.incrementAndGet() == 1) { - - final AtomicBoolean writeLocked = new AtomicBoolean(true); - - try { - // need to use this overload of connect to ensure that - // baseDisposable is set in the case that source is a - // synchronous Observable - source.connect(onSubscribe(subscriber, writeLocked)); - } finally { - // need to cover the case where the source is subscribed to - // outside of this class thus preventing the Consumer passed - // to source.connect above being called - if (writeLocked.get()) { - // Consumer passed to source.connect was not called - lock.unlock(); - } + protected void subscribeActual(Observer s) { + + RefConnection conn; + + boolean connect = false; + synchronized (this) { + conn = connection; + if (conn == null) { + conn = new RefConnection(this); + connection = conn; } - } else { - try { - // ready to subscribe to source so do it - doSubscribe(subscriber, baseDisposable); - } finally { - // release the read lock - lock.unlock(); + + long c = conn.subscriberCount; + if (c == 0L && conn.timer != null) { + conn.timer.dispose(); + } + conn.subscriberCount = c + 1; + if (!conn.connected && c + 1 == n) { + connect = true; + conn.connected = true; } } - } + source.subscribe(new RefCountObserver(s, this, conn)); - private Consumer onSubscribe(final Observer observer, - final AtomicBoolean writeLocked) { - return new DisposeConsumer(observer, writeLocked); + if (connect) { + source.connect(conn); + } } - void doSubscribe(final Observer observer, final CompositeDisposable currentBase) { - // handle disposing from the base CompositeDisposable - Disposable d = disconnect(currentBase); + void cancel(RefConnection rc) { + SequentialDisposable sd; + synchronized (this) { + if (connection == null) { + return; + } + long c = rc.subscriberCount - 1; + rc.subscriberCount = c; + if (c != 0L || !rc.connected) { + return; + } + if (timeout == 0L) { + timeout(rc); + return; + } + sd = new SequentialDisposable(); + rc.timer = sd; + } - ConnectionObserver s = new ConnectionObserver(observer, currentBase, d); - observer.onSubscribe(s); + sd.replace(scheduler.scheduleDirect(rc, timeout, unit)); + } - source.subscribe(s); + void terminated(RefConnection rc) { + synchronized (this) { + if (connection != null) { + connection = null; + if (rc.timer != null) { + rc.timer.dispose(); + } + if (source instanceof Disposable) { + ((Disposable)source).dispose(); + } + } + } } - private Disposable disconnect(final CompositeDisposable current) { - return Disposables.fromRunnable(new DisposeTask(current)); + void timeout(RefConnection rc) { + synchronized (this) { + if (rc.subscriberCount == 0 && rc == connection) { + connection = null; + DisposableHelper.dispose(rc); + if (source instanceof Disposable) { + ((Disposable)source).dispose(); + } + } + } } - final class ConnectionObserver - extends AtomicReference - implements Observer, Disposable { + static final class RefConnection extends AtomicReference + implements Runnable, Consumer { - private static final long serialVersionUID = 3813126992133394324L; + private static final long serialVersionUID = -4552101107598366241L; - final Observer subscriber; - final CompositeDisposable currentBase; - final Disposable resource; + final ObservableRefCount parent; - ConnectionObserver(Observer subscriber, - CompositeDisposable currentBase, Disposable resource) { - this.subscriber = subscriber; - this.currentBase = currentBase; - this.resource = resource; - } + Disposable timer; - @Override - public void onSubscribe(Disposable s) { - DisposableHelper.setOnce(this, s); - } + long subscriberCount; - @Override - public void onError(Throwable e) { - cleanup(); - subscriber.onError(e); + boolean connected; + + RefConnection(ObservableRefCount parent) { + this.parent = parent; } @Override - public void onNext(T t) { - subscriber.onNext(t); + public void run() { + parent.timeout(this); } @Override - public void onComplete() { - cleanup(); - subscriber.onComplete(); + public void accept(Disposable t) throws Exception { + DisposableHelper.replace(this, t); } + } - @Override - public void dispose() { - DisposableHelper.dispose(this); - resource.dispose(); + static final class RefCountObserver + extends AtomicBoolean implements Observer, Disposable { + + private static final long serialVersionUID = -7419642935409022375L; + + final Observer actual; + + final ObservableRefCount parent; + + final RefConnection connection; + + Disposable upstream; + + RefCountObserver(Observer actual, ObservableRefCount parent, RefConnection connection) { + this.actual = actual; + this.parent = parent; + this.connection = connection; } @Override - public boolean isDisposed() { - return DisposableHelper.isDisposed(get()); + public void onNext(T t) { + actual.onNext(t); } - void cleanup() { - // on error or completion we need to dispose the base CompositeDisposable - // and set the subscriptionCount to 0 - lock.lock(); - try { - if (baseDisposable == currentBase) { - if (source instanceof Disposable) { - ((Disposable)source).dispose(); - } - - baseDisposable.dispose(); - baseDisposable = new CompositeDisposable(); - subscriptionCount.set(0); - } - } finally { - lock.unlock(); + @Override + public void onError(Throwable t) { + if (compareAndSet(false, true)) { + parent.terminated(connection); + actual.onError(t); + } else { + RxJavaPlugins.onError(t); } } - } - final class DisposeConsumer implements Consumer { - private final Observer observer; - private final AtomicBoolean writeLocked; - - DisposeConsumer(Observer observer, AtomicBoolean writeLocked) { - this.observer = observer; - this.writeLocked = writeLocked; + @Override + public void onComplete() { + if (compareAndSet(false, true)) { + parent.terminated(connection); + actual.onComplete(); + } } @Override - public void accept(Disposable subscription) { - try { - baseDisposable.add(subscription); - // ready to subscribe to source so do it - doSubscribe(observer, baseDisposable); - } finally { - // release the write lock - lock.unlock(); - writeLocked.set(false); + public void dispose() { + upstream.dispose(); + if (compareAndSet(false, true)) { + parent.cancel(connection); } } - } - - final class DisposeTask implements Runnable { - private final CompositeDisposable current; - DisposeTask(CompositeDisposable current) { - this.current = current; + @Override + public boolean isDisposed() { + return upstream.isDisposed(); } @Override - public void run() { - lock.lock(); - try { - if (baseDisposable == current) { - if (subscriptionCount.decrementAndGet() == 0) { - if (source instanceof Disposable) { - ((Disposable)source).dispose(); - } - - baseDisposable.dispose(); - // need a new baseDisposable because once - // disposed stays that way - baseDisposable = new CompositeDisposable(); - } - } - } finally { - lock.unlock(); + public void onSubscribe(Disposable d) { + if (DisposableHelper.validate(upstream, d)) { + this.upstream = d; + + actual.onSubscribe(this); } } } diff --git a/src/test/java/io/reactivex/internal/operators/flowable/FlowableRefCountTest.java b/src/test/java/io/reactivex/internal/operators/flowable/FlowableRefCountTest.java index eafd7873f8..01a6d9396e 100644 --- a/src/test/java/io/reactivex/internal/operators/flowable/FlowableRefCountTest.java +++ b/src/test/java/io/reactivex/internal/operators/flowable/FlowableRefCountTest.java @@ -17,6 +17,7 @@ import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.*; +import java.io.IOException; import java.lang.management.ManagementFactory; import java.util.*; import java.util.concurrent.*; @@ -28,13 +29,15 @@ import io.reactivex.*; import io.reactivex.disposables.*; -import io.reactivex.exceptions.TestException; +import io.reactivex.exceptions.*; import io.reactivex.flowables.ConnectableFlowable; import io.reactivex.functions.*; import io.reactivex.internal.functions.Functions; +import io.reactivex.internal.operators.flowable.FlowableRefCount.RefConnection; import io.reactivex.internal.subscriptions.BooleanSubscription; import io.reactivex.internal.util.ExceptionHelper; -import io.reactivex.processors.ReplayProcessor; +import io.reactivex.plugins.RxJavaPlugins; +import io.reactivex.processors.*; import io.reactivex.schedulers.*; import io.reactivex.subscribers.TestSubscriber; @@ -829,6 +832,7 @@ public void connect(Consumer connection) { @Override protected void subscribeActual(Subscriber observer) { + observer.onSubscribe(new BooleanSubscription()); } } @@ -962,4 +966,329 @@ public void badSourceCompleteDisconnect() { assertTrue(ex.getCause() instanceof TestException); } } + + @Test(timeout = 7500) + public void blockingSourceAsnycCancel() throws Exception { + BehaviorProcessor bp = BehaviorProcessor.createDefault(1); + + Flowable f = bp + .replay(1) + .refCount(); + + f.subscribe(); + + final AtomicBoolean interrupted = new AtomicBoolean(); + + f.switchMap(new Function>() { + @Override + public Publisher apply(Integer v) throws Exception { + return Flowable.create(new FlowableOnSubscribe() { + @Override + public void subscribe(FlowableEmitter emitter) throws Exception { + while (!emitter.isCancelled()) { + Thread.sleep(100); + } + interrupted.set(true); + } + }, BackpressureStrategy.MISSING); + } + }) + .takeUntil(Flowable.timer(500, TimeUnit.MILLISECONDS)) + .test() + .awaitDone(5, TimeUnit.SECONDS) + .assertResult(); + + assertTrue(interrupted.get()); + } + + static FlowableTransformer refCount(final int n) { + return refCount(n, 0, TimeUnit.NANOSECONDS, Schedulers.trampoline()); + } + + static FlowableTransformer refCount(final long time, final TimeUnit unit) { + return refCount(1, time, unit, Schedulers.computation()); + } + + static FlowableTransformer refCount(final long time, final TimeUnit unit, final Scheduler scheduler) { + return refCount(1, time, unit, scheduler); + } + + static FlowableTransformer refCount(final int n, final long time, final TimeUnit unit) { + return refCount(1, time, unit, Schedulers.computation()); + } + + static FlowableTransformer refCount(final int n, final long time, final TimeUnit unit, final Scheduler scheduler) { + return new FlowableTransformer() { + @Override + public Publisher apply(Flowable f) { + return new FlowableRefCount((ConnectableFlowable)f, n, time, unit, scheduler); + } + }; + } + + @Test + public void byCount() { + final int[] subscriptions = { 0 }; + + Flowable source = Flowable.range(1, 5) + .doOnSubscribe(new Consumer() { + @Override + public void accept(Subscription s) throws Exception { + subscriptions[0]++; + } + }) + .publish() + .compose(FlowableRefCountTest.refCount(2)); + + for (int i = 0; i < 3; i++) { + TestSubscriber ts1 = source.test(); + + ts1.assertEmpty(); + + TestSubscriber ts2 = source.test(); + + ts1.assertResult(1, 2, 3, 4, 5); + ts2.assertResult(1, 2, 3, 4, 5); + } + + assertEquals(3, subscriptions[0]); + } + + @Test + public void resubscribeBeforeTimeout() throws Exception { + final int[] subscriptions = { 0 }; + + PublishProcessor pp = PublishProcessor.create(); + + Flowable source = pp + .doOnSubscribe(new Consumer() { + @Override + public void accept(Subscription s) throws Exception { + subscriptions[0]++; + } + }) + .publish() + .compose(FlowableRefCountTest.refCount(500, TimeUnit.MILLISECONDS)); + + TestSubscriber ts1 = source.test(0); + + assertEquals(1, subscriptions[0]); + + ts1.cancel(); + + Thread.sleep(100); + + ts1 = source.test(0); + + assertEquals(1, subscriptions[0]); + + Thread.sleep(500); + + assertEquals(1, subscriptions[0]); + + pp.onNext(1); + pp.onNext(2); + pp.onNext(3); + pp.onNext(4); + pp.onNext(5); + pp.onComplete(); + + ts1.requestMore(5) + .assertResult(1, 2, 3, 4, 5); + } + + @Test + public void letitTimeout() throws Exception { + final int[] subscriptions = { 0 }; + + PublishProcessor pp = PublishProcessor.create(); + + Flowable source = pp + .doOnSubscribe(new Consumer() { + @Override + public void accept(Subscription s) throws Exception { + subscriptions[0]++; + } + }) + .publish() + .compose(FlowableRefCountTest.refCount(1, 100, TimeUnit.MILLISECONDS)); + + TestSubscriber ts1 = source.test(0); + + assertEquals(1, subscriptions[0]); + + ts1.cancel(); + + assertTrue(pp.hasSubscribers()); + + Thread.sleep(200); + + assertFalse(pp.hasSubscribers()); + } + + @Test + public void error() { + Flowable.error(new IOException()) + .publish() + .compose(FlowableRefCountTest.refCount(500, TimeUnit.MILLISECONDS)) + .test() + .assertFailure(IOException.class); + } + + @Test(expected = ClassCastException.class) + public void badUpstream() { + Flowable.range(1, 5) + .compose(FlowableRefCountTest.refCount(500, TimeUnit.MILLISECONDS, Schedulers.single())) + ; + } + + @Test + public void comeAndGo() { + PublishProcessor pp = PublishProcessor.create(); + + Flowable source = pp + .publish() + .compose(FlowableRefCountTest.refCount(1)); + + TestSubscriber ts1 = source.test(0); + + assertTrue(pp.hasSubscribers()); + + for (int i = 0; i < 3; i++) { + TestSubscriber ts2 = source.test(); + ts1.cancel(); + ts1 = ts2; + } + + ts1.cancel(); + + assertFalse(pp.hasSubscribers()); + } + + @Test + public void unsubscribeSubscribeRace() { + for (int i = 0; i < 1000; i++) { + + final Flowable source = Flowable.range(1, 5) + .replay() + .compose(FlowableRefCountTest.refCount(1)) + ; + + final TestSubscriber ts1 = source.test(0); + + final TestSubscriber ts2 = new TestSubscriber(0); + + Runnable r1 = new Runnable() { + @Override + public void run() { + ts1.cancel(); + } + }; + + Runnable r2 = new Runnable() { + @Override + public void run() { + source.subscribe(ts2); + } + }; + + TestHelper.race(r1, r2, Schedulers.single()); + + ts2.requestMore(6) // FIXME RxJava replay() doesn't issue onComplete without request + .withTag("Round: " + i) + .assertResult(1, 2, 3, 4, 5); + } + } + + static final class BadFlowableDoubleOnX extends ConnectableFlowable + implements Disposable { + + @Override + public void connect(Consumer connection) { + try { + connection.accept(Disposables.empty()); + } catch (Throwable ex) { + throw ExceptionHelper.wrapOrThrow(ex); + } + } + + @Override + protected void subscribeActual(Subscriber observer) { + observer.onSubscribe(new BooleanSubscription()); + observer.onSubscribe(new BooleanSubscription()); + observer.onComplete(); + observer.onComplete(); + observer.onError(new TestException()); + } + + @Override + public void dispose() { + } + + @Override + public boolean isDisposed() { + return false; + } + } + + @Test + public void doubleOnX() { + List errors = TestHelper.trackPluginErrors(); + try { + new BadFlowableDoubleOnX() + .refCount() + .test() + .assertResult(); + + TestHelper.assertError(errors, 0, ProtocolViolationException.class); + TestHelper.assertUndeliverable(errors, 1, TestException.class); + } finally { + RxJavaPlugins.reset(); + } + } + + @Test + public void cancelTerminateStateExclusion() { + FlowableRefCount o = (FlowableRefCount)PublishProcessor.create() + .publish() + .refCount(); + + o.cancel(null); + + RefConnection rc = new RefConnection(o); + o.connection = null; + rc.subscriberCount = 0; + o.timeout(rc); + + rc.subscriberCount = 1; + o.timeout(rc); + + o.connection = rc; + o.timeout(rc); + + rc.subscriberCount = 0; + o.timeout(rc); + + // ------------------- + + rc.subscriberCount = 2; + rc.connected = false; + o.connection = rc; + o.cancel(rc); + + rc.subscriberCount = 1; + rc.connected = false; + o.connection = rc; + o.cancel(rc); + + rc.subscriberCount = 2; + rc.connected = true; + o.connection = rc; + o.cancel(rc); + + rc.subscriberCount = 1; + rc.connected = true; + o.connection = rc; + o.cancel(rc); + } } diff --git a/src/test/java/io/reactivex/internal/operators/observable/ObservableRefCountTest.java b/src/test/java/io/reactivex/internal/operators/observable/ObservableRefCountTest.java index 4df48c59ac..7648cc69b5 100644 --- a/src/test/java/io/reactivex/internal/operators/observable/ObservableRefCountTest.java +++ b/src/test/java/io/reactivex/internal/operators/observable/ObservableRefCountTest.java @@ -17,6 +17,7 @@ import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.*; +import java.io.IOException; import java.lang.management.ManagementFactory; import java.util.*; import java.util.concurrent.*; @@ -29,14 +30,16 @@ import io.reactivex.Observable; import io.reactivex.Observer; import io.reactivex.disposables.*; -import io.reactivex.exceptions.TestException; +import io.reactivex.exceptions.*; import io.reactivex.functions.*; import io.reactivex.internal.functions.Functions; +import io.reactivex.internal.operators.observable.ObservableRefCount.RefConnection; import io.reactivex.internal.util.ExceptionHelper; import io.reactivex.observables.ConnectableObservable; import io.reactivex.observers.TestObserver; +import io.reactivex.plugins.RxJavaPlugins; import io.reactivex.schedulers.*; -import io.reactivex.subjects.ReplaySubject; +import io.reactivex.subjects.*; public class ObservableRefCountTest { @@ -813,6 +816,7 @@ public void connect(Consumer connection) { @Override protected void subscribeActual(Observer observer) { + observer.onSubscribe(Disposables.empty()); } } @@ -946,4 +950,329 @@ public void badSourceCompleteDisconnect() { assertTrue(ex.getCause() instanceof TestException); } } + + @Test(timeout = 7500) + public void blockingSourceAsnycCancel() throws Exception { + BehaviorSubject bs = BehaviorSubject.createDefault(1); + + Observable o = bs + .replay(1) + .refCount(); + + o.subscribe(); + + final AtomicBoolean interrupted = new AtomicBoolean(); + + o.switchMap(new Function>() { + @Override + public ObservableSource apply(Integer v) throws Exception { + return Observable.create(new ObservableOnSubscribe() { + @Override + public void subscribe(ObservableEmitter emitter) throws Exception { + while (!emitter.isDisposed()) { + Thread.sleep(100); + } + interrupted.set(true); + } + }); + } + }) + .take(500, TimeUnit.MILLISECONDS) + .test() + .awaitDone(5, TimeUnit.SECONDS) + .assertResult(); + + assertTrue(interrupted.get()); + } + + static ObservableTransformer refCount(final int n) { + return refCount(n, 0, TimeUnit.NANOSECONDS, Schedulers.trampoline()); + } + + static ObservableTransformer refCount(final long time, final TimeUnit unit) { + return refCount(1, time, unit, Schedulers.computation()); + } + + static ObservableTransformer refCount(final long time, final TimeUnit unit, final Scheduler scheduler) { + return refCount(1, time, unit, scheduler); + } + + static ObservableTransformer refCount(final int n, final long time, final TimeUnit unit) { + return refCount(1, time, unit, Schedulers.computation()); + } + + static ObservableTransformer refCount(final int n, final long time, final TimeUnit unit, final Scheduler scheduler) { + return new ObservableTransformer() { + @Override + public Observable apply(Observable f) { + return new ObservableRefCount((ConnectableObservable)f, n, time, unit, scheduler); + } + }; + } + + @Test + public void byCount() { + final int[] subscriptions = { 0 }; + + Observable source = Observable.range(1, 5) + .doOnSubscribe(new Consumer() { + @Override + public void accept(Disposable s) throws Exception { + subscriptions[0]++; + } + }) + .publish() + .compose(ObservableRefCountTest.refCount(2)); + + for (int i = 0; i < 3; i++) { + TestObserver to1 = source.test(); + + to1.assertEmpty(); + + TestObserver to2 = source.test(); + + to1.assertResult(1, 2, 3, 4, 5); + to2.assertResult(1, 2, 3, 4, 5); + } + + assertEquals(3, subscriptions[0]); + } + + @Test + public void resubscribeBeforeTimeout() throws Exception { + final int[] subscriptions = { 0 }; + + PublishSubject ps = PublishSubject.create(); + + Observable source = ps + .doOnSubscribe(new Consumer() { + @Override + public void accept(Disposable s) throws Exception { + subscriptions[0]++; + } + }) + .publish() + .compose(ObservableRefCountTest.refCount(500, TimeUnit.MILLISECONDS)); + + TestObserver to1 = source.test(); + + assertEquals(1, subscriptions[0]); + + to1.cancel(); + + Thread.sleep(100); + + to1 = source.test(); + + assertEquals(1, subscriptions[0]); + + Thread.sleep(500); + + assertEquals(1, subscriptions[0]); + + ps.onNext(1); + ps.onNext(2); + ps.onNext(3); + ps.onNext(4); + ps.onNext(5); + ps.onComplete(); + + to1 + .assertResult(1, 2, 3, 4, 5); + } + + @Test + public void letitTimeout() throws Exception { + final int[] subscriptions = { 0 }; + + PublishSubject ps = PublishSubject.create(); + + Observable source = ps + .doOnSubscribe(new Consumer() { + @Override + public void accept(Disposable s) throws Exception { + subscriptions[0]++; + } + }) + .publish() + .compose(ObservableRefCountTest.refCount(1, 100, TimeUnit.MILLISECONDS)); + + TestObserver to1 = source.test(); + + assertEquals(1, subscriptions[0]); + + to1.cancel(); + + assertTrue(ps.hasObservers()); + + Thread.sleep(200); + + assertFalse(ps.hasObservers()); + } + + @Test + public void error() { + Observable.error(new IOException()) + .publish() + .compose(ObservableRefCountTest.refCount(500, TimeUnit.MILLISECONDS)) + .test() + .assertFailure(IOException.class); + } + + @Test(expected = ClassCastException.class) + public void badUpstream() { + Observable.range(1, 5) + .compose(ObservableRefCountTest.refCount(500, TimeUnit.MILLISECONDS, Schedulers.single())) + ; + } + + @Test + public void comeAndGo() { + PublishSubject ps = PublishSubject.create(); + + Observable source = ps + .publish() + .compose(ObservableRefCountTest.refCount(1)); + + TestObserver to1 = source.test(); + + assertTrue(ps.hasObservers()); + + for (int i = 0; i < 3; i++) { + TestObserver to2 = source.test(); + to1.cancel(); + to1 = to2; + } + + to1.cancel(); + + assertFalse(ps.hasObservers()); + } + + @Test + public void unsubscribeSubscribeRace() { + for (int i = 0; i < 1000; i++) { + + final Observable source = Observable.range(1, 5) + .replay() + .compose(ObservableRefCountTest.refCount(1)) + ; + + final TestObserver to1 = source.test(); + + final TestObserver to2 = new TestObserver(); + + Runnable r1 = new Runnable() { + @Override + public void run() { + to1.cancel(); + } + }; + + Runnable r2 = new Runnable() { + @Override + public void run() { + source.subscribe(to2); + } + }; + + TestHelper.race(r1, r2, Schedulers.single()); + + to2 + .withTag("Round: " + i) + .assertResult(1, 2, 3, 4, 5); + } + } + + static final class BadObservableDoubleOnX extends ConnectableObservable + implements Disposable { + + @Override + public void connect(Consumer connection) { + try { + connection.accept(Disposables.empty()); + } catch (Throwable ex) { + throw ExceptionHelper.wrapOrThrow(ex); + } + } + + @Override + protected void subscribeActual(Observer observer) { + observer.onSubscribe(Disposables.empty()); + observer.onSubscribe(Disposables.empty()); + observer.onComplete(); + observer.onComplete(); + observer.onError(new TestException()); + } + + @Override + public void dispose() { + } + + @Override + public boolean isDisposed() { + return false; + } + } + + @Test + public void doubleOnX() { + List errors = TestHelper.trackPluginErrors(); + try { + new BadObservableDoubleOnX() + .refCount() + .test() + .assertResult(); + + TestHelper.assertError(errors, 0, ProtocolViolationException.class); + TestHelper.assertUndeliverable(errors, 1, TestException.class); + } finally { + RxJavaPlugins.reset(); + } + } + + @Test + public void cancelTerminateStateExclusion() { + ObservableRefCount o = (ObservableRefCount)PublishSubject.create() + .publish() + .refCount(); + + o.cancel(null); + + RefConnection rc = new RefConnection(o); + o.connection = null; + rc.subscriberCount = 0; + o.timeout(rc); + + rc.subscriberCount = 1; + o.timeout(rc); + + o.connection = rc; + o.timeout(rc); + + rc.subscriberCount = 0; + o.timeout(rc); + + // ------------------- + + rc.subscriberCount = 2; + rc.connected = false; + o.connection = rc; + o.cancel(rc); + + rc.subscriberCount = 1; + rc.connected = false; + o.connection = rc; + o.cancel(rc); + + rc.subscriberCount = 2; + rc.connected = true; + o.connection = rc; + o.cancel(rc); + + rc.subscriberCount = 1; + rc.connected = true; + o.connection = rc; + o.cancel(rc); + } }