Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Hide ChainedSubscription/SubscriptionList from Public API #1309

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 5 additions & 12 deletions rxjava-core/src/main/java/rx/Subscriber.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
*/
package rx;

import rx.subscriptions.ChainedSubscription;
import rx.internal.util.SubscriptionList;
import rx.subscriptions.CompositeSubscription;

/**
Expand All @@ -32,27 +32,20 @@
*/
public abstract class Subscriber<T> implements Observer<T>, Subscription {

private final ChainedSubscription cs;
private final SubscriptionList cs;

protected Subscriber(ChainedSubscription cs) {
if (cs == null) {
throw new IllegalArgumentException("The CompositeSubscription can not be null");
}
this.cs = cs;
}

@Deprecated
protected Subscriber(CompositeSubscription cs) {
this(new ChainedSubscription());
this.cs = new SubscriptionList();
add(cs);
}

protected Subscriber() {
this(new ChainedSubscription());
this.cs = new SubscriptionList();
}

protected Subscriber(Subscriber<?> op) {
this(op.cs);
this.cs = op.cs;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import rx.functions.Action0;
import rx.functions.Func1;
import rx.observables.GroupedObservable;
import rx.subscriptions.ChainedSubscription;
import rx.subscriptions.Subscriptions;

/**
Expand Down Expand Up @@ -55,7 +54,7 @@ static final class GroupBySubscriber<K, T> extends Subscriber<T> {
public GroupBySubscriber(Func1<? super T, ? extends K> keySelector, Subscriber<? super GroupedObservable<K, T>> child) {
// a new CompositeSubscription to decouple the subscription as the inner subscriptions need a separate lifecycle
// and will unsubscribe on this parent if they are all unsubscribed
super(new ChainedSubscription());
super();
this.keySelector = keySelector;
this.child = child;
}
Expand Down
26 changes: 9 additions & 17 deletions rxjava-core/src/main/java/rx/internal/operators/OperatorPivot.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,17 +26,17 @@
import rx.Observable.OnSubscribe;
import rx.Observable.Operator;
import rx.Subscriber;
import rx.Subscription;
import rx.functions.Action0;
import rx.observables.GroupedObservable;
import rx.subscriptions.ChainedSubscription;
import rx.subscriptions.Subscriptions;

public final class OperatorPivot<K1, K2, T> implements Operator<GroupedObservable<K2, GroupedObservable<K1, T>>, GroupedObservable<K1, GroupedObservable<K2, T>>> {

@Override
public Subscriber<? super GroupedObservable<K1, GroupedObservable<K2, T>>> call(final Subscriber<? super GroupedObservable<K2, GroupedObservable<K1, T>>> child) {
final AtomicReference<State> state = new AtomicReference<State>(State.create());
final PivotSubscriber<K1, K2, T> pivotSubscriber = new PivotSubscriber<K1, K2, T>(new ChainedSubscription(), child, state);
final PivotSubscriber<K1, K2, T> pivotSubscriber = new PivotSubscriber<K1, K2, T>(child, state);
child.add(Subscriptions.create(new Action0() {

@Override
Expand All @@ -61,21 +61,14 @@ public void call() {
}

private static final class PivotSubscriber<K1, K2, T> extends Subscriber<GroupedObservable<K1, GroupedObservable<K2, T>>> {
/*
* needs to decouple the subscription as the inner subscriptions need a separate lifecycle
* and will unsubscribe on this parent if they are all unsubscribed
*/
private final ChainedSubscription parentSubscription;
private final Subscriber<? super GroupedObservable<K2, GroupedObservable<K1, T>>> child;
private final AtomicReference<State> state;
private final GroupState<K1, K2, T> groups;

private PivotSubscriber(ChainedSubscription parentSubscription, Subscriber<? super GroupedObservable<K2, GroupedObservable<K1, T>>> child, AtomicReference<State> state) {
super(parentSubscription);
this.parentSubscription = parentSubscription;
private PivotSubscriber(Subscriber<? super GroupedObservable<K2, GroupedObservable<K1, T>>> child, AtomicReference<State> state) {
this.child = child;
this.state = state;
this.groups = new GroupState<K1, K2, T>(parentSubscription, child);
this.groups = new GroupState<K1, K2, T>(this, child);
}

@Override
Expand All @@ -102,7 +95,7 @@ public void onError(Throwable e) {
@Override
public void onNext(final GroupedObservable<K1, GroupedObservable<K2, T>> k1Group) {
groups.startK1Group(state, k1Group.getKey());
k1Group.unsafeSubscribe(new Subscriber<GroupedObservable<K2, T>>(parentSubscription) {
k1Group.unsafeSubscribe(new Subscriber<GroupedObservable<K2, T>>(this) {

@Override
public void onCompleted() {
Expand All @@ -124,7 +117,7 @@ public void onNext(final GroupedObservable<K2, T> k2Group) {
// we have been unsubscribed
return;
}
k2Group.unsafeSubscribe(new Subscriber<T>(parentSubscription) {
k2Group.unsafeSubscribe(new Subscriber<T>(this) {

@Override
public void onCompleted() {
Expand Down Expand Up @@ -158,16 +151,15 @@ public void onNext(T t) {
private static final class GroupState<K1, K2, T> {
private final ConcurrentHashMap<KeyPair<K1, K2>, Inner<K1, K2, T>> innerSubjects = new ConcurrentHashMap<KeyPair<K1, K2>, Inner<K1, K2, T>>();
private final ConcurrentHashMap<K2, Outer<K1, K2, T>> outerSubjects = new ConcurrentHashMap<K2, Outer<K1, K2, T>>();
private final ChainedSubscription parentSubscription;
private final Subscription parentSubscription;
private final Subscriber<? super GroupedObservable<K2, GroupedObservable<K1, T>>> child;
/** Indicates a terminal state. */
volatile int completed;
/** Field updater for completed. */
@SuppressWarnings("rawtypes")
static final AtomicIntegerFieldUpdater<GroupState> COMPLETED_UPDATER
= AtomicIntegerFieldUpdater.newUpdater(GroupState.class, "completed");
static final AtomicIntegerFieldUpdater<GroupState> COMPLETED_UPDATER = AtomicIntegerFieldUpdater.newUpdater(GroupState.class, "completed");

public GroupState(ChainedSubscription parentSubscription, Subscriber<? super GroupedObservable<K2, GroupedObservable<K1, T>>> child) {
public GroupState(Subscription parentSubscription, Subscriber<? super GroupedObservable<K2, GroupedObservable<K1, T>>> child) {
this.parentSubscription = parentSubscription;
this.child = child;
}
Expand Down
34 changes: 17 additions & 17 deletions rxjava-core/src/main/java/rx/internal/operators/OperatorTake.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

import rx.Observable.Operator;
import rx.Subscriber;
import rx.subscriptions.ChainedSubscription;

/**
* Returns an Observable that emits the first <code>num</code> items emitted by the source
Expand All @@ -40,22 +39,7 @@ public OperatorTake(int limit) {

@Override
public Subscriber<? super T> call(final Subscriber<? super T> child) {
final ChainedSubscription parent = new ChainedSubscription();
if (limit == 0) {
child.onCompleted();
parent.unsubscribe();
}

/*
* We decouple the parent and child subscription so there can be multiple take() in a chain
* such as for the groupBy Observer use case where you may take(1) on groups and take(20) on the children.
*
* Thus, we only unsubscribe UPWARDS to the parent and an onComplete DOWNSTREAM.
*
* However, if we receive an unsubscribe from the child we still want to propagate it upwards so we register 'parent' with 'child'
*/
child.add(parent);
return new Subscriber<T>(parent) {
Subscriber<T> parent = new Subscriber<T>() {

int count = 0;
boolean completed = false;
Expand Down Expand Up @@ -87,6 +71,22 @@ public void onNext(T i) {
}

};

if (limit == 0) {
child.onCompleted();
parent.unsubscribe();
}

/*
* We decouple the parent and child subscription so there can be multiple take() in a chain
* such as for the groupBy Observer use case where you may take(1) on groups and take(20) on the children.
*
* Thus, we only unsubscribe UPWARDS to the parent and an onComplete DOWNSTREAM.
*
* However, if we receive an unsubscribe from the child we still want to propagate it upwards so we register 'parent' with 'child'
*/
child.add(parent);
return parent;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import rx.Scheduler;
import rx.Subscriber;
import rx.functions.Action0;
import rx.subscriptions.ChainedSubscription;
import rx.subscriptions.Subscriptions;

/**
Expand All @@ -36,25 +35,7 @@ public OperatorUnsubscribeOn(Scheduler scheduler) {

@Override
public Subscriber<? super T> call(final Subscriber<? super T> subscriber) {
final ChainedSubscription parentSubscription = new ChainedSubscription();
subscriber.add(Subscriptions.create(new Action0() {

@Override
public void call() {
final Scheduler.Worker inner = scheduler.createWorker();
inner.schedule(new Action0() {

@Override
public void call() {
parentSubscription.unsubscribe();
inner.unsubscribe();
}
});
}

}));

return new Subscriber<T>(parentSubscription) {
final Subscriber<T> parent = new Subscriber<T>() {

@Override
public void onCompleted() {
Expand All @@ -72,5 +53,26 @@ public void onNext(T t) {
}

};

subscriber.add(Subscriptions.create(new Action0() {

@Override
public void call() {
final Scheduler.Worker inner = scheduler.createWorker();
inner.schedule(new Action0() {

@Override
public void call() {
parent.unsubscribe();
inner.unsubscribe();
}
});
}

}));

return parent;


}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package rx.subscriptions;
package rx.internal.util;

import java.util.ArrayList;
import java.util.Arrays;
Expand All @@ -29,15 +29,15 @@
*
* @see <a href="http://msdn.microsoft.com/en-us/library/system.reactive.disposables.compositedisposable(v=vs.103).aspx">Rx.Net equivalent CompositeDisposable</a>
*/
public final class ChainedSubscription implements Subscription {
public final class SubscriptionList implements Subscription {

private List<Subscription> subscriptions;
private boolean unsubscribed = false;

public ChainedSubscription() {
public SubscriptionList() {
}

public ChainedSubscription(final Subscription... subscriptions) {
public SubscriptionList(final Subscription... subscriptions) {
this.subscriptions = new LinkedList<Subscription>(Arrays.asList(subscriptions));
}

Expand All @@ -47,8 +47,8 @@ public synchronized boolean isUnsubscribed() {
}

/**
* Adds a new {@link Subscription} to this {@code ChainedSubscription} if the {@code ChainedSubscription} is
* not yet unsubscribed. If the {@code ChainedSubscription} <em>is</em> unsubscribed, {@code add} will
* Adds a new {@link Subscription} to this {@code SubscriptionList} if the {@code SubscriptionList} is
* not yet unsubscribed. If the {@code SubscriptionList} <em>is</em> unsubscribed, {@code add} will
* indicate this by explicitly unsubscribing the new {@code Subscription} as well.
*
* @param s
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package rx.subscriptions;
package rx.internal.util;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
Expand All @@ -29,13 +29,14 @@

import rx.Subscription;
import rx.exceptions.CompositeException;
import rx.internal.util.SubscriptionList;

public class ChainedSubscriptionTest {
public class SubscriptionListTest {

@Test
public void testSuccess() {
final AtomicInteger counter = new AtomicInteger();
ChainedSubscription s = new ChainedSubscription();
SubscriptionList s = new SubscriptionList();
s.add(new Subscription() {

@Override
Expand Down Expand Up @@ -70,7 +71,7 @@ public boolean isUnsubscribed() {
@Test(timeout = 1000)
public void shouldUnsubscribeAll() throws InterruptedException {
final AtomicInteger counter = new AtomicInteger();
final ChainedSubscription s = new ChainedSubscription();
final SubscriptionList s = new SubscriptionList();

final int count = 10;
final CountDownLatch start = new CountDownLatch(1);
Expand Down Expand Up @@ -117,7 +118,7 @@ public void run() {
@Test
public void testException() {
final AtomicInteger counter = new AtomicInteger();
ChainedSubscription s = new ChainedSubscription();
SubscriptionList s = new SubscriptionList();
s.add(new Subscription() {

@Override
Expand Down Expand Up @@ -159,7 +160,7 @@ public boolean isUnsubscribed() {
@Test
public void testCompositeException() {
final AtomicInteger counter = new AtomicInteger();
ChainedSubscription s = new ChainedSubscription();
SubscriptionList s = new SubscriptionList();
s.add(new Subscription() {

@Override
Expand Down Expand Up @@ -215,7 +216,7 @@ public boolean isUnsubscribed() {
@Test
public void testUnsubscribeIdempotence() {
final AtomicInteger counter = new AtomicInteger();
ChainedSubscription s = new ChainedSubscription();
SubscriptionList s = new SubscriptionList();
s.add(new Subscription() {

@Override
Expand All @@ -241,7 +242,7 @@ public boolean isUnsubscribed() {
public void testUnsubscribeIdempotenceConcurrently()
throws InterruptedException {
final AtomicInteger counter = new AtomicInteger();
final ChainedSubscription s = new ChainedSubscription();
final SubscriptionList s = new SubscriptionList();

final int count = 10;
final CountDownLatch start = new CountDownLatch(1);
Expand Down