Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Subscription overhaul2 #620

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion rxjava-core/src/main/java/rx/joins/JoinObserver1.java
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public void addActivePlan(ActivePlan0 activePlan) {
@Override
public void subscribe(Object gate) {
this.gate = gate;
subscription.set(source.materialize().subscribe(this));
subscription.setSubscription(source.materialize().subscribe(this));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import java.util.concurrent.atomic.AtomicReference;

import rx.Subscription;
import rx.subscriptions.SingleAssignmentSubscription;

/**
* Thread-safe wrapper around Observable Subscription that ensures unsubscribe can be called only once.
Expand All @@ -30,21 +31,14 @@
* </ul>
*/
public final class SafeObservableSubscription implements Subscription {

private static final Subscription UNSUBSCRIBED = new Subscription()
{
@Override
public void unsubscribe()
{
}
};
private final AtomicReference<Subscription> actualSubscription = new AtomicReference<Subscription>();
private final SingleAssignmentSubscription sas;

public SafeObservableSubscription() {
sas = new SingleAssignmentSubscription();
}

public SafeObservableSubscription(Subscription actualSubscription) {
this.actualSubscription.set(actualSubscription);
sas = new SingleAssignmentSubscription(actualSubscription);
}

/**
Expand All @@ -56,27 +50,16 @@ public SafeObservableSubscription(Subscription actualSubscription) {
* if trying to set more than once (or use this method after setting via constructor)
*/
public SafeObservableSubscription wrap(Subscription actualSubscription) {
if (!this.actualSubscription.compareAndSet(null, actualSubscription)) {
if (this.actualSubscription.get() == UNSUBSCRIBED) {
actualSubscription.unsubscribe();
return this;
}
throw new IllegalStateException("Can not set subscription more than once.");
}
sas.setSubscription(actualSubscription);
return this;
}

@Override
public void unsubscribe() {
// get the real thing and set to null in an atomic operation so we will only ever call unsubscribe once
Subscription actual = actualSubscription.getAndSet(UNSUBSCRIBED);
// if it's not null we will unsubscribe
if (actual != null) {
actual.unsubscribe();
}
sas.unsubscribe();
}

public boolean isUnsubscribed() {
return actualSubscription.get() == UNSUBSCRIBED;
return sas.isUnsubscribed();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
/**
* Copyright 2013 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package rx.subscriptions;

import java.util.concurrent.atomic.AtomicReference;

import rx.Subscription;

/**
* Base class to manage a reference to another subscription in atomic manner
* and allows callbacks to handle the referenced subscription at the
* pre-swap and post-swap stages.
*
*/
public abstract class AbstractAssignmentSubscription implements Subscription {
/** The subscription holding the reference. */
protected final AtomicReference<Subscription> reference = new AtomicReference<Subscription>();
/** Sentinel for the unsubscribed state. */
private static final Subscription UNSUBSCRIBED_SENTINEL = new Subscription() {
@Override
public void unsubscribe() {
}
};
/** Creates an empty AbstractAssignmentSubscription. */
public AbstractAssignmentSubscription() {

}
/**
* Creates a AbstractAssignmentSubscription with the given subscription
* as its initial value.
*
* @param s the initial subscription
*/
public AbstractAssignmentSubscription(Subscription s) {
this();
reference.set(s);
}
public boolean isUnsubscribed() {
return reference.get() == UNSUBSCRIBED_SENTINEL;
}

@Override
public void unsubscribe() {
Subscription s = reference.getAndSet(UNSUBSCRIBED_SENTINEL);
if (s != null) {
s.unsubscribe();
}
}

public void setSubscription(Subscription s) {
do {
Subscription r = reference.get();
if (r == UNSUBSCRIBED_SENTINEL) {
s.unsubscribe();
return;
}
onPreSwap(r);
if (reference.compareAndSet(r, s)) {
onPostSwap(r);
break;
}
} while (true);
}
/**
* Override this method to perform logic on a subscription before
* an attempt is tried to swap it for a new subscription.
* @param current the current subscription value
*/
protected void onPreSwap(Subscription current) { }
/**
* Override this method to perform actions once a subscription has been
* swapped to a new one.
* @param old the old subscription value
*/
protected void onPostSwap(Subscription old) { }

public Subscription getSubscription() {
Subscription s = reference.get();
return s != UNSUBSCRIBED_SENTINEL ? s : Subscriptions.empty();
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
/**
* Copyright 2013 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package rx.subscriptions;

import java.util.concurrent.atomic.AtomicReference;
import rx.Subscription;
import rx.util.functions.Action0;
import rx.util.functions.Actions;
import rx.util.functions.Func0;

/**
* Base class for subscriptions with lock-free behavior.
*/
public abstract class AbstractAtomicSubscription implements Subscription {
/**
* The subscription state for tracking the mutating and unsubscribed states.
*/
protected enum SubscriptionState {
/** Indicates that the subscription may be mutated. */
ACTIVE,
/** Indicates that a mutation is going on. */
MUTATING,
/** Indicates that the subscription has been unsubscribed and no further mutation may happen. */
UNSUBSCRIBED
}
/** The current state. */
private final AtomicReference<SubscriptionState> state = new AtomicReference<SubscriptionState>(SubscriptionState.ACTIVE);
/**
* Atomically sets the state.
* @param newState the new state
*/
protected final void setState(SubscriptionState newState) {
if (newState == null) {
throw new NullPointerException("newState");
}
state.set(newState);
}
/**
* Atomically retrieves the current state.
* @return the current state.
*/
protected final SubscriptionState getState() {
return state.get();
}
/**
* Executes the given action while in the MUTATING state and transitions to the supplied new state.
* <p>
* Even if the {@code action} throws an exception, the state is always set to the {@code newState}.
* @param newState the state to set after the action was called
* @param action the action to execute while in the MUTATING state
* @return true if the action was called, false if the subscription was unsubscribed
*/
protected final boolean callAndSet(SubscriptionState newState, Action0 action) {
return callAndSet(newState, action, null);
}
/**
* Executes the given action and sets the state to the supplied newState or
* executes the function and sets the state to its return value.
*
* @param newState the default new state, set if action is executed or the func call throws.
* @param action the action to execute, null if not applicable
* @param func the function to execute, null if not applicable
* @return true if either the action or function was executed,
* false if the subscription was unsubscribed during the operation
*/
private boolean callAndSet(SubscriptionState newState, Action0 action, Func0<SubscriptionState> func) {
if (newState == null) {
throw new NullPointerException("newState");
}
if (action == null && func == null) {
throw new IllegalArgumentException("action & func both null!");
}
if (action != null && func != null) {
throw new IllegalArgumentException("action & func both non-null!");
}
do {
SubscriptionState s = state.get();
if (s == SubscriptionState.UNSUBSCRIBED) {
return false;
}
if (s == SubscriptionState.MUTATING) {
continue;
}
if (state.compareAndSet(s, SubscriptionState.MUTATING)) {
SubscriptionState toSet = newState;
try {
if (action != null) {
action.call();
} else {
toSet = func.call();
}
} finally {
state.set(toSet);
}
return true;
}
} while (true);
}
/**
* Executes the given function while in the MUTATING state and transitions
* to state returned by the function.
* <p>
* If the func throws, the state is reset to ACTIVE and the exception is propagated.
*
* @param func the function to call, should return the state after the function call
* @return true if the action was called, false if the subscription was unsubscribed
*/
protected final boolean call(Func0<SubscriptionState> func) {
return callAndSet(SubscriptionState.ACTIVE, null, func);
}
/**
* Transitions to the supplied new state and executes the given action.
* <p>
* The action is responsible to
* @param newState the state to set before the action is called
* @param action the action to execute while in the MUTATING state
* @return true if the action was called, false if the subscription was unsubscribed
*/
protected final boolean setAndCall(SubscriptionState newState, Action0 action) {
if (action == null) {
throw new NullPointerException("action");
}
if (setAndCall(newState, Actions.empty0())) {
action.call();
return true;
}
return false;
}
/**
* Executes the given action and returns in the ACTIVE state.
* @param action the action to execute while in the MUTATING state
* @return true if the action was called, false if the subscription was unsubscribed
*/
protected final boolean call(Action0 action) {
return callAndSet(SubscriptionState.ACTIVE, action);
}
/**
* Returns true if this subscription has been unsubscribed.
* @return true if this subscription has been unsubscribed
*/
public final boolean isUnsubscribed() {
return state.get() == SubscriptionState.UNSUBSCRIBED;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,23 +19,48 @@

import rx.Observable;
import rx.Subscription;
import rx.util.functions.Action0;

/**
* Subscription that can be checked for status such as in a loop inside an {@link Observable} to exit the loop if unsubscribed.
*
* @see <a href="http://msdn.microsoft.com/en-us/library/system.reactive.disposables.booleandisposable(v=vs.103).aspx">Rx.Net equivalent BooleanDisposable</a>
* @see <a href="http://msdn.microsoft.com/en-us/library/system.reactive.disposables.booleandisposable.aspx">Rx.Net equivalent BooleanDisposable</a>
*/
public class BooleanSubscription implements Subscription {

private final AtomicBoolean unsubscribed = new AtomicBoolean(false);
/** The subscription state. */
private final AtomicBoolean unsubscribed = new AtomicBoolean();

public boolean isUnsubscribed() {
return unsubscribed.get();
}

/**
* Override this method to perform any action once if this BooleanSubscription
* is unsubscribed.
*/
protected void onUnsubscribe() { }

@Override
public void unsubscribe() {
unsubscribed.set(true);
if (unsubscribed.compareAndSet(false, true)) {
onUnsubscribe();
}
}
/**
* Returns a BooleanSubscription which calls the given action once
* it is unsubscribed.
* @param action the action to call when unsubscribing
* @return the BooleanSubscription which calls the given action once
* it is unsubscribed
*/
public static BooleanSubscription withAction(final Action0 action) {
if (action == null) {
throw new NullPointerException("action");
}
return new BooleanSubscription() {
@Override
protected void onUnsubscribe() {
action.call();
}
};
}

}
Loading