Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

OperatorMulticast.connect(connection) should not return null #2779

Merged
merged 4 commits into from
Mar 2, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 37 additions & 31 deletions src/main/java/rx/internal/operators/OperatorMulticast.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,13 @@ public final class OperatorMulticast<T, R> extends ConnectableObservable<R> {
final Observable<? extends T> source;
final Object guard;
final Func0<? extends Subject<? super T, ? extends R>> subjectFactory;
private final AtomicReference<Subject<? super T, ? extends R>> connectedSubject;
private final List<Subscriber<? super R>> waitingForConnect;
final AtomicReference<Subject<? super T, ? extends R>> connectedSubject;
final List<Subscriber<? super R>> waitingForConnect;

/** Guarded by guard. */
Subscriber<T> subscription;
private Subscriber<T> subscription;
// wraps subscription above for unsubscription using guard
private Subscription guardedSubscription;

public OperatorMulticast(Observable<? extends T> source, final Func0<? extends Subject<? super T, ? extends R>> subjectFactory) {
this(new Object(), new AtomicReference<Subject<? super T, ? extends R>>(), new ArrayList<Subscriber<? super R>>(), source, subjectFactory);
Expand Down Expand Up @@ -77,15 +79,13 @@ public void call(Subscriber<? super R> subscriber) {
public void connect(Action1<? super Subscription> connection) {
// each time we connect we create a new Subject and Subscription

boolean shouldSubscribe = false;

// subscription is the state of whether we are connected or not
synchronized (guard) {
if (subscription != null) {
// already connected, return as there is nothing to do
// already connected
connection.call(guardedSubscription);
return;
} else {
shouldSubscribe = true;
// we aren't connected, so let's create a new Subject and connect
final Subject<? super T, ? extends R> subject = subjectFactory.call();
// create new Subscriber that will pass-thru to the subject we just created
Expand All @@ -106,6 +106,26 @@ public void onNext(T args) {
subject.onNext(args);
}
};
final AtomicReference<Subscription> gs = new AtomicReference<Subscription>();
gs.set(Subscriptions.create(new Action0() {
@Override
public void call() {
Subscription s;
synchronized (guard) {
if ( guardedSubscription == gs.get()) {
s = subscription;
subscription = null;
guardedSubscription = null;
connectedSubject.set(null);
} else
return;
}
if (s != null) {
s.unsubscribe();
}
}
}));
guardedSubscription = gs.get();

// register any subscribers that are waiting with this new subject
for(Subscriber<? super R> s : waitingForConnect) {
Expand All @@ -116,34 +136,20 @@ public void onNext(T args) {
// record the Subject so OnSubscribe can see it
connectedSubject.set(subject);
}

}

// in the lock above we determined we should subscribe, do it now outside the lock
if (shouldSubscribe) {
// register a subscription that will shut this down
connection.call(Subscriptions.create(new Action0() {
@Override
public void call() {
Subscription s;
synchronized (guard) {
s = subscription;
subscription = null;
connectedSubject.set(null);
}
if (s != null) {
s.unsubscribe();
}
}
}));
// register a subscription that will shut this down
connection.call(guardedSubscription);

// now that everything is hooked up let's subscribe
// as long as the subscription is not null (which can happen if already unsubscribed)
boolean subscriptionIsNull;
synchronized(guard) {
subscriptionIsNull = subscription == null;
}
if (!subscriptionIsNull)
source.subscribe(subscription);
// now that everything is hooked up let's subscribe
// as long as the subscription is not null (which can happen if already unsubscribed)
Subscriber<T> sub;
synchronized (guard) {
sub = subscription;
}
if (sub != null)
source.subscribe(sub);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,13 @@
*/
package rx.internal.operators;

import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;

import org.junit.Assert;
import org.junit.Test;

import rx.Observer;
Expand All @@ -29,7 +31,7 @@
import rx.subjects.PublishSubject;
import rx.subjects.Subject;

public class OnSubscribeMulticastTest {
public class OperatorMulticastTest {

@Test
public void testMulticast() {
Expand Down Expand Up @@ -70,15 +72,17 @@ public void testMulticastConnectTwice() {

source.onNext("one");

multicasted.connect();
multicasted.connect();

Subscription sub = multicasted.connect();
Subscription sub2 = multicasted.connect();
source.onNext("two");
source.onCompleted();

verify(observer, never()).onNext("one");
verify(observer, times(1)).onNext("two");
verify(observer, times(1)).onCompleted();

assertEquals(sub, sub2);

}

Expand Down