-
Notifications
You must be signed in to change notification settings - Fork 7.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
rewrite OnSubscribeRefCount to handle synchronous source #1695
Conversation
…eck to OperatorMulticast
// need to use this overload of connect to ensure that | ||
// baseSubscription is set in the case that source is a synchronous | ||
// Observable | ||
source.connect(new Action1<Subscription>() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should put unlock
in a finally clause because Action1 may be ignored if already unsubscribed. E.g.,
try {
source.connect(new Action1<Subscription>() {
@Override
public void call(Subscription subscription) {
baseSubscription = subscription;
// handle unsubscribing from the base subscription
subscriber.add(disconnect());
// ready to subscribe to source so do it
source.unsafeSubscribe(subscriber);
}
});
}
finally {
// release the write lock
lock.writeLock().unlock();
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
that approach won't work because source may be synchronous and the finally block doesn't get executed till the source completes preventing other subscription. I agree though that the finally is needed, just need to get the locking right.
would appreciate review of latest commit which incorporates @zsxwing suggestions |
@Override | ||
public void call() { | ||
if (subscriptionCount.decrementAndGet() == 0) { | ||
baseSubscription.unsubscribe(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this could disconnect the source immediately if one thread reaches L125 while another thread reaches L81 (because the counter was decremented to 0 and then incremented to 1 in the latter thread), i.e. prevent the reconnect.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks David, I'll review given that fact.
…that is used for subscription and disconnect
Righto, over to you concurrency switched on people again. I'm not convinced that given the holes that @akarnokd pointed out that a dual lock arrangement is going to work so I've wound back to using a single lock for all subscriptions and disconnect. |
I don't quite understand these changes yet. Partly I'm confused because the newly added unit tests pass even if the code changes aren't applied. |
// need to use this overload of connect to ensure that | ||
// baseSubscription is set in the case that source is a | ||
// synchronous Observable | ||
source.connect(onSubscribe(subscriber, writeLocked)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Won't this block right here on a synchronous source and then deadlock any other subscriber trying to subscribe since the lock is now held?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No because connect is called with an action that allows the subscription to
be read and in our case recorded. The action also unlocks.
On 3 Oct 2014 11:38, "Ben Christensen" [email protected] wrote:
In
rxjava-core/src/main/java/rx/internal/operators/OnSubscribeRefCount.java:
queue = null;
if (localQueue == null) {
emitting = false;
return;
- public void call(final Subscriber<? super T> subscriber) {
lock.lock();
if (subscriptionCount.incrementAndGet() == 1) {
final AtomicBoolean writeLocked = new AtomicBoolean(true);
try {
// need to use this overload of connect to ensure that
// baseSubscription is set in the case that source is a
// synchronous Observable
source.connect(onSubscribe(subscriber, writeLocked));
Won't this block right here on a synchronous source and then deadlock any
other subscriber trying to subscribe since the lock is now held?—
Reply to this email directly or view it on GitHub
https://github.com/ReactiveX/RxJava/pull/1695/files#r18372521.
Spent time reviewing this again and I think it's good so merging. Thanks @davidmoten for the fix and @zsxwing and @akarnokd for the reviews. I considered alternatives to the single-lock approach but am not confident enough in any of them to attempt right now. Correctness first, we can attempt enhancements later. |
rewrite OnSubscribeRefCount to handle synchronous source
Resubmitting this one having stuffed up the rebase.
Rewrite of
OnSubscribeRefCount
because original didn't handle synchronous source (#1688) and as far as I could see the original was fundamentally broken by this use case. Please note also includes a fix inOperatorMulticast
where a null check was required. I've done the rewrite using locks rather than submit a lock-free version just to ensure that the approach is valid with the experts first.