diff --git a/rxjava-core/src/main/java/rx/Subscriber.java b/rxjava-core/src/main/java/rx/Subscriber.java index 3ad986813c..cc94620610 100644 --- a/rxjava-core/src/main/java/rx/Subscriber.java +++ b/rxjava-core/src/main/java/rx/Subscriber.java @@ -15,6 +15,7 @@ */ package rx; +import rx.subscriptions.SubscriptionList; import rx.subscriptions.CompositeSubscription; /** @@ -30,17 +31,23 @@ */ public abstract class Subscriber implements Observer, Subscription { - private final CompositeSubscription cs; + private final SubscriptionList cs; - protected Subscriber(CompositeSubscription cs) { + protected Subscriber(SubscriptionList cs) { if (cs == null) { throw new IllegalArgumentException("The CompositeSubscription can not be null"); } this.cs = cs; } + + @Deprecated + protected Subscriber(CompositeSubscription cs) { + this(new SubscriptionList()); + add(cs); + } protected Subscriber() { - this(new CompositeSubscription()); + this(new SubscriptionList()); } protected Subscriber(Subscriber op) { diff --git a/rxjava-core/src/main/java/rx/operators/OperatorGroupBy.java b/rxjava-core/src/main/java/rx/operators/OperatorGroupBy.java index b530bd9d86..fad9dab027 100644 --- a/rxjava-core/src/main/java/rx/operators/OperatorGroupBy.java +++ b/rxjava-core/src/main/java/rx/operators/OperatorGroupBy.java @@ -27,7 +27,6 @@ import rx.functions.Action0; import rx.functions.Func1; import rx.observables.GroupedObservable; -import rx.subscriptions.CompositeSubscription; import rx.subscriptions.Subscriptions; /** @@ -46,9 +45,9 @@ public OperatorGroupBy(final Func1 keySelector) { @Override public Subscriber call(final Subscriber> childObserver) { - // a new CompositeSubscription to decouple the subscription as the inner subscriptions need a separate lifecycle + // a new SubscriptionList to decouple the subscription as the inner subscriptions need a separate lifecycle // and will unsubscribe on this parent if they are all unsubscribed - return new Subscriber(new CompositeSubscription()) { + return new Subscriber() { private final Map> groups = new HashMap>(); private final AtomicInteger completionCounter = new AtomicInteger(0); private final AtomicBoolean completionEmitted = new AtomicBoolean(false); diff --git a/rxjava-core/src/main/java/rx/operators/OperatorPivot.java b/rxjava-core/src/main/java/rx/operators/OperatorPivot.java index 5b9c6742fd..94f5071c86 100644 --- a/rxjava-core/src/main/java/rx/operators/OperatorPivot.java +++ b/rxjava-core/src/main/java/rx/operators/OperatorPivot.java @@ -28,7 +28,7 @@ import rx.Subscriber; import rx.functions.Action0; import rx.observables.GroupedObservable; -import rx.subscriptions.CompositeSubscription; +import rx.subscriptions.SubscriptionList; import rx.subscriptions.Subscriptions; public class OperatorPivot implements Operator>, GroupedObservable>> { @@ -36,7 +36,7 @@ public class OperatorPivot implements Operator>> call(final Subscriber>> child) { final AtomicReference state = new AtomicReference(State.create()); - final OperatorPivot.PivotSubscriber pivotSubscriber = new PivotSubscriber(new CompositeSubscription(), child, state); + final OperatorPivot.PivotSubscriber pivotSubscriber = new PivotSubscriber(new SubscriptionList(), child, state); child.add(Subscriptions.create(new Action0() { @Override @@ -65,12 +65,12 @@ private final class PivotSubscriber extends Subscriber>> child; private final AtomicReference state; private final GroupState groups; - private PivotSubscriber(CompositeSubscription parentSubscription, Subscriber>> child, AtomicReference state) { + private PivotSubscriber(SubscriptionList parentSubscription, Subscriber>> child, AtomicReference state) { super(parentSubscription); this.parentSubscription = parentSubscription; this.child = child; @@ -159,10 +159,10 @@ private static class GroupState { private final ConcurrentHashMap, Inner> innerSubjects = new ConcurrentHashMap, Inner>(); private final ConcurrentHashMap> outerSubjects = new ConcurrentHashMap>(); private final AtomicBoolean completeEmitted = new AtomicBoolean(); - private final CompositeSubscription parentSubscription; + private final SubscriptionList parentSubscription; private final Subscriber>> child; - public GroupState(CompositeSubscription parentSubscription, Subscriber>> child) { + public GroupState(SubscriptionList parentSubscription, Subscriber>> child) { this.parentSubscription = parentSubscription; this.child = child; } diff --git a/rxjava-core/src/main/java/rx/operators/OperatorTake.java b/rxjava-core/src/main/java/rx/operators/OperatorTake.java index 62ab98b41c..882528f194 100644 --- a/rxjava-core/src/main/java/rx/operators/OperatorTake.java +++ b/rxjava-core/src/main/java/rx/operators/OperatorTake.java @@ -17,7 +17,7 @@ import rx.Observable.Operator; import rx.Subscriber; -import rx.subscriptions.CompositeSubscription; +import rx.subscriptions.SubscriptionList; /** * Returns an Observable that emits the first num items emitted by the source @@ -40,7 +40,7 @@ public OperatorTake(int limit) { @Override public Subscriber call(final Subscriber child) { - final CompositeSubscription parent = new CompositeSubscription(); + final SubscriptionList parent = new SubscriptionList(); if (limit == 0) { child.onCompleted(); parent.unsubscribe(); diff --git a/rxjava-core/src/main/java/rx/operators/OperatorUnsubscribeOn.java b/rxjava-core/src/main/java/rx/operators/OperatorUnsubscribeOn.java index 82018b65c6..0054903d3c 100644 --- a/rxjava-core/src/main/java/rx/operators/OperatorUnsubscribeOn.java +++ b/rxjava-core/src/main/java/rx/operators/OperatorUnsubscribeOn.java @@ -19,7 +19,7 @@ import rx.Scheduler; import rx.Subscriber; import rx.functions.Action0; -import rx.subscriptions.CompositeSubscription; +import rx.subscriptions.SubscriptionList; import rx.subscriptions.Subscriptions; /** @@ -36,7 +36,7 @@ public OperatorUnsubscribeOn(Scheduler scheduler) { @Override public Subscriber call(final Subscriber subscriber) { - final CompositeSubscription parentSubscription = new CompositeSubscription(); + final SubscriptionList parentSubscription = new SubscriptionList(); subscriber.add(Subscriptions.create(new Action0() { @Override diff --git a/rxjava-core/src/main/java/rx/subscriptions/CompositeSubscription.java b/rxjava-core/src/main/java/rx/subscriptions/CompositeSubscription.java index f9f045def7..1fa56161f1 100644 --- a/rxjava-core/src/main/java/rx/subscriptions/CompositeSubscription.java +++ b/rxjava-core/src/main/java/rx/subscriptions/CompositeSubscription.java @@ -16,8 +16,11 @@ package rx.subscriptions; import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashSet; import java.util.List; -import java.util.concurrent.atomic.AtomicReference; +import java.util.Set; import rx.Subscription; import rx.exceptions.CompositeException; @@ -30,154 +33,94 @@ */ public final class CompositeSubscription implements Subscription { - private final AtomicReference state = new AtomicReference(); - - /** Empty initial state. */ - private static final State CLEAR_STATE; - /** Unsubscribed empty state. */ - private static final State CLEAR_STATE_UNSUBSCRIBED; - static { - Subscription[] s0 = new Subscription[0]; - CLEAR_STATE = new State(false, s0); - CLEAR_STATE_UNSUBSCRIBED = new State(true, s0); - } - - private static final class State { - final boolean isUnsubscribed; - final Subscription[] subscriptions; - - State(boolean u, Subscription[] s) { - this.isUnsubscribed = u; - this.subscriptions = s; - } - - State unsubscribe() { - return CLEAR_STATE_UNSUBSCRIBED; - } - - State add(Subscription s) { - int idx = subscriptions.length; - Subscription[] newSubscriptions = new Subscription[idx + 1]; - System.arraycopy(subscriptions, 0, newSubscriptions, 0, idx); - newSubscriptions[idx] = s; - return new State(isUnsubscribed, newSubscriptions); - } - - State remove(Subscription s) { - if ((subscriptions.length == 1 && subscriptions[0].equals(s)) || subscriptions.length == 0) { - return clear(); - } - Subscription[] newSubscriptions = new Subscription[subscriptions.length - 1]; - int idx = 0; - for (Subscription _s : subscriptions) { - if (!_s.equals(s)) { - // was not in this composite - if (idx == newSubscriptions.length) { - return this; - } - newSubscriptions[idx] = _s; - idx++; - } - } - if (idx == 0) { - return clear(); - } - // subscription appeared more than once - if (idx < newSubscriptions.length) { - Subscription[] newSub2 = new Subscription[idx]; - System.arraycopy(newSubscriptions, 0, newSub2, 0, idx); - return new State(isUnsubscribed, newSub2); - } - return new State(isUnsubscribed, newSubscriptions); - } - - State clear() { - return isUnsubscribed ? CLEAR_STATE_UNSUBSCRIBED : CLEAR_STATE; - } - } + private Set subscriptions; + private boolean unsubscribed = false; public CompositeSubscription() { - state.set(CLEAR_STATE); } public CompositeSubscription(final Subscription... subscriptions) { - state.set(new State(false, subscriptions)); + this.subscriptions = new HashSet(Arrays.asList(subscriptions)); } @Override - public boolean isUnsubscribed() { - return state.get().isUnsubscribed; + public synchronized boolean isUnsubscribed() { + return unsubscribed; } public void add(final Subscription s) { - State oldState; - State newState; - do { - oldState = state.get(); - if (oldState.isUnsubscribed) { - s.unsubscribe(); - return; + Subscription unsubscribe = null; + synchronized (this) { + if (unsubscribed) { + unsubscribe = s; } else { - newState = oldState.add(s); + if (subscriptions == null) { + subscriptions = new HashSet(4); + } + subscriptions.add(s); } - } while (!state.compareAndSet(oldState, newState)); + } + if (unsubscribe != null) { + // call after leaving the synchronized block so we're not holding a lock while executing this + unsubscribe.unsubscribe(); + } } public void remove(final Subscription s) { - State oldState; - State newState; - do { - oldState = state.get(); - if (oldState.isUnsubscribed) { + boolean unsubscribe = false; + synchronized (this) { + if (unsubscribed || subscriptions == null) { return; - } else { - newState = oldState.remove(s); } - } while (!state.compareAndSet(oldState, newState)); - // if we removed successfully we then need to call unsubscribe on it - s.unsubscribe(); + unsubscribe = subscriptions.remove(s); + } + if (unsubscribe) { + // if we removed successfully we then need to call unsubscribe on it (outside of the lock) + s.unsubscribe(); + } } public void clear() { - State oldState; - State newState; - do { - oldState = state.get(); - if (oldState.isUnsubscribed) { + Collection unsubscribe = null; + synchronized (this) { + if (unsubscribed || subscriptions == null) { return; } else { - newState = oldState.clear(); + unsubscribe = subscriptions; + subscriptions = null; } - } while (!state.compareAndSet(oldState, newState)); - // if we cleared successfully we then need to call unsubscribe on all previous - unsubscribeFromAll(oldState.subscriptions); + } + unsubscribeFromAll(unsubscribe); } @Override public void unsubscribe() { - State oldState; - State newState; - do { - oldState = state.get(); - if (oldState.isUnsubscribed) { + synchronized (this) { + if (unsubscribed) { return; - } else { - newState = oldState.unsubscribe(); } - } while (!state.compareAndSet(oldState, newState)); - unsubscribeFromAll(oldState.subscriptions); + unsubscribed = true; + } + // we will only get here once + unsubscribeFromAll(subscriptions); } - private static void unsubscribeFromAll(Subscription[] subscriptions) { - final List es = new ArrayList(); + private static void unsubscribeFromAll(Collection subscriptions) { + if (subscriptions == null) { + return; + } + List es = null; for (Subscription s : subscriptions) { try { s.unsubscribe(); } catch (Throwable e) { + if (es == null) { + es = new ArrayList(); + } es.add(e); } } - if (!es.isEmpty()) { + if (es != null) { if (es.size() == 1) { Throwable t = es.get(0); if (t instanceof RuntimeException) { diff --git a/rxjava-core/src/main/java/rx/subscriptions/SubscriptionList.java b/rxjava-core/src/main/java/rx/subscriptions/SubscriptionList.java new file mode 100644 index 0000000000..c8fd4543f9 --- /dev/null +++ b/rxjava-core/src/main/java/rx/subscriptions/SubscriptionList.java @@ -0,0 +1,109 @@ +/** + * Copyright 2014 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.subscriptions; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.LinkedList; +import java.util.List; + +import rx.Subscription; +import rx.exceptions.CompositeException; + +/** + * Subscription that represents a group of Subscriptions that are unsubscribed together. + * + * @see Rx.Net equivalent CompositeDisposable + */ +public final class SubscriptionList implements Subscription { + + private List subscriptions; + private boolean unsubscribed = false; + + public SubscriptionList() { + } + + public SubscriptionList(final Subscription... subscriptions) { + this.subscriptions = new LinkedList(Arrays.asList(subscriptions)); + } + + @Override + public synchronized boolean isUnsubscribed() { + return unsubscribed; + } + + public void add(final Subscription s) { + Subscription unsubscribe = null; + synchronized (this) { + if (unsubscribed) { + unsubscribe = s; + } else { + if (subscriptions == null) { + subscriptions = new LinkedList(); + } + subscriptions.add(s); + } + } + if (unsubscribe != null) { + // call after leaving the synchronized block so we're not holding a lock while executing this + unsubscribe.unsubscribe(); + } + } + + @Override + public void unsubscribe() { + synchronized (this) { + if (unsubscribed) { + return; + } + unsubscribed = true; + } + // we will only get here once + unsubscribeFromAll(subscriptions); + } + + private static void unsubscribeFromAll(Collection subscriptions) { + if (subscriptions == null) { + return; + } + List es = null; + for (Subscription s : subscriptions) { + try { + s.unsubscribe(); + } catch (Throwable e) { + if (es == null) { + es = new ArrayList(); + } + es.add(e); + } + } + if (es != null) { + if (es.size() == 1) { + Throwable t = es.get(0); + if (t instanceof RuntimeException) { + throw (RuntimeException) t; + } else { + throw new CompositeException( + "Failed to unsubscribe to 1 or more subscriptions.", es); + } + } else { + throw new CompositeException( + "Failed to unsubscribe to 2 or more subscriptions.", es); + } + } + } +} diff --git a/rxjava-core/src/test/java/rx/subscriptions/SubscriptionListTest.java b/rxjava-core/src/test/java/rx/subscriptions/SubscriptionListTest.java new file mode 100644 index 0000000000..b0f43c482e --- /dev/null +++ b/rxjava-core/src/test/java/rx/subscriptions/SubscriptionListTest.java @@ -0,0 +1,286 @@ +/** + * Copyright 2014 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.subscriptions; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicInteger; + +import org.junit.Test; + +import rx.Subscription; +import rx.exceptions.CompositeException; + +public class SubscriptionListTest { + + @Test + public void testSuccess() { + final AtomicInteger counter = new AtomicInteger(); + SubscriptionList s = new SubscriptionList(); + s.add(new Subscription() { + + @Override + public void unsubscribe() { + counter.incrementAndGet(); + } + + @Override + public boolean isUnsubscribed() { + return false; + } + }); + + s.add(new Subscription() { + + @Override + public void unsubscribe() { + counter.incrementAndGet(); + } + + @Override + public boolean isUnsubscribed() { + return false; + } + }); + + s.unsubscribe(); + + assertEquals(2, counter.get()); + } + + @Test(timeout = 1000) + public void shouldUnsubscribeAll() throws InterruptedException { + final AtomicInteger counter = new AtomicInteger(); + final SubscriptionList s = new SubscriptionList(); + + final int count = 10; + final CountDownLatch start = new CountDownLatch(1); + for (int i = 0; i < count; i++) { + s.add(new Subscription() { + + @Override + public void unsubscribe() { + counter.incrementAndGet(); + } + + @Override + public boolean isUnsubscribed() { + return false; + } + }); + } + + final List threads = new ArrayList(); + for (int i = 0; i < count; i++) { + final Thread t = new Thread() { + @Override + public void run() { + try { + start.await(); + s.unsubscribe(); + } catch (final InterruptedException e) { + fail(e.getMessage()); + } + } + }; + t.start(); + threads.add(t); + } + + start.countDown(); + for (final Thread t : threads) { + t.join(); + } + + assertEquals(count, counter.get()); + } + + @Test + public void testException() { + final AtomicInteger counter = new AtomicInteger(); + SubscriptionList s = new SubscriptionList(); + s.add(new Subscription() { + + @Override + public void unsubscribe() { + throw new RuntimeException("failed on first one"); + } + + @Override + public boolean isUnsubscribed() { + return false; + } + }); + + s.add(new Subscription() { + + @Override + public void unsubscribe() { + counter.incrementAndGet(); + } + + @Override + public boolean isUnsubscribed() { + return false; + } + }); + + try { + s.unsubscribe(); + fail("Expecting an exception"); + } catch (RuntimeException e) { + // we expect this + assertEquals(e.getMessage(), "failed on first one"); + } + + // we should still have unsubscribed to the second one + assertEquals(1, counter.get()); + } + + @Test + public void testCompositeException() { + final AtomicInteger counter = new AtomicInteger(); + SubscriptionList s = new SubscriptionList(); + s.add(new Subscription() { + + @Override + public void unsubscribe() { + throw new RuntimeException("failed on first one"); + } + + @Override + public boolean isUnsubscribed() { + return false; + } + }); + + s.add(new Subscription() { + + @Override + public void unsubscribe() { + throw new RuntimeException("failed on second one too"); + } + + @Override + public boolean isUnsubscribed() { + return false; + } + }); + + s.add(new Subscription() { + + @Override + public void unsubscribe() { + counter.incrementAndGet(); + } + + @Override + public boolean isUnsubscribed() { + return false; + } + }); + + try { + s.unsubscribe(); + fail("Expecting an exception"); + } catch (CompositeException e) { + // we expect this + assertEquals(e.getExceptions().size(), 2); + } + + // we should still have unsubscribed to the second one + assertEquals(1, counter.get()); + } + + + @Test + public void testUnsubscribeIdempotence() { + final AtomicInteger counter = new AtomicInteger(); + SubscriptionList s = new SubscriptionList(); + s.add(new Subscription() { + + @Override + public void unsubscribe() { + counter.incrementAndGet(); + } + + @Override + public boolean isUnsubscribed() { + return false; + } + }); + + s.unsubscribe(); + s.unsubscribe(); + s.unsubscribe(); + + // we should have only unsubscribed once + assertEquals(1, counter.get()); + } + + @Test(timeout = 1000) + public void testUnsubscribeIdempotenceConcurrently() + throws InterruptedException { + final AtomicInteger counter = new AtomicInteger(); + final SubscriptionList s = new SubscriptionList(); + + final int count = 10; + final CountDownLatch start = new CountDownLatch(1); + s.add(new Subscription() { + + @Override + public void unsubscribe() { + counter.incrementAndGet(); + } + + @Override + public boolean isUnsubscribed() { + return false; + } + }); + + final List threads = new ArrayList(); + for (int i = 0; i < count; i++) { + final Thread t = new Thread() { + @Override + public void run() { + try { + start.await(); + s.unsubscribe(); + } catch (final InterruptedException e) { + fail(e.getMessage()); + } + } + }; + t.start(); + threads.add(t); + } + + start.countDown(); + for (final Thread t : threads) { + t.join(); + } + + // we should have only unsubscribed once + assertEquals(1, counter.get()); + } +}