diff --git a/src/main/java/io/reactivex/internal/operators/completable/CompletableDelay.java b/src/main/java/io/reactivex/internal/operators/completable/CompletableDelay.java index fa600c64b1..79b0a49383 100644 --- a/src/main/java/io/reactivex/internal/operators/completable/CompletableDelay.java +++ b/src/main/java/io/reactivex/internal/operators/completable/CompletableDelay.java @@ -14,9 +14,11 @@ package io.reactivex.internal.operators.completable; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; import io.reactivex.*; -import io.reactivex.disposables.*; +import io.reactivex.disposables.Disposable; +import io.reactivex.internal.disposables.DisposableHelper; public final class CompletableDelay extends Completable { @@ -40,54 +42,70 @@ public CompletableDelay(CompletableSource source, long delay, TimeUnit unit, Sch @Override protected void subscribeActual(final CompletableObserver s) { - final CompositeDisposable set = new CompositeDisposable(); - - source.subscribe(new Delay(set, s)); + source.subscribe(new Delay(s, delay, unit, scheduler, delayError)); } - final class Delay implements CompletableObserver { + static final class Delay extends AtomicReference + implements CompletableObserver, Runnable, Disposable { + + private static final long serialVersionUID = 465972761105851022L; + + final CompletableObserver downstream; + + final long delay; + + final TimeUnit unit; - private final CompositeDisposable set; - final CompletableObserver s; + final Scheduler scheduler; - Delay(CompositeDisposable set, CompletableObserver s) { - this.set = set; - this.s = s; + final boolean delayError; + + Throwable error; + + Delay(CompletableObserver downstream, long delay, TimeUnit unit, Scheduler scheduler, boolean delayError) { + this.downstream = downstream; + this.delay = delay; + this.unit = unit; + this.scheduler = scheduler; + this.delayError = delayError; + } + + @Override + public void onSubscribe(Disposable d) { + if (DisposableHelper.setOnce(this, d)) { + downstream.onSubscribe(this); + } } @Override public void onComplete() { - set.add(scheduler.scheduleDirect(new OnComplete(), delay, unit)); + DisposableHelper.replace(this, scheduler.scheduleDirect(this, delay, unit)); } @Override public void onError(final Throwable e) { - set.add(scheduler.scheduleDirect(new OnError(e), delayError ? delay : 0, unit)); + error = e; + DisposableHelper.replace(this, scheduler.scheduleDirect(this, delayError ? delay : 0, unit)); } @Override - public void onSubscribe(Disposable d) { - set.add(d); - s.onSubscribe(set); + public void dispose() { + DisposableHelper.dispose(this); } - final class OnComplete implements Runnable { - @Override - public void run() { - s.onComplete(); - } + @Override + public boolean isDisposed() { + return DisposableHelper.isDisposed(get()); } - final class OnError implements Runnable { - private final Throwable e; - - OnError(Throwable e) { - this.e = e; - } - - @Override - public void run() { - s.onError(e); + @Override + public void run() { + Throwable e = error; + error = null; + if (e != null) { + downstream.onError(e); + } else { + downstream.onComplete(); } } } diff --git a/src/test/java/io/reactivex/internal/operators/completable/CompletableDelayTest.java b/src/test/java/io/reactivex/internal/operators/completable/CompletableDelayTest.java index 78393b39ae..174a520e0f 100644 --- a/src/test/java/io/reactivex/internal/operators/completable/CompletableDelayTest.java +++ b/src/test/java/io/reactivex/internal/operators/completable/CompletableDelayTest.java @@ -13,17 +13,18 @@ package io.reactivex.internal.operators.completable; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; +import static org.junit.Assert.assertNotEquals; + +import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicReference; -import io.reactivex.functions.Consumer; import org.junit.Test; -import io.reactivex.Completable; -import io.reactivex.schedulers.Schedulers; - -import static org.junit.Assert.assertNotEquals; +import io.reactivex.*; +import io.reactivex.exceptions.TestException; +import io.reactivex.functions.*; +import io.reactivex.observers.TestObserver; +import io.reactivex.schedulers.*; public class CompletableDelayTest { @@ -58,4 +59,65 @@ public void accept(Throwable throwable) throws Exception { assertNotEquals(Thread.currentThread(), thread.get()); } + @Test + public void disposed() { + TestHelper.checkDisposed(Completable.never().delay(1, TimeUnit.MINUTES)); + } + + @Test + public void doubleOnSubscribe() { + TestHelper.checkDoubleOnSubscribeCompletable(new Function() { + @Override + public CompletableSource apply(Completable c) throws Exception { + return c.delay(1, TimeUnit.MINUTES); + } + }); + } + + @Test + public void normal() { + Completable.complete() + .delay(1, TimeUnit.MILLISECONDS) + .test() + .awaitDone(5, TimeUnit.SECONDS) + .assertResult(); + } + + @Test + public void errorNotDelayed() { + TestScheduler scheduler = new TestScheduler(); + + TestObserver to = Completable.error(new TestException()) + .delay(100, TimeUnit.MILLISECONDS, scheduler, false) + .test(); + + to.assertEmpty(); + + scheduler.advanceTimeBy(1, TimeUnit.MILLISECONDS); + + to.assertFailure(TestException.class); + + scheduler.advanceTimeBy(100, TimeUnit.MILLISECONDS); + + to.assertFailure(TestException.class); + } + + @Test + public void errorDelayed() { + TestScheduler scheduler = new TestScheduler(); + + TestObserver to = Completable.error(new TestException()) + .delay(100, TimeUnit.MILLISECONDS, scheduler, true) + .test(); + + to.assertEmpty(); + + scheduler.advanceTimeBy(1, TimeUnit.MILLISECONDS); + + to.assertEmpty(); + + scheduler.advanceTimeBy(99, TimeUnit.MILLISECONDS); + + to.assertFailure(TestException.class); + } }