Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

OperatorMulticast.connect(connection) should not return null #2779

Merged
merged 4 commits into from
Mar 2, 2015
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 30 additions & 24 deletions src/main/java/rx/internal/operators/OperatorMulticast.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,14 +38,16 @@
* the result value type
*/
public final class OperatorMulticast<T, R> extends ConnectableObservable<R> {
final Observable<? extends T> source;
final Object guard;
final Func0<? extends Subject<? super T, ? extends R>> subjectFactory;
private final Observable<? extends T> source;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I try to avoid private because the compiler generates accessor methods which slows down the interpreter phase and makes an extra hop while debugging methods.

private final Object guard;
private final Func0<? extends Subject<? super T, ? extends R>> subjectFactory;
private final AtomicReference<Subject<? super T, ? extends R>> connectedSubject;
private final List<Subscriber<? super R>> waitingForConnect;

/** Guarded by guard. */
Subscriber<T> subscription;
private Subscriber<T> subscription;
// wraps subscription above for unsubscription using guard
private Subscription guardedSubscription;

public OperatorMulticast(Observable<? extends T> source, final Func0<? extends Subject<? super T, ? extends R>> subjectFactory) {
this(new Object(), new AtomicReference<Subject<? super T, ? extends R>>(), new ArrayList<Subscriber<? super R>>(), source, subjectFactory);
Expand Down Expand Up @@ -82,7 +84,8 @@ public void connect(Action1<? super Subscription> connection) {
// subscription is the state of whether we are connected or not
synchronized (guard) {
if (subscription != null) {
// already connected, return as there is nothing to do
// already connected
connection.call(guardedSubscription);
return;
} else {
shouldSubscribe = true;
Expand All @@ -106,6 +109,21 @@ public void onNext(T args) {
subject.onNext(args);
}
};
guardedSubscription = Subscriptions.create(new Action0() {
@Override
public void call() {
Subscription s;
synchronized (guard) {
s = subscription;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd check if guardedSubscription == this to avoid unsubscribing a newer connection, i.e.:

Subscription s1 = src.connect();
Subscription s2 = src.connect();

s1.unsubscribe();

Subscription s3 = src.connect();

s2.unsubscribe();

// src should be still connected

subscription = null;
guardedSubscription = null;
connectedSubject.set(null);
}
if (s != null) {
s.unsubscribe();
}
}
});

// register any subscribers that are waiting with this new subject
for(Subscriber<? super R> s : waitingForConnect) {
Expand All @@ -116,34 +134,22 @@ public void onNext(T args) {
// record the Subject so OnSubscribe can see it
connectedSubject.set(subject);
}

}

// in the lock above we determined we should subscribe, do it now outside the lock
if (shouldSubscribe) {
// register a subscription that will shut this down
connection.call(Subscriptions.create(new Action0() {
@Override
public void call() {
Subscription s;
synchronized (guard) {
s = subscription;
subscription = null;
connectedSubject.set(null);
}
if (s != null) {
s.unsubscribe();
}
}
}));
connection.call(guardedSubscription);

// now that everything is hooked up let's subscribe
// as long as the subscription is not null (which can happen if already unsubscribed)
boolean subscriptionIsNull;
synchronized(guard) {
subscriptionIsNull = subscription == null;
Subscriber<T> sub;
synchronized (guard) {
sub = subscription;
}
if (!subscriptionIsNull)
source.subscribe(subscription);
if (sub != null)
source.subscribe(sub);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,13 @@
*/
package rx.internal.operators;

import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;

import org.junit.Assert;
import org.junit.Test;

import rx.Observer;
Expand All @@ -29,7 +31,7 @@
import rx.subjects.PublishSubject;
import rx.subjects.Subject;

public class OnSubscribeMulticastTest {
public class OperatorMulticastTest {

@Test
public void testMulticast() {
Expand Down Expand Up @@ -70,15 +72,17 @@ public void testMulticastConnectTwice() {

source.onNext("one");

multicasted.connect();
multicasted.connect();

Subscription sub = multicasted.connect();
Subscription sub2 = multicasted.connect();
source.onNext("two");
source.onCompleted();

verify(observer, never()).onNext("one");
verify(observer, times(1)).onNext("two");
verify(observer, times(1)).onCompleted();

assertEquals(sub, sub2);

}

Expand Down