Skip to content

Commit

Permalink
2.x: Improve Completable.delay operator internals (#6096)
Browse files Browse the repository at this point in the history
  • Loading branch information
akarnokd authored Jul 19, 2018
1 parent 1aeac06 commit 27c63b6
Show file tree
Hide file tree
Showing 2 changed files with 117 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,11 @@
package io.reactivex.internal.operators.completable;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

import io.reactivex.*;
import io.reactivex.disposables.*;
import io.reactivex.disposables.Disposable;
import io.reactivex.internal.disposables.DisposableHelper;

public final class CompletableDelay extends Completable {

Expand All @@ -40,54 +42,70 @@ public CompletableDelay(CompletableSource source, long delay, TimeUnit unit, Sch

@Override
protected void subscribeActual(final CompletableObserver s) {
final CompositeDisposable set = new CompositeDisposable();

source.subscribe(new Delay(set, s));
source.subscribe(new Delay(s, delay, unit, scheduler, delayError));
}

final class Delay implements CompletableObserver {
static final class Delay extends AtomicReference<Disposable>
implements CompletableObserver, Runnable, Disposable {

private static final long serialVersionUID = 465972761105851022L;

final CompletableObserver downstream;

final long delay;

final TimeUnit unit;

private final CompositeDisposable set;
final CompletableObserver s;
final Scheduler scheduler;

Delay(CompositeDisposable set, CompletableObserver s) {
this.set = set;
this.s = s;
final boolean delayError;

Throwable error;

Delay(CompletableObserver downstream, long delay, TimeUnit unit, Scheduler scheduler, boolean delayError) {
this.downstream = downstream;
this.delay = delay;
this.unit = unit;
this.scheduler = scheduler;
this.delayError = delayError;
}

@Override
public void onSubscribe(Disposable d) {
if (DisposableHelper.setOnce(this, d)) {
downstream.onSubscribe(this);
}
}

@Override
public void onComplete() {
set.add(scheduler.scheduleDirect(new OnComplete(), delay, unit));
DisposableHelper.replace(this, scheduler.scheduleDirect(this, delay, unit));
}

@Override
public void onError(final Throwable e) {
set.add(scheduler.scheduleDirect(new OnError(e), delayError ? delay : 0, unit));
error = e;
DisposableHelper.replace(this, scheduler.scheduleDirect(this, delayError ? delay : 0, unit));
}

@Override
public void onSubscribe(Disposable d) {
set.add(d);
s.onSubscribe(set);
public void dispose() {
DisposableHelper.dispose(this);
}

final class OnComplete implements Runnable {
@Override
public void run() {
s.onComplete();
}
@Override
public boolean isDisposed() {
return DisposableHelper.isDisposed(get());
}

final class OnError implements Runnable {
private final Throwable e;

OnError(Throwable e) {
this.e = e;
}

@Override
public void run() {
s.onError(e);
@Override
public void run() {
Throwable e = error;
error = null;
if (e != null) {
downstream.onError(e);
} else {
downstream.onComplete();
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,18 @@

package io.reactivex.internal.operators.completable;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import static org.junit.Assert.assertNotEquals;

import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicReference;

import io.reactivex.functions.Consumer;
import org.junit.Test;

import io.reactivex.Completable;
import io.reactivex.schedulers.Schedulers;

import static org.junit.Assert.assertNotEquals;
import io.reactivex.*;
import io.reactivex.exceptions.TestException;
import io.reactivex.functions.*;
import io.reactivex.observers.TestObserver;
import io.reactivex.schedulers.*;

public class CompletableDelayTest {

Expand Down Expand Up @@ -58,4 +59,65 @@ public void accept(Throwable throwable) throws Exception {
assertNotEquals(Thread.currentThread(), thread.get());
}

@Test
public void disposed() {
TestHelper.checkDisposed(Completable.never().delay(1, TimeUnit.MINUTES));
}

@Test
public void doubleOnSubscribe() {
TestHelper.checkDoubleOnSubscribeCompletable(new Function<Completable, CompletableSource>() {
@Override
public CompletableSource apply(Completable c) throws Exception {
return c.delay(1, TimeUnit.MINUTES);
}
});
}

@Test
public void normal() {
Completable.complete()
.delay(1, TimeUnit.MILLISECONDS)
.test()
.awaitDone(5, TimeUnit.SECONDS)
.assertResult();
}

@Test
public void errorNotDelayed() {
TestScheduler scheduler = new TestScheduler();

TestObserver<Void> to = Completable.error(new TestException())
.delay(100, TimeUnit.MILLISECONDS, scheduler, false)
.test();

to.assertEmpty();

scheduler.advanceTimeBy(1, TimeUnit.MILLISECONDS);

to.assertFailure(TestException.class);

scheduler.advanceTimeBy(100, TimeUnit.MILLISECONDS);

to.assertFailure(TestException.class);
}

@Test
public void errorDelayed() {
TestScheduler scheduler = new TestScheduler();

TestObserver<Void> to = Completable.error(new TestException())
.delay(100, TimeUnit.MILLISECONDS, scheduler, true)
.test();

to.assertEmpty();

scheduler.advanceTimeBy(1, TimeUnit.MILLISECONDS);

to.assertEmpty();

scheduler.advanceTimeBy(99, TimeUnit.MILLISECONDS);

to.assertFailure(TestException.class);
}
}

0 comments on commit 27c63b6

Please sign in to comment.