Skip to content

Commit

Permalink
1.x: add AsyncCompletableSubscriber that exposes unsubscribe() (#4020)
Browse files Browse the repository at this point in the history
  • Loading branch information
akarnokd authored Jun 21, 2016
1 parent 535fb75 commit 03bc7a2
Show file tree
Hide file tree
Showing 2 changed files with 226 additions and 0 deletions.
128 changes: 128 additions & 0 deletions src/main/java/rx/observers/AsyncCompletableSubscriber.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
/**
* Copyright 2016 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package rx.observers;

import java.util.concurrent.atomic.AtomicReference;

import rx.Completable.CompletableSubscriber;
import rx.Subscription;
import rx.annotations.Experimental;
import rx.internal.util.RxJavaPluginUtils;

/**
* An abstract base class for CompletableSubscriber implementations that want to expose an unsubscription
* capability.
* <p>
* Calling {@link #unsubscribe()} and {@link #isUnsubscribed()} is threadsafe and can happen at any time, even
* before or during an active {@link rx.Completable#subscribe(CompletableSubscriber)} call.
* <p>
* Override the {@link #onStart()} method to execute custom logic on the very first successful onSubscribe call.
* <p>
* If one wants to remain consistent regarding {@link #isUnsubscribed()} and being terminated,
* the {@link #clear()} method should be called from the implementing onError and onCompleted methods.
* <p>
* <pre><code>
* public final class MyCompletableSubscriber extends AsyncCompletableSubscriber {
* &#64;Override
* public void onStart() {
* System.out.println("Started!");
* }
*
* &#64;Override
* public void onCompleted() {
* System.out.println("Completed!");
* clear();
* }
*
* &#64;Override
* public void onError(Throwable e) {
* e.printStackTrace();
* clear();
* }
* }
* </code></pre>
* @since (if this graduates from Experimental/Beta to supported, replace this parenthetical with the release number)
*/
@Experimental
public abstract class AsyncCompletableSubscriber implements CompletableSubscriber, Subscription {

/**
* Holds onto a deferred subscription and allows asynchronous cancellation before the call
* to onSubscribe() by the upstream.
*/
private final AtomicReference<Subscription> upstream = new AtomicReference<Subscription>();

@Override
public final void onSubscribe(Subscription d) {
if (!upstream.compareAndSet(null, d)) {
d.unsubscribe();
if (upstream.get() != UNSUBSCRIBED) {
RxJavaPluginUtils.handleException(new IllegalStateException("Subscription already set!"));
}
} else {
onStart();
}
}

/**
* Called before the first onSubscribe() call succeeds.
*/
protected void onStart() {
}

@Override
public final boolean isUnsubscribed() {
return upstream.get() == UNSUBSCRIBED;
}

/**
* Call to clear the upstream's subscription without unsubscribing it.
*/
protected final void clear() {
upstream.set(UNSUBSCRIBED);
}

@Override
public final void unsubscribe() {
Subscription current = upstream.get();
if (current != UNSUBSCRIBED) {
current = upstream.getAndSet(UNSUBSCRIBED);
if (current != null && current != UNSUBSCRIBED) {
current.unsubscribe();
}
}

}

/**
* Indicates the unsubscribed state.
*/
static final Unsubscribed UNSUBSCRIBED = new Unsubscribed();

static final class Unsubscribed implements Subscription {

@Override
public void unsubscribe() {
// deliberately no op
}

@Override
public boolean isUnsubscribed() {
return true;
}

}
}
98 changes: 98 additions & 0 deletions src/test/java/rx/observers/AsyncCompletableSubscriberTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
/**
* Copyright 2016 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package rx.observers;

import java.util.*;

import org.junit.*;

import rx.Completable;
import rx.Observable;
import rx.exceptions.TestException;

public class AsyncCompletableSubscriberTest {

static final class TestCS extends AsyncCompletableSubscriber {
int started;

int completions;

final List<Throwable> errors = new ArrayList<Throwable>();

@Override
protected void onStart() {
started++;
}

@Override
public void onCompleted() {
completions++;
clear();
}

@Override
public void onError(Throwable e) {
errors.add(e);
clear();
}
}

@Test
public void normal() {
TestCS ts = new TestCS();

Assert.assertFalse(ts.isUnsubscribed());

Completable.complete().subscribe(ts);

Assert.assertEquals(1, ts.started);
Assert.assertEquals(1, ts.completions);
Assert.assertEquals(ts.errors.toString(), 0, ts.errors.size());
Assert.assertTrue(ts.isUnsubscribed());
}

@Test
public void error() {
TestCS ts = new TestCS();

Assert.assertFalse(ts.isUnsubscribed());

Completable.error(new TestException("Forced failure")).subscribe(ts);

Assert.assertEquals(1, ts.started);
Assert.assertEquals(0, ts.completions);
Assert.assertEquals(ts.errors.toString(), 1, ts.errors.size());
Assert.assertTrue(ts.errors.get(0).toString(), ts.errors.get(0) instanceof TestException);
Assert.assertEquals("Forced failure", ts.errors.get(0).getMessage());
Assert.assertTrue(ts.isUnsubscribed());
}


@Test
public void unsubscribed() {
TestCS ts = new TestCS();
ts.unsubscribe();

Assert.assertTrue(ts.isUnsubscribed());

Observable.range(1, 10).toCompletable().subscribe(ts);

Assert.assertEquals(0, ts.started);
Assert.assertEquals(0, ts.completions);
Assert.assertEquals(ts.errors.toString(), 0, ts.errors.size());
Assert.assertTrue(ts.isUnsubscribed());
}
}

0 comments on commit 03bc7a2

Please sign in to comment.