Skip to content

Commit

Permalink
Improve BoundedSubscriber codecov
Browse files Browse the repository at this point in the history
  • Loading branch information
RomanWuattier committed Jun 2, 2018
1 parent 74e7cde commit db252ee
Show file tree
Hide file tree
Showing 4 changed files with 123 additions and 32 deletions.
8 changes: 2 additions & 6 deletions src/main/java/io/reactivex/internal/functions/Functions.java
Original file line number Diff line number Diff line change
Expand Up @@ -753,19 +753,15 @@ public static <T> Consumer<T> boundedConsumer(int bufferSize) {

public static class BoundedConsumer implements Consumer<Subscription> {

protected final int bufferSize;
final int bufferSize;

protected BoundedConsumer(int bufferSize) {
BoundedConsumer(int bufferSize) {
this.bufferSize = bufferSize;
}

@Override
public void accept(Subscription s) throws Exception {
s.request(bufferSize);
}

public int getBufferSize() {
return bufferSize;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -118,13 +118,13 @@ public static <T> void subscribe(Publisher<? extends T> o, final Consumer<? supe
* @param bufferSize the number of elements to prefetch from the source Publisher
* @param <T> the value type
*/
@SuppressWarnings("ResultOfMethodCallIgnored")
public static <T> void subscribe(Publisher<? extends T> o, final Consumer<? super T> onNext,
final Consumer<? super Throwable> onError, final Action onComplete, int bufferSize) {
ObjectHelper.requireNonNull(onNext, "onNext is null");
ObjectHelper.requireNonNull(onError, "onError is null");
ObjectHelper.requireNonNull(onComplete, "onComplete is null");
ObjectHelper.verifyPositive(bufferSize, "number > 0 required");
subscribe(o, new BoundedSubscriber<T>(onNext, onError, onComplete, Functions.boundedConsumer(bufferSize)));
subscribe(o, new BoundedSubscriber<T>(onNext, onError, onComplete, Functions.boundedConsumer(bufferSize),
bufferSize));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,14 +41,13 @@ public final class BoundedSubscriber<T> extends AtomicReference<Subscription>
final int limit;

public BoundedSubscriber(Consumer<? super T> onNext, Consumer<? super Throwable> onError,
Action onComplete,
Consumer<? super Subscription> onSubscribe) {
Action onComplete, Consumer<? super Subscription> onSubscribe, int bufferSize) {
super();
this.onNext = onNext;
this.onError = onError;
this.onComplete = onComplete;
this.onSubscribe = onSubscribe;
this.bufferSize = ((Functions.BoundedConsumer) onSubscribe).getBufferSize();
this.bufferSize = bufferSize;
this.limit = bufferSize - (bufferSize >> 2);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
/**
* Copyright (c) 2016-present, RxJava Contributors.
*
* <p>
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
*
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
*
* <p>
* Unless required by applicable law or agreed to in writing, software distributed under the License is
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
* the License for the specific language governing permissions and limitations under the License.
Expand All @@ -20,12 +20,16 @@
import io.reactivex.functions.Action;
import io.reactivex.functions.Consumer;
import io.reactivex.internal.functions.Functions;
import io.reactivex.internal.subscriptions.BooleanSubscription;
import io.reactivex.plugins.RxJavaPlugins;
import io.reactivex.processors.PublishProcessor;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.testng.annotations.Test;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import static org.junit.Assert.assertEquals;
Expand Down Expand Up @@ -53,12 +57,12 @@ public void accept(Throwable throwable) throws Exception {
public void run() throws Exception {
received.add(1);
}
}, new Functions.BoundedConsumer(128) {
}, new Consumer<Subscription>() {
@Override
public void accept(Subscription subscription) throws Exception {
throw new TestException();
}
});
}, 128);

assertFalse(o.isDisposed());

Expand Down Expand Up @@ -89,12 +93,12 @@ public void accept(Throwable throwable) throws Exception {
public void run() throws Exception {
received.add(1);
}
}, new Functions.BoundedConsumer(128) {
}, new Consumer<Subscription>() {
@Override
public void accept(Subscription subscription) throws Exception {
subscription.request(bufferSize);
subscription.request(128);
}
});
}, 128);

assertFalse(o.isDisposed());

Expand Down Expand Up @@ -128,12 +132,12 @@ public void accept(Throwable throwable) throws Exception {
public void run() throws Exception {
received.add(1);
}
}, new Functions.BoundedConsumer(128) {
}, new Consumer<Subscription>() {
@Override
public void accept(Subscription subscription) throws Exception {
subscription.request(bufferSize);
subscription.request(128);
}
});
}, 128);

assertFalse(o.isDisposed());

Expand Down Expand Up @@ -174,12 +178,12 @@ public void accept(Throwable throwable) throws Exception {
public void run() throws Exception {
throw new TestException();
}
}, new Functions.BoundedConsumer(128) {
}, new Consumer<Subscription>() {
@Override
public void accept(Subscription subscription) throws Exception {
subscription.request(bufferSize);
subscription.request(128);
}
});
}, 128);

assertFalse(o.isDisposed());

Expand Down Expand Up @@ -216,12 +220,12 @@ public void accept(Throwable e) throws Exception {
public void run() throws Exception {

}
}, new Functions.BoundedConsumer(128) {
}, new Consumer<Subscription>() {
@Override
public void accept(Subscription subscription) throws Exception {
subscription.request(bufferSize);
subscription.request(128);
}
});
}, 128);

pp.subscribe(s);

Expand Down Expand Up @@ -255,12 +259,12 @@ public void accept(Throwable e) throws Exception {
@Override
public void run() throws Exception {
}
}, new Functions.BoundedConsumer(128) {
}, new Consumer<Subscription>() {
@Override
public void accept(Subscription s) throws Exception {
throw new TestException();
}
});
}, 128);

pp.subscribe(s);

Expand All @@ -270,12 +274,104 @@ public void accept(Subscription s) throws Exception {
assertTrue(errors.toString(), errors.get(0) instanceof TestException);
}

@Test
public void badSourceOnSubscribe() {
Flowable<Integer> source = Flowable.fromPublisher(new Publisher<Integer>() {
@Override
public void subscribe(Subscriber<? super Integer> s) {
BooleanSubscription s1 = new BooleanSubscription();
s.onSubscribe(s1);
BooleanSubscription s2 = new BooleanSubscription();
s.onSubscribe(s2);

assertFalse(s1.isCancelled());
assertTrue(s2.isCancelled());

s.onNext(1);
s.onComplete();
}
});

final List<Object> received = new ArrayList<Object>();

BoundedSubscriber<Object> o = new BoundedSubscriber<Object>(new Consumer<Object>() {
@Override
public void accept(Object v) throws Exception {
received.add(v);
}
}, new Consumer<Throwable>() {
@Override
public void accept(Throwable e) throws Exception {
received.add(e);
}
}, new Action() {
@Override
public void run() throws Exception {
received.add(100);
}
}, new Consumer<Subscription>() {
@Override
public void accept(Subscription s) throws Exception {
s.request(128);
}
}, 128);

source.subscribe(o);

assertEquals(Arrays.asList(1, 100), received);
}

@Test
public void badSourceEmitAfterDone() {
Flowable<Integer> source = Flowable.fromPublisher(new Publisher<Integer>() {
@Override
public void subscribe(Subscriber<? super Integer> s) {
BooleanSubscription s1 = new BooleanSubscription();
s.onSubscribe(s1);

s.onNext(1);
s.onComplete();
s.onNext(2);
s.onError(new TestException());
s.onComplete();
}
});

final List<Object> received = new ArrayList<Object>();

BoundedSubscriber<Object> o = new BoundedSubscriber<Object>(new Consumer<Object>() {
@Override
public void accept(Object v) throws Exception {
received.add(v);
}
}, new Consumer<Throwable>() {
@Override
public void accept(Throwable e) throws Exception {
received.add(e);
}
}, new Action() {
@Override
public void run() throws Exception {
received.add(100);
}
}, new Consumer<Subscription>() {
@Override
public void accept(Subscription s) throws Exception {
s.request(128);
}
}, 128);

source.subscribe(o);

assertEquals(Arrays.asList(1, 100), received);
}

@Test
public void onErrorMissingShouldReportNoCustomOnError() {
BoundedSubscriber<Integer> o = new BoundedSubscriber<Integer>(Functions.<Integer>emptyConsumer(),
Functions.ON_ERROR_MISSING,
Functions.EMPTY_ACTION,
Functions.<Subscription>boundedConsumer(128));
Functions.<Subscription>boundedConsumer(128), 128);

assertFalse(o.hasCustomOnError());
}
Expand All @@ -285,7 +381,7 @@ public void customOnErrorShouldReportCustomOnError() {
BoundedSubscriber<Integer> o = new BoundedSubscriber<Integer>(Functions.<Integer>emptyConsumer(),
Functions.<Throwable>emptyConsumer(),
Functions.EMPTY_ACTION,
Functions.<Subscription>boundedConsumer(128));
Functions.<Subscription>boundedConsumer(128), 128);

assertTrue(o.hasCustomOnError());
}
Expand Down

0 comments on commit db252ee

Please sign in to comment.