diff --git a/src/main/java/rx/observers/AsyncCompletableSubscriber.java b/src/main/java/rx/observers/AsyncCompletableSubscriber.java
new file mode 100644
index 0000000000..d61585e4fa
--- /dev/null
+++ b/src/main/java/rx/observers/AsyncCompletableSubscriber.java
@@ -0,0 +1,128 @@
+/**
+ * Copyright 2016 Netflix, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package rx.observers;
+
+import java.util.concurrent.atomic.AtomicReference;
+
+import rx.Completable.CompletableSubscriber;
+import rx.Subscription;
+import rx.annotations.Experimental;
+import rx.internal.util.RxJavaPluginUtils;
+
+/**
+ * An abstract base class for CompletableSubscriber implementations that want to expose an unsubscription
+ * capability.
+ *
+ * Calling {@link #unsubscribe()} and {@link #isUnsubscribed()} is threadsafe and can happen at any time, even
+ * before or during an active {@link rx.Completable#subscribe(CompletableSubscriber)} call.
+ *
+ * Override the {@link #onStart()} method to execute custom logic on the very first successful onSubscribe call.
+ *
+ * If one wants to remain consistent regarding {@link #isUnsubscribed()} and being terminated,
+ * the {@link #clear()} method should be called from the implementing onError and onCompleted methods.
+ *
+ *
+ * public final class MyCompletableSubscriber extends AsyncCompletableSubscriber {
+ * @Override
+ * public void onStart() {
+ * System.out.println("Started!");
+ * }
+ *
+ * @Override
+ * public void onCompleted() {
+ * System.out.println("Completed!");
+ * clear();
+ * }
+ *
+ * @Override
+ * public void onError(Throwable e) {
+ * e.printStackTrace();
+ * clear();
+ * }
+ * }
+ *
+ * @since (if this graduates from Experimental/Beta to supported, replace this parenthetical with the release number)
+ */
+@Experimental
+public abstract class AsyncCompletableSubscriber implements CompletableSubscriber, Subscription {
+
+ /**
+ * Holds onto a deferred subscription and allows asynchronous cancellation before the call
+ * to onSubscribe() by the upstream.
+ */
+ private final AtomicReference upstream = new AtomicReference();
+
+ @Override
+ public final void onSubscribe(Subscription d) {
+ if (!upstream.compareAndSet(null, d)) {
+ d.unsubscribe();
+ if (upstream.get() != UNSUBSCRIBED) {
+ RxJavaPluginUtils.handleException(new IllegalStateException("Subscription already set!"));
+ }
+ } else {
+ onStart();
+ }
+ }
+
+ /**
+ * Called before the first onSubscribe() call succeeds.
+ */
+ protected void onStart() {
+ }
+
+ @Override
+ public final boolean isUnsubscribed() {
+ return upstream.get() == UNSUBSCRIBED;
+ }
+
+ /**
+ * Call to clear the upstream's subscription without unsubscribing it.
+ */
+ protected final void clear() {
+ upstream.set(UNSUBSCRIBED);
+ }
+
+ @Override
+ public final void unsubscribe() {
+ Subscription current = upstream.get();
+ if (current != UNSUBSCRIBED) {
+ current = upstream.getAndSet(UNSUBSCRIBED);
+ if (current != null && current != UNSUBSCRIBED) {
+ current.unsubscribe();
+ }
+ }
+
+ }
+
+ /**
+ * Indicates the unsubscribed state.
+ */
+ static final Unsubscribed UNSUBSCRIBED = new Unsubscribed();
+
+ static final class Unsubscribed implements Subscription {
+
+ @Override
+ public void unsubscribe() {
+ // deliberately no op
+ }
+
+ @Override
+ public boolean isUnsubscribed() {
+ return true;
+ }
+
+ }
+}
diff --git a/src/test/java/rx/observers/AsyncCompletableSubscriberTest.java b/src/test/java/rx/observers/AsyncCompletableSubscriberTest.java
new file mode 100644
index 0000000000..7ac34f1628
--- /dev/null
+++ b/src/test/java/rx/observers/AsyncCompletableSubscriberTest.java
@@ -0,0 +1,98 @@
+/**
+ * Copyright 2016 Netflix, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package rx.observers;
+
+import java.util.*;
+
+import org.junit.*;
+
+import rx.Completable;
+import rx.Observable;
+import rx.exceptions.TestException;
+
+public class AsyncCompletableSubscriberTest {
+
+ static final class TestCS extends AsyncCompletableSubscriber {
+ int started;
+
+ int completions;
+
+ final List errors = new ArrayList();
+
+ @Override
+ protected void onStart() {
+ started++;
+ }
+
+ @Override
+ public void onCompleted() {
+ completions++;
+ clear();
+ }
+
+ @Override
+ public void onError(Throwable e) {
+ errors.add(e);
+ clear();
+ }
+ }
+
+ @Test
+ public void normal() {
+ TestCS ts = new TestCS();
+
+ Assert.assertFalse(ts.isUnsubscribed());
+
+ Completable.complete().subscribe(ts);
+
+ Assert.assertEquals(1, ts.started);
+ Assert.assertEquals(1, ts.completions);
+ Assert.assertEquals(ts.errors.toString(), 0, ts.errors.size());
+ Assert.assertTrue(ts.isUnsubscribed());
+ }
+
+ @Test
+ public void error() {
+ TestCS ts = new TestCS();
+
+ Assert.assertFalse(ts.isUnsubscribed());
+
+ Completable.error(new TestException("Forced failure")).subscribe(ts);
+
+ Assert.assertEquals(1, ts.started);
+ Assert.assertEquals(0, ts.completions);
+ Assert.assertEquals(ts.errors.toString(), 1, ts.errors.size());
+ Assert.assertTrue(ts.errors.get(0).toString(), ts.errors.get(0) instanceof TestException);
+ Assert.assertEquals("Forced failure", ts.errors.get(0).getMessage());
+ Assert.assertTrue(ts.isUnsubscribed());
+ }
+
+
+ @Test
+ public void unsubscribed() {
+ TestCS ts = new TestCS();
+ ts.unsubscribe();
+
+ Assert.assertTrue(ts.isUnsubscribed());
+
+ Observable.range(1, 10).toCompletable().subscribe(ts);
+
+ Assert.assertEquals(0, ts.started);
+ Assert.assertEquals(0, ts.completions);
+ Assert.assertEquals(ts.errors.toString(), 0, ts.errors.size());
+ Assert.assertTrue(ts.isUnsubscribed());
+ }
+}