Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

2.x: Fix refCount() connect/subscribe/cancel deadlock #5975

Merged
merged 5 commits into from
Apr 29, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,19 @@

package io.reactivex.internal.operators.flowable;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.*;
import java.util.concurrent.locks.ReentrantLock;

import org.reactivestreams.*;

import io.reactivex.FlowableSubscriber;
import io.reactivex.disposables.*;
import io.reactivex.*;
import io.reactivex.disposables.Disposable;
import io.reactivex.flowables.ConnectableFlowable;
import io.reactivex.functions.Consumer;
import io.reactivex.internal.disposables.*;
import io.reactivex.internal.subscriptions.SubscriptionHelper;
import io.reactivex.plugins.RxJavaPlugins;
import io.reactivex.schedulers.Schedulers;

/**
* Returns an observable sequence that stays connected to the source as long as
Expand All @@ -31,199 +34,202 @@
* @param <T>
* the value type
*/
public final class FlowableRefCount<T> extends AbstractFlowableWithUpstream<T, T> {
public final class FlowableRefCount<T> extends Flowable<T> {

final ConnectableFlowable<T> source;
volatile CompositeDisposable baseDisposable = new CompositeDisposable();
final AtomicInteger subscriptionCount = new AtomicInteger();

/**
* Use this lock for every subscription and disconnect action.
*/
final ReentrantLock lock = new ReentrantLock();

final class ConnectionSubscriber
extends AtomicReference<Subscription>
implements FlowableSubscriber<T>, Subscription {

private static final long serialVersionUID = 152064694420235350L;
final Subscriber<? super T> subscriber;
final CompositeDisposable currentBase;
final Disposable resource;

final AtomicLong requested;

ConnectionSubscriber(Subscriber<? super T> subscriber,
CompositeDisposable currentBase, Disposable resource) {
this.subscriber = subscriber;
this.currentBase = currentBase;
this.resource = resource;
this.requested = new AtomicLong();
}

@Override
public void onSubscribe(Subscription s) {
SubscriptionHelper.deferredSetOnce(this, requested, s);
}
final int n;

@Override
public void onError(Throwable e) {
cleanup();
subscriber.onError(e);
}
final long timeout;

@Override
public void onNext(T t) {
subscriber.onNext(t);
}
final TimeUnit unit;

@Override
public void onComplete() {
cleanup();
subscriber.onComplete();
}

@Override
public void request(long n) {
SubscriptionHelper.deferredRequest(this, requested, n);
}
final Scheduler scheduler;

@Override
public void cancel() {
SubscriptionHelper.cancel(this);
resource.dispose();
}
RefConnection connection;

void cleanup() {
// on error or completion we need to dispose the base CompositeDisposable
// and set the subscriptionCount to 0
lock.lock();
try {
if (baseDisposable == currentBase) {
if (source instanceof Disposable) {
((Disposable)source).dispose();
}
baseDisposable.dispose();
baseDisposable = new CompositeDisposable();
subscriptionCount.set(0);
}
} finally {
lock.unlock();
}
}
public FlowableRefCount(ConnectableFlowable<T> source) {
this(source, 1, 0L, TimeUnit.NANOSECONDS, Schedulers.trampoline());
}

/**
* Constructor.
*
* @param source
* observable to apply ref count to
*/
public FlowableRefCount(ConnectableFlowable<T> source) {
super(source);
public FlowableRefCount(ConnectableFlowable<T> source, int n, long timeout, TimeUnit unit,
Scheduler scheduler) {
this.source = source;
this.n = n;
this.timeout = timeout;
this.unit = unit;
this.scheduler = scheduler;
}

@Override
public void subscribeActual(final Subscriber<? super T> subscriber) {

lock.lock();
if (subscriptionCount.incrementAndGet() == 1) {

final AtomicBoolean writeLocked = new AtomicBoolean(true);

try {
// need to use this overload of connect to ensure that
// baseSubscription is set in the case that source is a
// synchronous Observable
source.connect(onSubscribe(subscriber, writeLocked));
} finally {
// need to cover the case where the source is subscribed to
// outside of this class thus preventing the Consumer passed
// to source.connect above being called
if (writeLocked.get()) {
// Consumer passed to source.connect was not called
lock.unlock();
}
protected void subscribeActual(Subscriber<? super T> s) {

RefConnection conn;

boolean connect = false;
synchronized (this) {
conn = connection;
if (conn == null) {
conn = new RefConnection(this);
connection = conn;
}

long c = conn.subscriberCount;
if (c == 0L && conn.timer != null) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

looks like this condition is only actual for resubscribeBeforeTimeout test

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's strange to see logic and tests for something that user cannot use in RxJava

As you mentioned in PR description, let's maybe expose additional time related refCount api or try to shrink unused code?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done: #5986.

conn.timer.dispose();
}
} else {
try {
// ready to subscribe to source so do it
doSubscribe(subscriber, baseDisposable);
} finally {
// release the read lock
lock.unlock();
conn.subscriberCount = c + 1;
if (!conn.connected && c + 1 == n) {
connect = true;
conn.connected = true;
}
}

source.subscribe(new RefCountSubscriber<T>(s, this, conn));

if (connect) {
source.connect(conn);
}
}

private Consumer<Disposable> onSubscribe(final Subscriber<? super T> subscriber,
final AtomicBoolean writeLocked) {
return new DisposeConsumer(subscriber, writeLocked);
void cancel(RefConnection rc) {
SequentialDisposable sd;
synchronized (this) {
if (connection == null) {
return;
}
long c = rc.subscriberCount - 1;
rc.subscriberCount = c;
if (c != 0L || !rc.connected) {
return;
}
if (timeout == 0L) {
timeout(rc);
return;
}
sd = new SequentialDisposable();
rc.timer = sd;
}

sd.replace(scheduler.scheduleDirect(rc, timeout, unit));
}

void doSubscribe(final Subscriber<? super T> subscriber, final CompositeDisposable currentBase) {
// handle disposing from the base subscription
Disposable d = disconnect(currentBase);

ConnectionSubscriber connection = new ConnectionSubscriber(subscriber, currentBase, d);
subscriber.onSubscribe(connection);
void terminated(RefConnection rc) {
synchronized (this) {
if (connection != null) {
connection = null;
if (rc.timer != null) {
rc.timer.dispose();
}
if (source instanceof Disposable) {
((Disposable)source).dispose();
}
}
}
}

source.subscribe(connection);
void timeout(RefConnection rc) {
synchronized (this) {
if (rc.subscriberCount == 0 && rc == connection) {
connection = null;
DisposableHelper.dispose(rc);
if (source instanceof Disposable) {
((Disposable)source).dispose();
}
}
}
}

private Disposable disconnect(final CompositeDisposable current) {
return Disposables.fromRunnable(new DisposeTask(current));
static final class RefConnection extends AtomicReference<Disposable>
implements Runnable, Consumer<Disposable> {

private static final long serialVersionUID = -4552101107598366241L;

final FlowableRefCount<?> parent;

Disposable timer;

long subscriberCount;

boolean connected;

RefConnection(FlowableRefCount<?> parent) {
this.parent = parent;
}

@Override
public void run() {
parent.timeout(this);
}

@Override
public void accept(Disposable t) throws Exception {
DisposableHelper.replace(this, t);
}
}

final class DisposeConsumer implements Consumer<Disposable> {
private final Subscriber<? super T> subscriber;
private final AtomicBoolean writeLocked;
static final class RefCountSubscriber<T>
extends AtomicBoolean implements FlowableSubscriber<T>, Subscription {

private static final long serialVersionUID = -7419642935409022375L;

final Subscriber<? super T> actual;

final FlowableRefCount<T> parent;

final RefConnection connection;

DisposeConsumer(Subscriber<? super T> subscriber, AtomicBoolean writeLocked) {
this.subscriber = subscriber;
this.writeLocked = writeLocked;
Subscription upstream;

RefCountSubscriber(Subscriber<? super T> actual, FlowableRefCount<T> parent, RefConnection connection) {
this.actual = actual;
this.parent = parent;
this.connection = connection;
}

@Override
public void onNext(T t) {
actual.onNext(t);
}

@Override
public void accept(Disposable subscription) {
try {
baseDisposable.add(subscription);
// ready to subscribe to source so do it
doSubscribe(subscriber, baseDisposable);
} finally {
// release the write lock
lock.unlock();
writeLocked.set(false);
public void onError(Throwable t) {
if (compareAndSet(false, true)) {
parent.terminated(connection);
actual.onError(t);
} else {
RxJavaPlugins.onError(t);
}
}
}

final class DisposeTask implements Runnable {
private final CompositeDisposable current;
@Override
public void onComplete() {
if (compareAndSet(false, true)) {
parent.terminated(connection);
actual.onComplete();
}
}

DisposeTask(CompositeDisposable current) {
this.current = current;
@Override
public void request(long n) {
upstream.request(n);
}

@Override
public void run() {
lock.lock();
try {
if (baseDisposable == current) {
if (subscriptionCount.decrementAndGet() == 0) {
if (source instanceof Disposable) {
((Disposable)source).dispose();
}

baseDisposable.dispose();
// need a new baseDisposable because once
// disposed stays that way
baseDisposable = new CompositeDisposable();
}
}
} finally {
lock.unlock();
public void cancel() {
upstream.cancel();
if (compareAndSet(false, true)) {
parent.cancel(connection);
}
}

@Override
public void onSubscribe(Subscription s) {
if (SubscriptionHelper.validate(upstream, s)) {
this.upstream = s;

actual.onSubscribe(this);
}
}
}
Expand Down
Loading