Skip to content

Commit

Permalink
issue-2764: add new operator onBackpressureDrop(Action1 onDrop)
Browse files Browse the repository at this point in the history
  • Loading branch information
Steven Wu committed Mar 1, 2015
1 parent c41b37d commit 290602e
Show file tree
Hide file tree
Showing 3 changed files with 118 additions and 6 deletions.
21 changes: 21 additions & 0 deletions src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -5258,6 +5258,27 @@ public final Observable<T> onBackpressureBuffer(long capacity, Action0 onOverflo
return lift(new OperatorOnBackpressureBuffer<T>(capacity, onOverflow));
}

/**
* Instructs an Observable that is emitting items faster than its observer can consume them to discard,
* rather than emit, those items that its observer is not prepared to observe.
* <p>
* <img width="640" height="245" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/bp.obp.drop.png" alt="">
* <p>
* If the downstream request count hits 0 then the Observable will refrain from calling {@code onNext} until
* the observer invokes {@code request(n)} again to increase the request count.
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code onBackpressureDrop} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param onDrop the action to invoke for each item dropped. onDrop action should be fast and should never block.
* @return the source Observable modified to drop {@code onNext} notifications on overflow
* @see <a href="http://reactivex.io/documentation/operators/backpressure.html">ReactiveX operators documentation: backpressure operators</a>
*/
public final Observable<T> onBackpressureDrop(Action1<? super T> onDrop) {
return lift(new OperatorOnBackpressureDrop<T>(onDrop));
}

/**
* Instructs an Observable that is emitting items faster than its observer can consume them to discard,
* rather than emit, those items that its observer is not prepared to observe.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,21 +20,34 @@
import rx.Observable.Operator;
import rx.Producer;
import rx.Subscriber;
import rx.functions.Action1;

public class OperatorOnBackpressureDrop<T> implements Operator<T, T> {

/** Lazy initialization via inner-class holder. */
private static final class Holder {
/** A singleton instance. */
static final OperatorOnBackpressureDrop<Object> INSTANCE = new OperatorOnBackpressureDrop<Object>();
}

/**
* @return a singleton instance of this stateless operator.
*/
@SuppressWarnings({ "unchecked" })
public static <T> OperatorOnBackpressureDrop<T> instance() {
return (OperatorOnBackpressureDrop<T>)Holder.INSTANCE;
}
private OperatorOnBackpressureDrop() { }

private final Action1<? super T> onDrop;

private OperatorOnBackpressureDrop() {
this(null);
}

public OperatorOnBackpressureDrop(Action1<? super T> onDrop) {
this.onDrop = onDrop;
}

