diff --git a/rxjava-core/src/main/java/rx/joins/JoinObserver1.java b/rxjava-core/src/main/java/rx/joins/JoinObserver1.java index 873d3d1a7f..ff7137447d 100644 --- a/rxjava-core/src/main/java/rx/joins/JoinObserver1.java +++ b/rxjava-core/src/main/java/rx/joins/JoinObserver1.java @@ -52,7 +52,7 @@ public void addActivePlan(ActivePlan0 activePlan) { @Override public void subscribe(Object gate) { this.gate = gate; - subscription.set(source.materialize().subscribe(this)); + subscription.setSubscription(source.materialize().subscribe(this)); } @Override diff --git a/rxjava-core/src/main/java/rx/operators/SafeObservableSubscription.java b/rxjava-core/src/main/java/rx/operators/SafeObservableSubscription.java index ef33ebd3d3..250ac6e3ef 100644 --- a/rxjava-core/src/main/java/rx/operators/SafeObservableSubscription.java +++ b/rxjava-core/src/main/java/rx/operators/SafeObservableSubscription.java @@ -18,6 +18,7 @@ import java.util.concurrent.atomic.AtomicReference; import rx.Subscription; +import rx.subscriptions.SingleAssignmentSubscription; /** * Thread-safe wrapper around Observable Subscription that ensures unsubscribe can be called only once. @@ -30,21 +31,14 @@ * */ public final class SafeObservableSubscription implements Subscription { - - private static final Subscription UNSUBSCRIBED = new Subscription() - { - @Override - public void unsubscribe() - { - } - }; - private final AtomicReference actualSubscription = new AtomicReference(); + private final SingleAssignmentSubscription sas; public SafeObservableSubscription() { + sas = new SingleAssignmentSubscription(); } public SafeObservableSubscription(Subscription actualSubscription) { - this.actualSubscription.set(actualSubscription); + sas = new SingleAssignmentSubscription(actualSubscription); } /** @@ -56,27 +50,16 @@ public SafeObservableSubscription(Subscription actualSubscription) { * if trying to set more than once (or use this method after setting via constructor) */ public SafeObservableSubscription wrap(Subscription actualSubscription) { - if (!this.actualSubscription.compareAndSet(null, actualSubscription)) { - if (this.actualSubscription.get() == UNSUBSCRIBED) { - actualSubscription.unsubscribe(); - return this; - } - throw new IllegalStateException("Can not set subscription more than once."); - } + sas.setSubscription(actualSubscription); return this; } @Override public void unsubscribe() { - // get the real thing and set to null in an atomic operation so we will only ever call unsubscribe once - Subscription actual = actualSubscription.getAndSet(UNSUBSCRIBED); - // if it's not null we will unsubscribe - if (actual != null) { - actual.unsubscribe(); - } + sas.unsubscribe(); } public boolean isUnsubscribed() { - return actualSubscription.get() == UNSUBSCRIBED; + return sas.isUnsubscribed(); } } diff --git a/rxjava-core/src/main/java/rx/subscriptions/AbstractAssignmentSubscription.java b/rxjava-core/src/main/java/rx/subscriptions/AbstractAssignmentSubscription.java new file mode 100644 index 0000000000..44469b01ee --- /dev/null +++ b/rxjava-core/src/main/java/rx/subscriptions/AbstractAssignmentSubscription.java @@ -0,0 +1,95 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.subscriptions; + +import java.util.concurrent.atomic.AtomicReference; + +import rx.Subscription; + +/** + * Base class to manage a reference to another subscription in atomic manner + * and allows callbacks to handle the referenced subscription at the + * pre-swap and post-swap stages. + * + */ +public abstract class AbstractAssignmentSubscription implements Subscription { + /** The subscription holding the reference. */ + protected final AtomicReference reference = new AtomicReference(); + /** Sentinel for the unsubscribed state. */ + private static final Subscription UNSUBSCRIBED_SENTINEL = new Subscription() { + @Override + public void unsubscribe() { + } + }; + /** Creates an empty AbstractAssignmentSubscription. */ + public AbstractAssignmentSubscription() { + + } + /** + * Creates a AbstractAssignmentSubscription with the given subscription + * as its initial value. + * + * @param s the initial subscription + */ + public AbstractAssignmentSubscription(Subscription s) { + this(); + reference.set(s); + } + public boolean isUnsubscribed() { + return reference.get() == UNSUBSCRIBED_SENTINEL; + } + + @Override + public void unsubscribe() { + Subscription s = reference.getAndSet(UNSUBSCRIBED_SENTINEL); + if (s != null) { + s.unsubscribe(); + } + } + + public void setSubscription(Subscription s) { + do { + Subscription r = reference.get(); + if (r == UNSUBSCRIBED_SENTINEL) { + s.unsubscribe(); + return; + } + onPreSwap(r); + if (reference.compareAndSet(r, s)) { + onPostSwap(r); + break; + } + } while (true); + } + /** + * Override this method to perform logic on a subscription before + * an attempt is tried to swap it for a new subscription. + * @param current the current subscription value + */ + protected void onPreSwap(Subscription current) { } + /** + * Override this method to perform actions once a subscription has been + * swapped to a new one. + * @param old the old subscription value + */ + protected void onPostSwap(Subscription old) { } + + public Subscription getSubscription() { + Subscription s = reference.get(); + return s != UNSUBSCRIBED_SENTINEL ? s : Subscriptions.empty(); + } + +} diff --git a/rxjava-core/src/main/java/rx/subscriptions/AbstractAtomicSubscription.java b/rxjava-core/src/main/java/rx/subscriptions/AbstractAtomicSubscription.java new file mode 100644 index 0000000000..8fcda32be2 --- /dev/null +++ b/rxjava-core/src/main/java/rx/subscriptions/AbstractAtomicSubscription.java @@ -0,0 +1,157 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.subscriptions; + +import java.util.concurrent.atomic.AtomicReference; +import rx.Subscription; +import rx.util.functions.Action0; +import rx.util.functions.Actions; +import rx.util.functions.Func0; + +/** + * Base class for subscriptions with lock-free behavior. + */ +public abstract class AbstractAtomicSubscription implements Subscription { + /** + * The subscription state for tracking the mutating and unsubscribed states. + */ + protected enum SubscriptionState { + /** Indicates that the subscription may be mutated. */ + ACTIVE, + /** Indicates that a mutation is going on. */ + MUTATING, + /** Indicates that the subscription has been unsubscribed and no further mutation may happen. */ + UNSUBSCRIBED + } + /** The current state. */ + private final AtomicReference state = new AtomicReference(SubscriptionState.ACTIVE); + /** + * Atomically sets the state. + * @param newState the new state + */ + protected final void setState(SubscriptionState newState) { + if (newState == null) { + throw new NullPointerException("newState"); + } + state.set(newState); + } + /** + * Atomically retrieves the current state. + * @return the current state. + */ + protected final SubscriptionState getState() { + return state.get(); + } + /** + * Executes the given action while in the MUTATING state and transitions to the supplied new state. + *

+ * Even if the {@code action} throws an exception, the state is always set to the {@code newState}. + * @param newState the state to set after the action was called + * @param action the action to execute while in the MUTATING state + * @return true if the action was called, false if the subscription was unsubscribed + */ + protected final boolean callAndSet(SubscriptionState newState, Action0 action) { + return callAndSet(newState, action, null); + } + /** + * Executes the given action and sets the state to the supplied newState or + * executes the function and sets the state to its return value. + * + * @param newState the default new state, set if action is executed or the func call throws. + * @param action the action to execute, null if not applicable + * @param func the function to execute, null if not applicable + * @return true if either the action or function was executed, + * false if the subscription was unsubscribed during the operation + */ + private boolean callAndSet(SubscriptionState newState, Action0 action, Func0 func) { + if (newState == null) { + throw new NullPointerException("newState"); + } + if (action == null && func == null) { + throw new IllegalArgumentException("action & func both null!"); + } + if (action != null && func != null) { + throw new IllegalArgumentException("action & func both non-null!"); + } + do { + SubscriptionState s = state.get(); + if (s == SubscriptionState.UNSUBSCRIBED) { + return false; + } + if (s == SubscriptionState.MUTATING) { + continue; + } + if (state.compareAndSet(s, SubscriptionState.MUTATING)) { + SubscriptionState toSet = newState; + try { + if (action != null) { + action.call(); + } else { + toSet = func.call(); + } + } finally { + state.set(toSet); + } + return true; + } + } while (true); + } + /** + * Executes the given function while in the MUTATING state and transitions + * to state returned by the function. + *

+ * If the func throws, the state is reset to ACTIVE and the exception is propagated. + * + * @param func the function to call, should return the state after the function call + * @return true if the action was called, false if the subscription was unsubscribed + */ + protected final boolean call(Func0 func) { + return callAndSet(SubscriptionState.ACTIVE, null, func); + } + /** + * Transitions to the supplied new state and executes the given action. + *

+ * The action is responsible to + * @param newState the state to set before the action is called + * @param action the action to execute while in the MUTATING state + * @return true if the action was called, false if the subscription was unsubscribed + */ + protected final boolean setAndCall(SubscriptionState newState, Action0 action) { + if (action == null) { + throw new NullPointerException("action"); + } + if (setAndCall(newState, Actions.empty0())) { + action.call(); + return true; + } + return false; + } + /** + * Executes the given action and returns in the ACTIVE state. + * @param action the action to execute while in the MUTATING state + * @return true if the action was called, false if the subscription was unsubscribed + */ + protected final boolean call(Action0 action) { + return callAndSet(SubscriptionState.ACTIVE, action); + } + /** + * Returns true if this subscription has been unsubscribed. + * @return true if this subscription has been unsubscribed + */ + public final boolean isUnsubscribed() { + return state.get() == SubscriptionState.UNSUBSCRIBED; + } +} diff --git a/rxjava-core/src/main/java/rx/subscriptions/BooleanSubscription.java b/rxjava-core/src/main/java/rx/subscriptions/BooleanSubscription.java index 7b18b35efc..52491ce064 100644 --- a/rxjava-core/src/main/java/rx/subscriptions/BooleanSubscription.java +++ b/rxjava-core/src/main/java/rx/subscriptions/BooleanSubscription.java @@ -19,23 +19,48 @@ import rx.Observable; import rx.Subscription; +import rx.util.functions.Action0; /** * Subscription that can be checked for status such as in a loop inside an {@link Observable} to exit the loop if unsubscribed. * - * @see Rx.Net equivalent BooleanDisposable + * @see Rx.Net equivalent BooleanDisposable */ public class BooleanSubscription implements Subscription { - - private final AtomicBoolean unsubscribed = new AtomicBoolean(false); + /** The subscription state. */ + private final AtomicBoolean unsubscribed = new AtomicBoolean(); public boolean isUnsubscribed() { return unsubscribed.get(); } - + /** + * Override this method to perform any action once if this BooleanSubscription + * is unsubscribed. + */ + protected void onUnsubscribe() { } + @Override public void unsubscribe() { - unsubscribed.set(true); + if (unsubscribed.compareAndSet(false, true)) { + onUnsubscribe(); + } + } + /** + * Returns a BooleanSubscription which calls the given action once + * it is unsubscribed. + * @param action the action to call when unsubscribing + * @return the BooleanSubscription which calls the given action once + * it is unsubscribed + */ + public static BooleanSubscription withAction(final Action0 action) { + if (action == null) { + throw new NullPointerException("action"); + } + return new BooleanSubscription() { + @Override + protected void onUnsubscribe() { + action.call(); + } + }; } - } diff --git a/rxjava-core/src/main/java/rx/subscriptions/CompositeSubscription.java b/rxjava-core/src/main/java/rx/subscriptions/CompositeSubscription.java index c19331c8ec..e05aa48df1 100644 --- a/rxjava-core/src/main/java/rx/subscriptions/CompositeSubscription.java +++ b/rxjava-core/src/main/java/rx/subscriptions/CompositeSubscription.java @@ -15,146 +15,91 @@ */ package rx.subscriptions; -import static java.util.Arrays.asList; -import static java.util.Collections.unmodifiableSet; - import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.HashSet; +import java.util.Arrays; +import java.util.LinkedHashSet; +import java.util.List; import java.util.Set; -import java.util.concurrent.atomic.AtomicReference; import rx.Subscription; -import rx.util.CompositeException; +import rx.util.functions.Action0; /** * Subscription that represents a group of Subscriptions that are unsubscribed * together. * * @see Rx.Net + * href="http://msdn.microsoft.com/en-us/library/system.reactive.disposables.compositedisposable.aspx">Rx.Net * equivalent CompositeDisposable */ -public class CompositeSubscription implements Subscription { - /** Sentinel to indicate a thread is modifying the subscription set. */ - private static final Set MUTATE_SENTINEL = unmodifiableSet(Collections.emptySet()); - /** Sentinel to indicate the entire CompositeSubscription has been unsubscribed.*/ - private static final Set UNSUBSCRIBED_SENTINEL = unmodifiableSet(Collections.emptySet()); - /** The reference to the set of subscriptions. */ - private final AtomicReference> reference = new AtomicReference>(); - +public class CompositeSubscription extends AbstractAtomicSubscription { + /** The tracked subscriptions. */ + protected final Set subscriptions = new LinkedHashSet(); public CompositeSubscription(final Subscription... subscriptions) { - reference.set(new HashSet(asList(subscriptions))); - } - - public boolean isUnsubscribed() { - return reference.get() == UNSUBSCRIBED_SENTINEL; + this.subscriptions.addAll(Arrays.asList(subscriptions)); } public void add(final Subscription s) { - do { - final Set existing = reference.get(); - if (existing == UNSUBSCRIBED_SENTINEL) { - s.unsubscribe(); - break; - } - - if (existing == MUTATE_SENTINEL) { - continue; - } - - if (reference.compareAndSet(existing, MUTATE_SENTINEL)) { - existing.add(s); - reference.set(existing); - break; - } - } while (true); + Add a = new Add(s); + if (!call(a) && s != null) { + s.unsubscribe(); + } } public void remove(final Subscription s) { - do { - final Set subscriptions = reference.get(); - if (subscriptions == UNSUBSCRIBED_SENTINEL) { - s.unsubscribe(); - break; - } - - if (subscriptions == MUTATE_SENTINEL) { - continue; - } - - if (reference.compareAndSet(subscriptions, MUTATE_SENTINEL)) { - // also unsubscribe from it: - // http://msdn.microsoft.com/en-us/library/system.reactive.disposables.compositedisposable.remove(v=vs.103).aspx - subscriptions.remove(s); - reference.set(subscriptions); - s.unsubscribe(); - break; - } - } while (true); + Remove r = new Remove(s); + call(r); + if (s != null) { + s.unsubscribe(); + } } public void clear() { - do { - final Set subscriptions = reference.get(); - if (subscriptions == UNSUBSCRIBED_SENTINEL) { - break; - } - - if (subscriptions == MUTATE_SENTINEL) { - continue; - } - - if (reference.compareAndSet(subscriptions, MUTATE_SENTINEL)) { - final Set copy = new HashSet( - subscriptions); - subscriptions.clear(); - reference.set(subscriptions); - - unsubscribeAll(copy); - break; - } - } while (true); - } - /** - * Unsubscribe from the collection of subscriptions. - *

- * Exceptions thrown by any of the {@code unsubscribe()} methods are - * collected into a {@link CompositeException} and thrown once - * all unsubscriptions have been attempted. - * @param subs the collection of subscriptions - */ - private void unsubscribeAll(Collection subs) { - final Collection es = new ArrayList(); - for (final Subscription s : subs) { - try { - s.unsubscribe(); - } catch (final Throwable e) { - es.add(e); - } - } - if (!es.isEmpty()) { - throw new CompositeException( - "Failed to unsubscribe to 1 or more subscriptions.", es); + Clear c = new Clear(); + if (call(c)) { + Subscriptions.unsubscribeAll(c.list); } } + @Override public void unsubscribe() { - do { - final Set subscriptions = reference.get(); - if (subscriptions == UNSUBSCRIBED_SENTINEL) { - break; - } - - if (subscriptions == MUTATE_SENTINEL) { - continue; - } - - if (reference.compareAndSet(subscriptions, UNSUBSCRIBED_SENTINEL)) { - unsubscribeAll(subscriptions); - break; - } - } while (true); + Clear c = new Clear(); + if (callAndSet(SubscriptionState.UNSUBSCRIBED, c)) { + Subscriptions.unsubscribeAll(c.list); + } + } + /** Add a subscription. */ + private final class Add implements Action0 { + final Subscription s; + public Add(Subscription s) { + this.s = s; + } + @Override + public void call() { + subscriptions.add(s); + } + } + /** Remove a subscription if present. */ + private final class Remove implements Action0 { + final Subscription s; + boolean found; + public Remove(Subscription s) { + this.s = s; + } + @Override + public void call() { + found = subscriptions.remove(s); + } + } + /** + * Clears and returns the subscriptions from this composite. + */ + private final class Clear implements Action0 { + List list; + @Override + public void call() { + list = new ArrayList(subscriptions); + subscriptions.clear(); + } } } diff --git a/rxjava-core/src/main/java/rx/subscriptions/MultipleAssignmentSubscription.java b/rxjava-core/src/main/java/rx/subscriptions/MultipleAssignmentSubscription.java index 8fed35fbbf..e6b7b55700 100644 --- a/rxjava-core/src/main/java/rx/subscriptions/MultipleAssignmentSubscription.java +++ b/rxjava-core/src/main/java/rx/subscriptions/MultipleAssignmentSubscription.java @@ -15,7 +15,6 @@ */ package rx.subscriptions; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import rx.Observable; @@ -26,42 +25,6 @@ * * @see Rx.Net equivalent MultipleAssignmentDisposable */ -public class MultipleAssignmentSubscription implements Subscription { - private AtomicReference reference = new AtomicReference(); - /** Sentinel for the unsubscribed state. */ - private static final Subscription UNSUBSCRIBED_SENTINEL = new Subscription() { - @Override - public void unsubscribe() { - } - }; - public boolean isUnsubscribed() { - return reference.get() == UNSUBSCRIBED_SENTINEL; - } - - @Override - public void unsubscribe() { - Subscription s = reference.getAndSet(UNSUBSCRIBED_SENTINEL); - if (s != null) { - s.unsubscribe(); - } - } - - public void setSubscription(Subscription s) { - do { - Subscription r = reference.get(); - if (r == UNSUBSCRIBED_SENTINEL) { - s.unsubscribe(); - return; - } - if (reference.compareAndSet(r, s)) { - break; - } - } while (true); - } - - public Subscription getSubscription() { - Subscription s = reference.get(); - return s != UNSUBSCRIBED_SENTINEL ? s : Subscriptions.empty(); - } +public class MultipleAssignmentSubscription extends AbstractAssignmentSubscription { } diff --git a/rxjava-core/src/main/java/rx/subscriptions/RefCountSubscription.java b/rxjava-core/src/main/java/rx/subscriptions/RefCountSubscription.java index a4747caa9b..a3e5d12c2b 100644 --- a/rxjava-core/src/main/java/rx/subscriptions/RefCountSubscription.java +++ b/rxjava-core/src/main/java/rx/subscriptions/RefCountSubscription.java @@ -15,10 +15,9 @@ */ package rx.subscriptions; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicReference; import rx.Subscription; +import rx.util.functions.Action0; +import rx.util.functions.Func0; /** * Keeps track of the sub-subscriptions and unsubscribes the underlying @@ -26,21 +25,13 @@ * * @see MSDN RefCountDisposable */ -public class RefCountSubscription implements Subscription { - /** The state for the atomic operations. */ - private enum State { - ACTIVE, - MUTATING, - UNSUBSCRIBED - } +public class RefCountSubscription extends AbstractAtomicSubscription { /** The reference to the actual subscription. */ private volatile Subscription main; - /** The current state. */ - private final AtomicReference state = new AtomicReference(); /** Counts the number of sub-subscriptions. */ - private final AtomicInteger count = new AtomicInteger(); + private int count; /** Indicate the request to unsubscribe from the main. */ - private final AtomicBoolean mainDone = new AtomicBoolean(); + private boolean mainDone; /** * Create a RefCountSubscription by wrapping the given non-null Subscription. * @param s @@ -53,87 +44,88 @@ public RefCountSubscription(Subscription s) { } /** * Returns a new sub-subscription. + * @return a new sub-subscription. */ public Subscription getSubscription() { - do { - State s = state.get(); - if (s == State.UNSUBSCRIBED) { - return Subscriptions.empty(); - } - if (s == State.MUTATING) { - continue; - } - if (state.compareAndSet(s, State.MUTATING)) { - count.incrementAndGet(); - state.set(State.ACTIVE); - return new InnerSubscription(); - } - } while(true); - } - /** - * Check if this subscription is already unsubscribed. - */ - public boolean isUnsubscribed() { - return state.get() == State.UNSUBSCRIBED; + PlusPlusCount ppc = new PlusPlusCount(); + if (call(ppc)) { + return new InnerSubscription(); + } + return Subscriptions.empty(); } + @Override public void unsubscribe() { - do { - State s = state.get(); - if (s == State.UNSUBSCRIBED) { - return; + MainDone md = new MainDone(); + if (call(md)) { + if (md.runTerminate) { + terminate(); } - if (s == State.MUTATING) { - continue; - } - if (state.compareAndSet(s, State.MUTATING)) { - if (mainDone.compareAndSet(false, true) && count.get() == 0) { - terminate(); - return; - } - state.set(State.ACTIVE); - break; - } - } while (true); + } } /** * Terminate this subscription by unsubscribing from main and setting the * state to UNSUBSCRIBED. */ private void terminate() { - state.set(State.UNSUBSCRIBED); Subscription r = main; main = null; r.unsubscribe(); } + /** Remove an inner subscription. */ void innerDone() { - do { - State s = state.get(); - if (s == State.UNSUBSCRIBED) { - return; + InnerDone id = new InnerDone(); + if (call(id)) { + if (id.runTerminate) { + terminate(); } - if (s == State.MUTATING) { - continue; - } - if (state.compareAndSet(s, State.MUTATING)) { - if (count.decrementAndGet() == 0 && mainDone.get()) { - terminate(); - return; + } + } + /** The individual sub-subscriptions. */ + private final class InnerSubscription extends BooleanSubscription { + @Override + protected void onUnsubscribe() { + innerDone(); + } + }; + /** + * Execute a {@code ++count}. + */ + private final class PlusPlusCount implements Action0 { + @Override + public void call() { + ++count; + } + } + /** Called from the main unsubscribe(). */ + private final class MainDone implements Func0 { + boolean runTerminate; + @Override + public SubscriptionState call() { + if (!mainDone) { + mainDone = true; + if (count == 0) { + runTerminate = true; + return SubscriptionState.UNSUBSCRIBED; } - state.set(State.ACTIVE); - break; } - } while (true); + return SubscriptionState.ACTIVE; + } } - /** The individual sub-subscriptions. */ - class InnerSubscription implements Subscription { - final AtomicBoolean innerDone = new AtomicBoolean(); + + /** Called from the main unsubscribe(). */ + private final class InnerDone implements Func0 { + boolean runTerminate; @Override - public void unsubscribe() { - if (innerDone.compareAndSet(false, true)) { - innerDone(); + public SubscriptionState call() { + if (--count == 0 && mainDone) { + if (count == 0) { + runTerminate = true; + return SubscriptionState.UNSUBSCRIBED; + } } + return SubscriptionState.ACTIVE; } - }; + } } \ No newline at end of file diff --git a/rxjava-core/src/main/java/rx/subscriptions/SerialSubscription.java b/rxjava-core/src/main/java/rx/subscriptions/SerialSubscription.java index e26a258acc..76442e4a55 100644 --- a/rxjava-core/src/main/java/rx/subscriptions/SerialSubscription.java +++ b/rxjava-core/src/main/java/rx/subscriptions/SerialSubscription.java @@ -15,9 +15,6 @@ */ package rx.subscriptions; -import static rx.subscriptions.Subscriptions.empty; - -import java.util.concurrent.atomic.AtomicReference; import rx.Subscription; @@ -27,41 +24,25 @@ * * @see Rx.Net equivalent SerialDisposable */ -public class SerialSubscription implements Subscription { - private final AtomicReference reference = new AtomicReference(empty()); - /** Sentinel for the unsubscribed state. */ - private static final Subscription UNSUBSCRIBED_SENTINEL = new Subscription() { - @Override - public void unsubscribe() { - } - }; - public boolean isUnsubscribed() { - return reference.get() == UNSUBSCRIBED_SENTINEL; +public class SerialSubscription extends AbstractAssignmentSubscription { + /** Creates an empty SerialSubscription. */ + public SerialSubscription() { + super(); + } + /** + * Creates a SerialSubscription with the given subscription + * as its initial value. + * + * @param s the initial subscription + */ + public SerialSubscription(Subscription s) { + super(s); } @Override - public void unsubscribe() { - Subscription s = reference.getAndSet(UNSUBSCRIBED_SENTINEL); - if (s != null) { - s.unsubscribe(); + protected void onPostSwap(Subscription old) { + if (old != null) { + old.unsubscribe(); } } - - public void setSubscription(final Subscription subscription) { - do { - final Subscription current = reference.get(); - if (current == UNSUBSCRIBED_SENTINEL) { - subscription.unsubscribe(); - break; - } - if (reference.compareAndSet(current, subscription)) { - current.unsubscribe(); - break; - } - } while (true); - } - public Subscription getSubscription() { - final Subscription subscription = reference.get(); - return subscription == UNSUBSCRIBED_SENTINEL ? Subscriptions.empty() : subscription; - } } diff --git a/rxjava-core/src/main/java/rx/subscriptions/SingleAssignmentSubscription.java b/rxjava-core/src/main/java/rx/subscriptions/SingleAssignmentSubscription.java index c960db2ea4..cd0322c695 100644 --- a/rxjava-core/src/main/java/rx/subscriptions/SingleAssignmentSubscription.java +++ b/rxjava-core/src/main/java/rx/subscriptions/SingleAssignmentSubscription.java @@ -16,66 +16,44 @@ package rx.subscriptions; -import java.util.concurrent.atomic.AtomicReference; import rx.Subscription; /** - * A subscription that allows only a single resource to be assigned. + * A subscription that allows only a single subscription to be assigned. *

- * If this subscription is live, no other subscription may be set() and + * If this subscription is live, no other subscription may be set and * yields an {@link IllegalStateException}. *

* If the unsubscribe has been called, setting a new subscription will * unsubscribe it immediately. */ -public final class SingleAssignmentSubscription implements Subscription { - /** Holds the current resource. */ - private final AtomicReference current = new AtomicReference(); - /** Sentinel for the unsubscribed state. */ - private static final Subscription UNSUBSCRIBED_SENTINEL = new Subscription() { - @Override - public void unsubscribe() { - } - }; - /** - * Returns the current subscription or null if not yet set. - */ - public Subscription get() { - Subscription s = current.get(); - if (s == UNSUBSCRIBED_SENTINEL) { - return Subscriptions.empty(); - } - return s; +public final class SingleAssignmentSubscription extends AbstractAssignmentSubscription { + /** Creates an empty SingleAssignmentSubscription. */ + public SingleAssignmentSubscription() { + super(); } /** - * Sets a new subscription if not already set. - * @param s the new subscription - * @throws IllegalStateException if this subscription is live and contains - * another subscription. + * Creates a SerialSubscription with the given subscription + * as its initial value. + * + * @param s the initial subscription */ - public void set(Subscription s) { - if (current.compareAndSet(null, s)) { - return; - } - if (current.get() != UNSUBSCRIBED_SENTINEL) { - throw new IllegalStateException("Subscription already set"); - } - if (s != null) { - s.unsubscribe(); - } + public SingleAssignmentSubscription(Subscription s) { + super(s); } @Override - public void unsubscribe() { - Subscription old = current.getAndSet(UNSUBSCRIBED_SENTINEL); - if (old != null) { - old.unsubscribe(); + protected void onPreSwap(Subscription current) { + if (current != null) { + throw new IllegalStateException("Can not set subscription more than once!"); } } /** - * Test if this subscription is already unsubscribed. + * Sets the subscription if not already set. + * + * @param s + * @deprecated use the common {@link #setSubscription(rx.Subscription)} method instead. */ - public boolean isUnsubscribed() { - return current.get() == UNSUBSCRIBED_SENTINEL; + public void set(Subscription s) { + setSubscription(s); } - } diff --git a/rxjava-core/src/main/java/rx/subscriptions/Subscriptions.java b/rxjava-core/src/main/java/rx/subscriptions/Subscriptions.java index 61fd6b3f6c..bf8cb0531b 100644 --- a/rxjava-core/src/main/java/rx/subscriptions/Subscriptions.java +++ b/rxjava-core/src/main/java/rx/subscriptions/Subscriptions.java @@ -15,10 +15,13 @@ */ package rx.subscriptions; +import java.util.ArrayList; +import java.util.Collection; import java.util.concurrent.Future; import rx.Subscription; import rx.operators.SafeObservableSubscription; +import rx.util.CompositeException; import rx.util.functions.Action0; /** @@ -123,4 +126,26 @@ public static CompositeSubscription create(Subscription... subscriptions) { public void unsubscribe() { } }; + /** + * Unsubscribe from the sequence of subscriptions. + *

+ * Exceptions thrown by any of the {@code unsubscribe()} methods are + * collected into a {@link CompositeException} and thrown once + * all unsubscriptions have been attempted. + * @param subs the collection of subscriptions + */ + public static void unsubscribeAll(Iterable subs) { + final Collection es = new ArrayList(); + for (final Subscription s : subs) { + try { + s.unsubscribe(); + } catch (final Throwable e) { + es.add(e); + } + } + if (!es.isEmpty()) { + throw new CompositeException( + "Failed to unsubscribe to 1 or more subscriptions.", es); + } + } } diff --git a/rxjava-core/src/main/java/rx/subscriptions/ValuedCompositeSubscription.java b/rxjava-core/src/main/java/rx/subscriptions/ValuedCompositeSubscription.java new file mode 100644 index 0000000000..37b36aac7c --- /dev/null +++ b/rxjava-core/src/main/java/rx/subscriptions/ValuedCompositeSubscription.java @@ -0,0 +1,610 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.subscriptions; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import rx.Subscription; +import rx.subscriptions.AbstractAtomicSubscription.SubscriptionState; +import rx.util.functions.Action0; + +/** + * A composite subscription which contains other subscriptions with an associated + * value. + *

+ * This composite doesn't allow {@code null} {@link rx.Subscription}s as keys but + * permits {@code null} as values. + *

+ * The composite retains the order of the added/put key-value pairs. + *

+ * The {@code unsubscribe()} method unsubscribes in FIFO order. + * + * @param the value type + */ +public class ValuedCompositeSubscription extends AbstractAtomicSubscription { + /** The map holding the subscriptions and their associated value. */ + private final Map map = new LinkedHashMap(); + /** + * Constructs an empty ValuedCompositeSubscription. + */ + public ValuedCompositeSubscription() { + super(); + } + /** + * Constructs a ValuedCompositeSubscription with the given initial + * key and value. + * @param key the initial key + * @param value the initial value + */ + public ValuedCompositeSubscription(Subscription key, T value) { + this(); + if (key == null) { + throw new NullPointerException("key"); + } + map.put(key, value); + } + /** + * Add or replace a value associated with the given subscription key. + * @param key the key to add or replace + * @param value the value to add or replace with + * @return this instance to allow method chaining. + */ + public ValuedCompositeSubscription add(Subscription key, T value) { + if (key == null) { + throw new NullPointerException("key"); + } + AddEntry ae = new AddEntry(key, value); + if (!call(ae)) { + key.unsubscribe(); + } + return this; + } + /** + * Adds or replaces a value associated with the given subscription key + * and returns the previous value if any. + * @param key the key to add or replace + * @param value the value to add or replace with + * @return the previous value associated with the key or {@code null} if + *

    + *
  • there was no pervious key
  • + *
  • this composite was unsubscribed during the operation.
  • + *
+ */ + public T put(Subscription key, T value) { + if (key == null) { + throw new NullPointerException("key"); + } + AddEntry ae = new AddEntry(key, value); + if (!call(ae)) { + key.unsubscribe(); + } + return ae.previous; + } + /** + * Try adding/replacing a key-value pair to this composite. + * @param key the key to add + * @param value the value to add + * @return true if the key-value pair was added, false if this + * composite was unsubscribed during the operation + */ + public boolean tryAdd(Subscription key, T value) { + if (key == null) { + throw new NullPointerException("key"); + } + AddEntry ae = new AddEntry(key, value); + return call(ae); + } + /** + * Removes and unsubscribes the given subscription key from this composite. + * @param key the key to remove + * @return the associated value of the key or {@code null} if + *
    + *
  • the {@code key} is {@code null},
  • + *
  • there was no pervious key or
  • + *
  • this composite was unsubscribed during the operation.
  • + *
+ */ + public T remove(Subscription key) { + if (key == null) { + return null; + } + RemoveEntry re = new RemoveEntry(key); + if (call(re) && re.result) { + key.unsubscribe(); + } + return re.value; + } + /** + * Deletes (removes) a subscription from this composite but does not + * call {@code unsubscribe()} on it. + * @param key the key to remove without unsubscribing + * @return The associated value of the key or {@code null} if + *
    + *
  • the {@code key} is {@code null},
  • + *
  • there was no pervious key or
  • + *
  • this composite was unsubscribed during the operation.
  • + *
+ */ + public T delete(Subscription key) { + if (key == null) { + return null; + } + DeleteEntry de = new DeleteEntry(key); + call(de); + return de.value; + } + /** + * Adds a new key-value pair to this composite only if the key is + * not already present. + * @param key the subscription key to add + * @param value the value to add + * @return true if the key-value pair was added, false if + *
    + *
  • the {@code key} was already present or
  • + *
  • this composite was unsubscribed unsubscribed during the operation.
  • + *
+ */ + public boolean putIfAbsent(Subscription key, T value) { + if (key == null) { + throw new NullPointerException("key"); + } + AddIfEntry aie = new AddIfEntry(key, value); + if (!call(aie)) { + key.unsubscribe(); + } + return aie.result; + } + /** + * Check if the composite is empty. + * @return true if the composite is empty + */ + public boolean isEmpty() { + return size() == 0; + } + /** + * Returns the number of key-value pairs in this composite. + * @return the number of key-value pairs in this composite + */ + public int size() { + Size s = new Size(); + call(s); + return s.result; + } + /** + * Atomically adds or replaces key-value pairs in this composite. + *

+ * Copies the {@code map} contents before modifying the composite, therefore, + * a {@code null} key in {@code map} will yield a {@link NullPointerException} + * and the composite remains unchanged. + * @param map the map of subscription key and value pairs + */ + public void putAll(Map map) { + PutAll pa = new PutAll(map); + if (!call(pa)) { + Subscriptions.unsubscribeAll(pa.mapCopy.keySet()); + } + } + /** + * Checks if the given subscription key is in this composite. + * @param key the key to check + * @return true if in this composite, false if + *

    + *
  • the {@code key} is {@code null}.
  • + *
  • the {@code key} is not in the composite or
  • + *
  • this composite was unsubscribed during the operation.
  • + *
+ */ + public boolean contains(Subscription key) { + if (key == null) { + return false; + } + Contains c = new Contains(key); + call(c); + return c.result; + } + /** + * Clears and unsubscribes all subscriptions maintained by this composite. + *

+ * It unsubscribes in FIFO order. + * @see #deleteClear() + */ + public void clear() { + ListAndCleanup c = new ListAndCleanup(); + if (call(c)) { + Subscriptions.unsubscribeAll(c.list); + } + } + /** + * Clears but does not unsubscribe all subscriptions maintained by this composite. + * @see #clear() + */ + public void deleteClear() { + ListAndCleanup c = new ListAndCleanup(); + call(c); + } + /** + * Atomically removes and unsubscribes any subscription which are both present + * in this composite and in source. + * @param source the source sequence of subscriptions to remove + */ + public void removeAll(Iterable source) { + RemoveAll ra = new RemoveAll(source); + if (call(ra)) { + Subscriptions.unsubscribeAll(ra.copy); + } + } + /** + * Atomically removes any subscription which are both present + * in this composite and in source but does not unsubscribe them. + * @param source the source sequence of subscriptions to remove + */ + public void deleteAll(Iterable source) { + DeleteAll da = new DeleteAll(source); + call(da); + } + /** + * Returns an unmodifiable collection of values maintained by this composite. + * @return an unmodifiable collection of values + */ + public Collection values() { + Values v = new Values(); + if (call(v)) { + return Collections.unmodifiableCollection(v.result); + } + return Collections.emptyList(); + } + @Override + public void unsubscribe() { + ListAndCleanup c = new ListAndCleanup(); + if (callAndSet(SubscriptionState.UNSUBSCRIBED, c)) { + Subscriptions.unsubscribeAll(c.list); + } + } + /** + * Marks this composite as unsubscribed, clears the maintained subscriptions + * but does not unsubscribe them. + */ + public void deleteUnsubscribe() { + ListAndCleanup c = new ListAndCleanup(); + callAndSet(SubscriptionState.UNSUBSCRIBED, c); + } + /** + * Atomically retrieve the maintained subscription keys and mark this composite + * as unsubscribed. + *

+ * The returned subscriptions are not unsubscribed. + * @return the collection of keys + */ + public Collection getKeysAndUnsubscribe() { + ListAndCleanup c = new ListAndCleanup(); + if (callAndSet(SubscriptionState.UNSUBSCRIBED, c)) { + return Collections.unmodifiableList(c.list); + } + return Collections.emptyList(); + } + /** + * Atomically retrieve the maintained subscription keys and values and mark this composite + * as unsubscribed. + *

+ * The returned subscription keys are not unsubscribed. + * @return the collection of keys + */ + public Map getEntriesAndUnsubscribe() { + MapAndCleanup c = new MapAndCleanup(); + if (callAndSet(SubscriptionState.UNSUBSCRIBED, c)) { + return Collections.unmodifiableMap(c.mapCopy); + } + return Collections.emptyMap(); + } + /** + * Atomically retrieve the maintained values and mark this composite + * as unsubscribed. + *

+ * The cleared subscription keys are not unsubscribed. + * @return the collection of values + */ + public Collection getValuesAndUnsubscribe() { + ValueAndCleanup c = new ValueAndCleanup(); + if (callAndSet(SubscriptionState.UNSUBSCRIBED, c)) { + return Collections.unmodifiableList(c.values); + } + return Collections.emptyList(); + } + /** + * Return a value associated with the key. + * @param key the key to look for + * @return the associated value or null if + *

    + *
  • the key is not present or
  • + *
  • the composite was unsubscribed during the operation.
  • + *
+ */ + public T get(Subscription key) { + Get g = new Get(key); + call(g); + return g.result; + } + /** + * Return a value associated with the key. + * @param key the key to look for + * @param defaultValue the default value if the key is missing or the composite + * was unsubscribed during the operation + * @return the associated value or the default value if + *
    + *
  • the key is not present or
  • + *
  • the composite was unsubscribed during the operation.
  • + *
+ */ + public T getOrDefault(Subscription key, T defaultValue) { + Get g = new Get(key); + call(g); + return g.present ? g.result : defaultValue; + + } + /** + * Atomically retrieve the maintained values, unsubscribe their keys and mark this composite + * as unsubscribed. + *

+ * The cleared subscription keys are not unsubscribed. + * @return the collection of values + */ + public Collection unsubscribeAndGet() { + Map m = getEntriesAndUnsubscribe(); + Subscriptions.unsubscribeAll(m.keySet()); + return Collections.unmodifiableList(new ArrayList(m.values())); + } + /** + * Action to check if a key is in the map. + */ + private final class Contains implements Action0 { + final Subscription key; + boolean result; + public Contains(Subscription key) { + this.key = key; + } + @Override + public void call() { + result = map.containsKey(key); + } + } + /** + * Retrieves the size of the map. + */ + private final class Size implements Action0 { + int result; + @Override + public void call() { + result = map.size(); + } + } + /** + * Action that adds/replaces a value for the given subscription key. + */ + private final class AddEntry implements Action0 { + final Subscription key; + final T value; + T previous; + public AddEntry(Subscription key, T value) { + this.key = key; + this.value = value; + } + + @Override + public void call() { + previous = map.put(key, value); + } + } + /** + * Action that adds a value for the given subscription key only if the key + * is not already in the map. + */ + private final class AddIfEntry implements Action0 { + final Subscription key; + final T value; + boolean result; + public AddIfEntry(Subscription key, T value) { + this.key = key; + this.value = value; + } + + @Override + public void call() { + if (!map.containsKey(key)) { + map.put(key, value); + result = true; + } else { + result = false; + } + } + } + /** + * Action that removes an entry and unsubscribes the key. + *

+ * If the given key is not in the map no unsubscription will happen. + */ + private final class RemoveEntry implements Action0 { + final Subscription key; + boolean result; + T value; + public RemoveEntry(Subscription key) { + this.key = key; + } + + @Override + public void call() { + if (map.containsKey(key)) { + value = map.remove(key); + result = true; + } + } + } + /** + * Action that removes an entry but does not unsubscribe it. + */ + private final class DeleteEntry implements Action0 { + final Subscription key; + T value; + public DeleteEntry(Subscription key) { + this.key = key; + } + + @Override + public void call() { + value = map.remove(key); + } + } + /** + * Clear the map and return the subscription keys. + */ + private final class ListAndCleanup implements Action0 { + List list; + @Override + public void call() { + list = new ArrayList(map.keySet()); + map.clear(); + } + } + /** + * Clear the map and return the key-value pairs. + */ + private final class MapAndCleanup implements Action0 { + Map mapCopy; + @Override + public void call() { + mapCopy = new LinkedHashMap(map); + map.clear(); + } + } + /** + * Clear the map and return the values. + */ + private final class ValueAndCleanup implements Action0 { + List values; + @Override + public void call() { + values = new ArrayList(map.values()); + map.clear(); + } + } + /** + * Adds/replaces subscription keys and values. + */ + private final class PutAll implements Action0 { + final Map mapCopy; + public PutAll(Map map) { + this.mapCopy = new LinkedHashMap(); + for (Map.Entry e : map.entrySet()) { + Subscription k = e.getKey(); + if (k == null) { + throw new NullPointerException(); + } + this.mapCopy.put(k, e.getValue()); + } + } + + @Override + public void call() { + map.putAll(mapCopy); + } + } + /** + * Remove and unsubscribe the common subscriptions. + */ + private final class RemoveAll implements Action0 { + final List copy; + public RemoveAll(Iterable source) { + this.copy = new LinkedList(); + for (Subscription s : source) { + if (s != null) { + copy.add(s); + } + } + } + + @Override + public void call() { + Iterator it = copy.iterator(); + while (it.hasNext()) { + Subscription s = it.next(); + if (map.containsKey(s)) { + map.remove(s); + } else { + it.remove(); + } + } + } + } + /** + * Remove and unsubscribe the common subscriptions. + */ + private final class DeleteAll implements Action0 { + final List copy; + public DeleteAll(Iterable source) { + this.copy = new LinkedList(); + for (Subscription s : source) { + if (s != null) { + copy.add(s); + } + } + } + + @Override + public void call() { + Iterator it = copy.iterator(); + while (it.hasNext()) { + Subscription s = it.next(); + if (map.containsKey(s)) { + map.remove(s); + } + } + } + } + /** + * Returns the values of the map. + */ + private final class Values implements Action0 { + Collection result; + + @Override + public void call() { + result = new ArrayList(map.values()); + } + } + /** + * Get the value of the key. + */ + private final class Get implements Action0 { + final Subscription key; + T result; + boolean present; + public Get(Subscription key) { + this.key = key; + } + @Override + public void call() { + present = map.containsKey(key); + if (present) { + result = map.get(key); + } + } + } +} diff --git a/rxjava-core/src/main/java/rx/util/functions/Actions.java b/rxjava-core/src/main/java/rx/util/functions/Actions.java index 7a33d451ce..4ef19f0d69 100644 --- a/rxjava-core/src/main/java/rx/util/functions/Actions.java +++ b/rxjava-core/src/main/java/rx/util/functions/Actions.java @@ -354,4 +354,45 @@ public R call(Object... args) { } }; } + /** A no-op Action0. */ + private static final Action0 EMPTY0 = new Action0() { + @Override + public void call() { + } + }; + /** + * Returns a no-op Action0 instance. + * @return a no-op Action0 instance + */ + public static Action0 empty0() { + return EMPTY0; + } + /** A no-op Action1. */ + private static final Action1 EMPTY1 = new Action1() { + @Override + public void call(Object t1) { + } + }; + /** + * Returns a no-op Action1 instance. + * @return a no-op Action1 instance + */ + @SuppressWarnings("unchecked") + public static Action1 empty1() { + return (Action1)EMPTY1; + } + /** A no-op Action1. */ + private static final Action2 EMPTY2 = new Action2() { + @Override + public void call(Object t1, Object t2) { + } + }; + /** + * Returns a no-op Action2 instance. + * @return a no-op Action2 instance + */ + @SuppressWarnings("unchecked") + public static Action2 empty2() { + return (Action2)EMPTY2; + } } diff --git a/rxjava-core/src/test/java/rx/subscriptions/SingleAssignmentSubscriptionTest.java b/rxjava-core/src/test/java/rx/subscriptions/SingleAssignmentSubscriptionTest.java new file mode 100644 index 0000000000..d95f355924 --- /dev/null +++ b/rxjava-core/src/test/java/rx/subscriptions/SingleAssignmentSubscriptionTest.java @@ -0,0 +1,67 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.subscriptions; + +import junit.framework.Assert; +import org.junit.Before; +import org.junit.Test; + +public class SingleAssignmentSubscriptionTest { + SingleAssignmentSubscription sas; + @Before + public void before() { + sas = new SingleAssignmentSubscription(); + + Assert.assertEquals(false, sas.isUnsubscribed()); + } + @Test(expected = IllegalStateException.class) + public void setTwice() { + BooleanSubscription s1 = new BooleanSubscription(); + BooleanSubscription s2 = new BooleanSubscription(); + + sas.setSubscription(s1); + + sas.setSubscription(s2); + } + @Test + public void setToAnUnsubscribed() { + BooleanSubscription s1 = new BooleanSubscription(); + + sas.unsubscribe(); + + Assert.assertEquals(true, sas.isUnsubscribed()); + + sas.setSubscription(s1); + + Assert.assertEquals(true, s1.isUnsubscribed()); + } + @Test + public void setUnsubscribeSet() { + BooleanSubscription s1 = new BooleanSubscription(); + BooleanSubscription s2 = new BooleanSubscription(); + + sas.setSubscription(s1); + + sas.unsubscribe(); + + sas.setSubscription(s2); + + Assert.assertEquals(true, sas.isUnsubscribed()); + Assert.assertEquals(true, s1.isUnsubscribed()); + Assert.assertEquals(true, s2.isUnsubscribed()); + + } +} diff --git a/rxjava-core/src/test/java/rx/subscriptions/ValuedCompositeSubscriptionTest.java b/rxjava-core/src/test/java/rx/subscriptions/ValuedCompositeSubscriptionTest.java new file mode 100644 index 0000000000..d92ad33fd9 --- /dev/null +++ b/rxjava-core/src/test/java/rx/subscriptions/ValuedCompositeSubscriptionTest.java @@ -0,0 +1,289 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.subscriptions; + +import java.util.Arrays; +import java.util.Collection; +import java.util.LinkedHashMap; +import java.util.Map; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import rx.Subscription; + +public class ValuedCompositeSubscriptionTest { + ValuedCompositeSubscription vcs; + @Before + public void before() { + vcs = new ValuedCompositeSubscription(); + + Assert.assertEquals(false, vcs.isUnsubscribed()); + } + @Test + public void testAddAndTerminate() { + + BooleanSubscription s1 = new BooleanSubscription(); + BooleanSubscription s2 = new BooleanSubscription(); + + vcs.add(s1, 1); + vcs.add(s2, 2); + + Assert.assertEquals(2, vcs.size()); + + vcs.unsubscribe(); + + Assert.assertEquals(true, vcs.isUnsubscribed()); + + Assert.assertEquals(true, s1.isUnsubscribed()); + Assert.assertEquals(true, s2.isUnsubscribed()); + } + @Test + public void testUnsubscribeTwice() { + BooleanSubscription s1 = new BooleanSubscription(); + vcs.add(s1, 1); + + vcs.unsubscribe(); + + Assert.assertEquals(true, vcs.isUnsubscribed()); + + vcs.unsubscribe(); + + Assert.assertEquals(true, vcs.isUnsubscribed()); + Assert.assertEquals(0, vcs.size()); + } + @Test + public void testAddAfterTerminate() { + vcs.unsubscribe(); + + BooleanSubscription s1 = new BooleanSubscription(); + + vcs.add(s1, 1); + + Assert.assertEquals(0, vcs.size()); + + Assert.assertEquals(true, s1.isUnsubscribed()); + } + @Test + public void testTryAddAfterTerminate() { + vcs.unsubscribe(); + + BooleanSubscription s1 = new BooleanSubscription(); + Assert.assertEquals(false, vcs.tryAdd(s1, 1)); + + Assert.assertEquals(false, s1.isUnsubscribed()); + } + @Test + public void testPutIfAbsend() { + BooleanSubscription s1 = new BooleanSubscription(); + vcs.add(s1, 1); + + Assert.assertEquals(false, vcs.putIfAbsent(s1, 2)); + + Assert.assertEquals(1, vcs.get(s1)); + } + @Test + public void testReplaceValue() { + BooleanSubscription s1 = new BooleanSubscription(); + vcs.add(s1, 1); + + Assert.assertEquals(1, vcs.put(s1, 2)); + + Assert.assertEquals(2, vcs.get(s1)); + + Assert.assertEquals(1, vcs.size()); + } + @Test + public void testGetKeysAndUnsubscribe() { + BooleanSubscription s1 = new BooleanSubscription(); + BooleanSubscription s2 = new BooleanSubscription(); + + vcs.add(s1, 1); + vcs.add(s2, 2); + + Collection coll = vcs.getKeysAndUnsubscribe(); + + Assert.assertEquals(true, vcs.isUnsubscribed()); + Assert.assertEquals(0, vcs.size()); + + Assert.assertEquals(Arrays.asList(s1, s2), coll); + + Assert.assertEquals(false, s1.isUnsubscribed()); + Assert.assertEquals(false, s2.isUnsubscribed()); + } + @Test + public void testGetValuesAndUnsubscribe() { + BooleanSubscription s1 = new BooleanSubscription(); + BooleanSubscription s2 = new BooleanSubscription(); + + vcs.add(s1, 1); + vcs.add(s2, 2); + + Collection coll = vcs.getValuesAndUnsubscribe(); + + Assert.assertEquals(true, vcs.isUnsubscribed()); + Assert.assertEquals(0, vcs.size()); + + Assert.assertEquals(Arrays.asList(1, 2), coll); + + Assert.assertEquals(false, s1.isUnsubscribed()); + Assert.assertEquals(false, s2.isUnsubscribed()); + } + @Test + public void testGetEntriesAndUnsubscribe() { + BooleanSubscription s1 = new BooleanSubscription(); + BooleanSubscription s2 = new BooleanSubscription(); + + vcs.add(s1, 1); + vcs.add(s2, 2); + + Map coll = vcs.getEntriesAndUnsubscribe(); + + Assert.assertEquals(true, vcs.isUnsubscribed()); + Assert.assertEquals(0, vcs.size()); + + Map expectedMap = new LinkedHashMap(); + expectedMap.put(s1, 1); + expectedMap.put(s2, 2); + + Assert.assertEquals(expectedMap, coll); + + Assert.assertEquals(false, s1.isUnsubscribed()); + Assert.assertEquals(false, s2.isUnsubscribed()); + } + @Test + public void testUnsubscribeAndGet() { + BooleanSubscription s1 = new BooleanSubscription(); + BooleanSubscription s2 = new BooleanSubscription(); + + vcs.add(s1, 1); + vcs.add(s2, 2); + + Collection coll = vcs.unsubscribeAndGet(); + + Assert.assertEquals(true, vcs.isUnsubscribed()); + Assert.assertEquals(0, vcs.size()); + + + Assert.assertEquals(Arrays.asList(1, 2), coll); + + Assert.assertEquals(true, s1.isUnsubscribed()); + Assert.assertEquals(true, s2.isUnsubscribed()); + } + @Test + public void testDeleteUnsubscribe() { + BooleanSubscription s1 = new BooleanSubscription(); + BooleanSubscription s2 = new BooleanSubscription(); + + vcs.add(s1, 1); + vcs.add(s2, 2); + + vcs.deleteUnsubscribe(); + + Assert.assertEquals(true, vcs.isUnsubscribed()); + Assert.assertEquals(0, vcs.size()); + + Assert.assertEquals(false, s1.isUnsubscribed()); + Assert.assertEquals(false, s2.isUnsubscribed()); + } + @Test + public void testClear() { + BooleanSubscription s1 = new BooleanSubscription(); + BooleanSubscription s2 = new BooleanSubscription(); + + vcs.add(s1, 1); + vcs.add(s2, 2); + + vcs.clear(); + + Assert.assertEquals(false, vcs.isUnsubscribed()); + Assert.assertEquals(0, vcs.size()); + + Assert.assertEquals(true, s1.isUnsubscribed()); + Assert.assertEquals(true, s2.isUnsubscribed()); + } + @Test + public void testDeleteClear() { + BooleanSubscription s1 = new BooleanSubscription(); + BooleanSubscription s2 = new BooleanSubscription(); + + vcs.add(s1, 1); + vcs.add(s2, 2); + + vcs.deleteClear(); + + Assert.assertEquals(false, vcs.isUnsubscribed()); + Assert.assertEquals(0, vcs.size()); + + Assert.assertEquals(false, s1.isUnsubscribed()); + Assert.assertEquals(false, s2.isUnsubscribed()); + } + @Test + public void testDelete() { + BooleanSubscription s1 = new BooleanSubscription(); + + vcs.add(s1, 1); + + Assert.assertEquals(1, vcs.delete(s1)); + + Assert.assertEquals(false, s1.isUnsubscribed()); + } + @Test + public void testRemove() { + BooleanSubscription s1 = new BooleanSubscription(); + + vcs.add(s1, 1); + + Assert.assertEquals(1, vcs.remove(s1)); + + Assert.assertEquals(true, s1.isUnsubscribed()); + } + @Test + public void testGetOnTerminated() { + BooleanSubscription s1 = new BooleanSubscription(); + + vcs.add(s1, 1); + vcs.unsubscribe(); + + Assert.assertEquals(null, vcs.get(s1)); + } + @Test + public void testGetOrDefault() { + BooleanSubscription s1 = new BooleanSubscription(); + + Assert.assertEquals(1, vcs.getOrDefault(s1, 1)); + } + @Test + public void testGetOrDefaultTerminated() { + vcs.unsubscribe(); + + BooleanSubscription s1 = new BooleanSubscription(); + Assert.assertEquals(1, vcs.getOrDefault(s1, 1)); + } + @Test + public void testUnsubscribeAndGetTwice() { + BooleanSubscription s1 = new BooleanSubscription(); + vcs.add(s1, 1); + + vcs.unsubscribeAndGet(); + + Assert.assertEquals(true, vcs.isUnsubscribed()); + + Collection coll = vcs.unsubscribeAndGet(); + + Assert.assertEquals(true, vcs.isUnsubscribed()); + Assert.assertEquals(true, coll.isEmpty()); + } +}