Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reduce Subscription Object Allocation #1283

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 10 additions & 3 deletions rxjava-core/src/main/java/rx/Subscriber.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package rx;

import rx.subscriptions.SubscriptionList;
import rx.subscriptions.CompositeSubscription;

/**
Expand All @@ -30,17 +31,23 @@
*/
public abstract class Subscriber<T> implements Observer<T>, Subscription {

private final CompositeSubscription cs;
private final SubscriptionList cs;

protected Subscriber(CompositeSubscription cs) {
protected Subscriber(SubscriptionList cs) {
if (cs == null) {
throw new IllegalArgumentException("The CompositeSubscription can not be null");
}
this.cs = cs;
}

@Deprecated
protected Subscriber(CompositeSubscription cs) {
this(new SubscriptionList());
add(cs);
}

protected Subscriber() {
this(new CompositeSubscription());
this(new SubscriptionList());
}

protected Subscriber(Subscriber<?> op) {
Expand Down
5 changes: 2 additions & 3 deletions rxjava-core/src/main/java/rx/operators/OperatorGroupBy.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import rx.functions.Action0;
import rx.functions.Func1;
import rx.observables.GroupedObservable;
import rx.subscriptions.CompositeSubscription;
import rx.subscriptions.Subscriptions;

/**
Expand All @@ -46,9 +45,9 @@ public OperatorGroupBy(final Func1<? super T, ? extends K> keySelector) {

@Override
public Subscriber<? super T> call(final Subscriber<? super GroupedObservable<K, T>> childObserver) {
// a new CompositeSubscription to decouple the subscription as the inner subscriptions need a separate lifecycle
// a new SubscriptionList to decouple the subscription as the inner subscriptions need a separate lifecycle
// and will unsubscribe on this parent if they are all unsubscribed
return new Subscriber<T>(new CompositeSubscription()) {
return new Subscriber<T>() {
private final Map<K, BufferUntilSubscriber<T>> groups = new HashMap<K, BufferUntilSubscriber<T>>();
private final AtomicInteger completionCounter = new AtomicInteger(0);
private final AtomicBoolean completionEmitted = new AtomicBoolean(false);
Expand Down
12 changes: 6 additions & 6 deletions rxjava-core/src/main/java/rx/operators/OperatorPivot.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,15 @@
import rx.Subscriber;
import rx.functions.Action0;
import rx.observables.GroupedObservable;
import rx.subscriptions.CompositeSubscription;
import rx.subscriptions.SubscriptionList;
import rx.subscriptions.Subscriptions;

public class OperatorPivot<K1, K2, T> implements Operator<GroupedObservable<K2, GroupedObservable<K1, T>>, GroupedObservable<K1, GroupedObservable<K2, T>>> {

@Override
public Subscriber<? super GroupedObservable<K1, GroupedObservable<K2, T>>> call(final Subscriber<? super GroupedObservable<K2, GroupedObservable<K1, T>>> child) {
final AtomicReference<State> state = new AtomicReference<State>(State.create());
final OperatorPivot<K1, K2, T>.PivotSubscriber pivotSubscriber = new PivotSubscriber(new CompositeSubscription(), child, state);
final OperatorPivot<K1, K2, T>.PivotSubscriber pivotSubscriber = new PivotSubscriber(new SubscriptionList(), child, state);
child.add(Subscriptions.create(new Action0() {

@Override
Expand Down Expand Up @@ -65,12 +65,12 @@ private final class PivotSubscriber extends Subscriber<GroupedObservable<K1, Gro
* needs to decouple the subscription as the inner subscriptions need a separate lifecycle
* and will unsubscribe on this parent if they are all unsubscribed
*/
private final CompositeSubscription parentSubscription;
private final SubscriptionList parentSubscription;
private final Subscriber<? super GroupedObservable<K2, GroupedObservable<K1, T>>> child;
private final AtomicReference<State> state;
private final GroupState<K1, K2, T> groups;

private PivotSubscriber(CompositeSubscription parentSubscription, Subscriber<? super GroupedObservable<K2, GroupedObservable<K1, T>>> child, AtomicReference<State> state) {
private PivotSubscriber(SubscriptionList parentSubscription, Subscriber<? super GroupedObservable<K2, GroupedObservable<K1, T>>> child, AtomicReference<State> state) {
super(parentSubscription);
this.parentSubscription = parentSubscription;
this.child = child;
Expand Down Expand Up @@ -159,10 +159,10 @@ private static class GroupState<K1, K2, T> {
private final ConcurrentHashMap<KeyPair<K1, K2>, Inner<K1, K2, T>> innerSubjects = new ConcurrentHashMap<KeyPair<K1, K2>, Inner<K1, K2, T>>();
private final ConcurrentHashMap<K2, Outer<K1, K2, T>> outerSubjects = new ConcurrentHashMap<K2, Outer<K1, K2, T>>();
private final AtomicBoolean completeEmitted = new AtomicBoolean();
private final CompositeSubscription parentSubscription;
private final SubscriptionList parentSubscription;
private final Subscriber<? super GroupedObservable<K2, GroupedObservable<K1, T>>> child;

public GroupState(CompositeSubscription parentSubscription, Subscriber<? super GroupedObservable<K2, GroupedObservable<K1, T>>> child) {
public GroupState(SubscriptionList parentSubscription, Subscriber<? super GroupedObservable<K2, GroupedObservable<K1, T>>> child) {
this.parentSubscription = parentSubscription;
this.child = child;
}
Expand Down
4 changes: 2 additions & 2 deletions rxjava-core/src/main/java/rx/operators/OperatorTake.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

import rx.Observable.Operator;
import rx.Subscriber;
import rx.subscriptions.CompositeSubscription;
import rx.subscriptions.SubscriptionList;

/**
* Returns an Observable that emits the first <code>num</code> items emitted by the source
Expand All @@ -40,7 +40,7 @@ public OperatorTake(int limit) {

@Override
public Subscriber<? super T> call(final Subscriber<? super T> child) {
final CompositeSubscription parent = new CompositeSubscription();
final SubscriptionList parent = new SubscriptionList();
if (limit == 0) {
child.onCompleted();
parent.unsubscribe();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import rx.Scheduler;
import rx.Subscriber;
import rx.functions.Action0;
import rx.subscriptions.CompositeSubscription;
import rx.subscriptions.SubscriptionList;
import rx.subscriptions.Subscriptions;

/**
Expand All @@ -36,7 +36,7 @@ public OperatorUnsubscribeOn(Scheduler scheduler) {

@Override
public Subscriber<? super T> call(final Subscriber<? super T> subscriber) {
final CompositeSubscription parentSubscription = new CompositeSubscription();
final SubscriptionList parentSubscription = new SubscriptionList();
subscriber.add(Subscriptions.create(new Action0() {

@Override
Expand Down
163 changes: 53 additions & 110 deletions rxjava-core/src/main/java/rx/subscriptions/CompositeSubscription.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,11 @@
package rx.subscriptions;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
import java.util.Set;

import rx.Subscription;
import rx.exceptions.CompositeException;
Expand All @@ -30,154 +33,94 @@
*/
public final class CompositeSubscription implements Subscription {

private final AtomicReference<State> state = new AtomicReference<State>();

/** Empty initial state. */
private static final State CLEAR_STATE;
/** Unsubscribed empty state. */
private static final State CLEAR_STATE_UNSUBSCRIBED;
static {
Subscription[] s0 = new Subscription[0];
CLEAR_STATE = new State(false, s0);
CLEAR_STATE_UNSUBSCRIBED = new State(true, s0);
}

private static final class State {
final boolean isUnsubscribed;
final Subscription[] subscriptions;

State(boolean u, Subscription[] s) {
this.isUnsubscribed = u;
this.subscriptions = s;
}

State unsubscribe() {
return CLEAR_STATE_UNSUBSCRIBED;
}

State add(Subscription s) {
int idx = subscriptions.length;
Subscription[] newSubscriptions = new Subscription[idx + 1];
System.arraycopy(subscriptions, 0, newSubscriptions, 0, idx);
newSubscriptions[idx] = s;
return new State(isUnsubscribed, newSubscriptions);
}

State remove(Subscription s) {
if ((subscriptions.length == 1 && subscriptions[0].equals(s)) || subscriptions.length == 0) {
return clear();
}
Subscription[] newSubscriptions = new Subscription[subscriptions.length - 1];
int idx = 0;
for (Subscription _s : subscriptions) {
if (!_s.equals(s)) {
// was not in this composite
if (idx == newSubscriptions.length) {
return this;
}
newSubscriptions[idx] = _s;
idx++;
}
}
if (idx == 0) {
return clear();
}
// subscription appeared more than once
if (idx < newSubscriptions.length) {
Subscription[] newSub2 = new Subscription[idx];
System.arraycopy(newSubscriptions, 0, newSub2, 0, idx);
return new State(isUnsubscribed, newSub2);
}
return new State(isUnsubscribed, newSubscriptions);
}

State clear() {
return isUnsubscribed ? CLEAR_STATE_UNSUBSCRIBED : CLEAR_STATE;
}
}
private Set<Subscription> subscriptions;
private boolean unsubscribed = false;

public CompositeSubscription() {
state.set(CLEAR_STATE);
}

public CompositeSubscription(final Subscription... subscriptions) {
state.set(new State(false, subscriptions));
this.subscriptions = new HashSet<Subscription>(Arrays.asList(subscriptions));
}

@Override
public boolean isUnsubscribed() {
return state.get().isUnsubscribed;
public synchronized boolean isUnsubscribed() {
return unsubscribed;
}

public void add(final Subscription s) {
State oldState;
State newState;
do {
oldState = state.get();
if (oldState.isUnsubscribed) {
s.unsubscribe();
return;
Subscription unsubscribe = null;
synchronized (this) {
if (unsubscribed) {
unsubscribe = s;
} else {
newState = oldState.add(s);
if (subscriptions == null) {
subscriptions = new HashSet<Subscription>(4);
}
subscriptions.add(s);
}
} while (!state.compareAndSet(oldState, newState));
}
if (unsubscribe != null) {
// call after leaving the synchronized block so we're not holding a lock while executing this
unsubscribe.unsubscribe();
}
}

public void remove(final Subscription s) {
State oldState;
State newState;
do {
oldState = state.get();
if (oldState.isUnsubscribed) {
boolean unsubscribe = false;
synchronized (this) {
if (unsubscribed || subscriptions == null) {
return;
} else {
newState = oldState.remove(s);
}
} while (!state.compareAndSet(oldState, newState));
// if we removed successfully we then need to call unsubscribe on it
s.unsubscribe();
unsubscribe = subscriptions.remove(s);
}
if (unsubscribe) {
// if we removed successfully we then need to call unsubscribe on it (outside of the lock)
s.unsubscribe();
}
}

public void clear() {
State oldState;
State newState;
do {
oldState = state.get();
if (oldState.isUnsubscribed) {
Collection<Subscription> unsubscribe = null;
synchronized (this) {
if (unsubscribed || subscriptions == null) {
return;
} else {
newState = oldState.clear();
unsubscribe = subscriptions;
subscriptions = null;
}
} while (!state.compareAndSet(oldState, newState));
// if we cleared successfully we then need to call unsubscribe on all previous
unsubscribeFromAll(oldState.subscriptions);
}
unsubscribeFromAll(unsubscribe);
}

@Override
public void unsubscribe() {
State oldState;
State newState;
do {
oldState = state.get();
if (oldState.isUnsubscribed) {
synchronized (this) {
if (unsubscribed) {
return;
} else {
newState = oldState.unsubscribe();
}
} while (!state.compareAndSet(oldState, newState));
unsubscribeFromAll(oldState.subscriptions);
unsubscribed = true;
}
// we will only get here once
unsubscribeFromAll(subscriptions);
}

private static void unsubscribeFromAll(Subscription[] subscriptions) {
final List<Throwable> es = new ArrayList<Throwable>();
private static void unsubscribeFromAll(Collection<Subscription> subscriptions) {
if (subscriptions == null) {
return;
}
List<Throwable> es = null;
for (Subscription s : subscriptions) {
try {
s.unsubscribe();
} catch (Throwable e) {
if (es == null) {
es = new ArrayList<Throwable>();
}
es.add(e);
}
}
if (!es.isEmpty()) {
if (es != null) {
if (es.size() == 1) {
Throwable t = es.get(0);
if (t instanceof RuntimeException) {
Expand Down
Loading