@Override
public Subscriber<? super T> call(final Subscriber<? super T> child) {
final AtomicLong requested = new AtomicLong();
Expand Down Expand Up @@ -68,6 +81,11 @@ public void onNext(T t) {
if (requested.get() > 0) {
child.onNext(t);
requested.decrementAndGet();
} else {
// item dropped
if(onDrop != null) {
onDrop.call(t);
}
}
}

Expand Down
83 changes: 78 additions & 5 deletions src/test/java/rx/BackpressureTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

import org.junit.*;

import org.junit.rules.TestName;
import rx.Observable.OnSubscribe;
import rx.exceptions.MissingBackpressureException;
import rx.functions.*;
Expand All @@ -33,6 +34,9 @@

public class BackpressureTests {

@Rule
public TestName testName = new TestName();

@After
public void doAfterTest() {
TestObstructionDetection.checkObstruction();
Expand Down Expand Up @@ -424,18 +428,56 @@ public void testOnBackpressureDrop() {
.map(SLOW_PASS_THRU).take(NUM).subscribe(ts);
ts.awaitTerminalEvent();
ts.assertNoErrors();



List<Integer> onNextEvents = ts.getOnNextEvents();
assertEquals(NUM, onNextEvents.size());

Integer lastEvent = onNextEvents.get(NUM - 1);

System.out.println("testOnBackpressureDrop => Received: " + onNextEvents.size() + " Emitted: " + c.get() + " Last value: " + lastEvent);
// it drop, so we should get some number far higher than what would have sequentially incremented
assertTrue(NUM - 1 <= lastEvent.intValue());
}
}

@Test(timeout = 10000)
public void testOnBackpressureDropWithAction() {
for (int i = 0; i < 100; i++) {
final AtomicInteger emitCount = new AtomicInteger();
final AtomicInteger dropCount = new AtomicInteger();
final AtomicInteger passCount = new AtomicInteger();
final int NUM = (int) (RxRingBuffer.SIZE * 1.5); // > 1 so that take doesn't prevent buffer overflow
TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
firehose(emitCount).onBackpressureDrop(new Action1<Integer>() {
@Override
public void call(Integer i) {
dropCount.incrementAndGet();
}
})
.doOnNext(new Action1<Integer>() {
@Override
public void call(Integer integer) {
passCount.incrementAndGet();
}
})
.observeOn(Schedulers.computation())
.map(SLOW_PASS_THRU).take(NUM).subscribe(ts);
ts.awaitTerminalEvent();
ts.assertNoErrors();

List<Integer> onNextEvents = ts.getOnNextEvents();
Integer lastEvent = onNextEvents.get(NUM - 1);
System.out.println(testName.getMethodName() + " => Received: " + onNextEvents.size() + " Passed: " + passCount.get() + " Dropped: " + dropCount.get() + " Emitted: " + emitCount.get() + " Last value: " + lastEvent);
assertEquals(NUM, onNextEvents.size());
// in reality, NUM < passCount
assertTrue(NUM <= passCount.get());
// it drop, so we should get some number far higher than what would have sequentially incremented
assertTrue(NUM - 1 <= lastEvent.intValue());
assertTrue(0 < dropCount.get());
assertEquals(emitCount.get(), passCount.get() + dropCount.get());
}
}

@Test(timeout = 10000)
public void testOnBackpressureDropSynchronous() {
for (int i = 0; i < 100; i++) {
Expand All @@ -446,18 +488,49 @@ public void testOnBackpressureDropSynchronous() {
.map(SLOW_PASS_THRU).take(NUM).subscribe(ts);
ts.awaitTerminalEvent();
ts.assertNoErrors();

List<Integer> onNextEvents = ts.getOnNextEvents();
assertEquals(NUM, onNextEvents.size());

Integer lastEvent = onNextEvents.get(NUM - 1);

System.out.println("testOnBackpressureDrop => Received: " + onNextEvents.size() + " Emitted: " + c.get() + " Last value: " + lastEvent);
// it drop, so we should get some number far higher than what would have sequentially incremented
assertTrue(NUM - 1 <= lastEvent.intValue());
}
}

@Test(timeout = 10000)
public void testOnBackpressureDropSynchronousWithAction() {
for (int i = 0; i < 100; i++) {
final AtomicInteger dropCount = new AtomicInteger();
int NUM = (int) (RxRingBuffer.SIZE * 1.1); // > 1 so that take doesn't prevent buffer overflow
AtomicInteger c = new AtomicInteger();
TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
firehose(c).onBackpressureDrop(new Action1<Integer>() {
@Override
public void call(Integer i) {
dropCount.incrementAndGet();
}
})
.map(SLOW_PASS_THRU).take(NUM).subscribe(ts);
ts.awaitTerminalEvent();
ts.assertNoErrors();

List<Integer> onNextEvents = ts.getOnNextEvents();
assertEquals(NUM, onNextEvents.size());

Integer lastEvent = onNextEvents.get(NUM - 1);

System.out.println("testOnBackpressureDrop => Received: " + onNextEvents.size() + " Dropped: " + dropCount.get() + " Emitted: " + c.get() + " Last value: " + lastEvent);
// it drop, so we should get some number far higher than what would have sequentially incremented
assertTrue(NUM - 1 <= lastEvent.intValue());
// no drop in synchronous mode
assertEquals(0, dropCount.get());
assertEquals(c.get(), onNextEvents.size());
}
}

@Test(timeout = 2000)
public void testOnBackpressureBuffer() {
int NUM = (int) (RxRingBuffer.SIZE * 1.1); // > 1 so that take doesn't prevent buffer overflow
Expand Down

0 comments on commit 290602e

Please sign in to comment.