Skip to content

Commit

Permalink
Issue ReactiveX#235: TimeLimiter as rx transformer (ReactiveX#624)
Browse files Browse the repository at this point in the history
  • Loading branch information
hexmind authored and RobWin committed Sep 18, 2019
1 parent 1a26679 commit b8d39d8
Show file tree
Hide file tree
Showing 9 changed files with 796 additions and 0 deletions.
2 changes: 2 additions & 0 deletions resilience4j-rxjava2/build.gradle
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
dependencies {
compileOnly project(':resilience4j-circuitbreaker')
compileOnly project(':resilience4j-ratelimiter')
compileOnly project(':resilience4j-timelimiter')
compileOnly project(':resilience4j-bulkhead')
compileOnly project(':resilience4j-retry')
compileOnly ( libraries.rxjava2)
testCompile project(':resilience4j-test')
testCompile project(':resilience4j-circuitbreaker')
testCompile project(':resilience4j-ratelimiter')
testCompile project(':resilience4j-timelimiter')
testCompile project(':resilience4j-bulkhead')
testCompile project(':resilience4j-retry')
testCompile ( libraries.rxjava2)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
/*
* Copyright 2019 authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.github.resilience4j.timelimiter.transformer;

import io.github.resilience4j.timelimiter.TimeLimiter;
import io.reactivex.Completable;
import io.reactivex.CompletableSource;
import io.reactivex.CompletableTransformer;
import io.reactivex.Flowable;
import io.reactivex.FlowableTransformer;
import io.reactivex.Maybe;
import io.reactivex.MaybeSource;
import io.reactivex.MaybeTransformer;
import io.reactivex.Observable;
import io.reactivex.ObservableSource;
import io.reactivex.ObservableTransformer;
import io.reactivex.Single;
import io.reactivex.SingleSource;
import io.reactivex.SingleTransformer;

import java.util.concurrent.TimeUnit;

import org.reactivestreams.Publisher;

public class TimeLimiterTransformer<T> implements FlowableTransformer<T, T>, ObservableTransformer<T, T>,
SingleTransformer<T, T>, CompletableTransformer, MaybeTransformer<T, T> {

private final TimeLimiter timeLimiter;

private TimeLimiterTransformer(TimeLimiter timeLimiter) {
this.timeLimiter = timeLimiter;
}

/**
* Creates a TimeLimiterTransformer.
*
* @param timeLimiter the TimeLimiter
* @param <T> the value type of the upstream and downstream
* @return a TimeLimiterTransformer
*/
public static <T> TimeLimiterTransformer<T> of(TimeLimiter timeLimiter) {
return new TimeLimiterTransformer<>(timeLimiter);
}

@Override
public Publisher<T> apply(Flowable<T> upstream) {
return upstream
.timeout(getTimeoutInMillis(), TimeUnit.MILLISECONDS)
.doOnNext(t -> timeLimiter.onSuccess())
.doOnComplete(timeLimiter::onSuccess)
.doOnError(timeLimiter::onError);
}

@Override
public ObservableSource<T> apply(Observable<T> upstream) {
return upstream
.timeout(getTimeoutInMillis(), TimeUnit.MILLISECONDS)
.doOnNext(t -> timeLimiter.onSuccess())
.doOnComplete(timeLimiter::onSuccess)
.doOnError(timeLimiter::onError);
}

@Override
public SingleSource<T> apply(Single<T> upstream) {
return upstream
.timeout(getTimeoutInMillis(), TimeUnit.MILLISECONDS)
.doOnSuccess(t -> timeLimiter.onSuccess())
.doOnError(timeLimiter::onError);
}

@Override
public CompletableSource apply(Completable upstream) {
return upstream
.timeout(getTimeoutInMillis(), TimeUnit.MILLISECONDS)
.doOnComplete(timeLimiter::onSuccess)
.doOnError(timeLimiter::onError);
}

@Override
public MaybeSource<T> apply(Maybe<T> upstream) {
return upstream
.timeout(getTimeoutInMillis(), TimeUnit.MILLISECONDS)
.doOnSuccess(t -> timeLimiter.onSuccess())
.doOnComplete(timeLimiter::onSuccess)
.doOnError(timeLimiter::onError);
}

private long getTimeoutInMillis() {
return timeLimiter.getTimeLimiterConfig()
.getTimeoutDuration()
.toMillis();
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/*
*
* Copyright 2019 authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*
*/
@NonNullApi
@NonNullFields
package io.github.resilience4j.timelimiter.transformer;

import io.github.resilience4j.core.lang.NonNullApi;
import io.github.resilience4j.core.lang.NonNullFields;
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package io.github.resilience4j;

import io.reactivex.plugins.RxJavaPlugins;
import io.reactivex.schedulers.TestScheduler;

import org.junit.rules.TestRule;
import org.junit.runner.Description;
import org.junit.runners.model.Statement;

public class TestSchedulerRule implements TestRule {

private final TestScheduler testScheduler = new TestScheduler();

public TestScheduler getTestScheduler() {
return testScheduler;
}

@Override
public Statement apply(final Statement statement, Description description) {
return new Statement() {
@Override
public void evaluate() throws Throwable {
RxJavaPlugins.setIoSchedulerHandler(scheduler -> testScheduler);
RxJavaPlugins.setComputationSchedulerHandler(scheduler -> testScheduler);
RxJavaPlugins.setNewThreadSchedulerHandler(scheduler -> testScheduler);
try {
statement.evaluate();
} finally {
RxJavaPlugins.reset();
}
}
};
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
/*
* Copyright 2019 authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.github.resilience4j.timelimiter.transformer;

import io.github.resilience4j.TestSchedulerRule;
import io.github.resilience4j.test.HelloWorldService;
import io.github.resilience4j.timelimiter.TimeLimiter;
import io.github.resilience4j.timelimiter.TimeLimiterConfig;
import io.reactivex.Completable;
import io.reactivex.Maybe;
import io.reactivex.observers.TestObserver;
import io.reactivex.schedulers.TestScheduler;

import java.time.Duration;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.junit.Rule;
import org.junit.Test;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.BDDMockito.given;
import static org.mockito.BDDMockito.then;
import static org.mockito.Mockito.mock;

public class TimeLimiterTransformerCompletableTest {

@Rule
public final TestSchedulerRule testSchedulerRule = new TestSchedulerRule();
private final TestScheduler testScheduler = testSchedulerRule.getTestScheduler();
private final TimeLimiter timeLimiter = mock(TimeLimiter.class);
private final HelloWorldService helloWorldService = mock(HelloWorldService.class);

@Test
public void otherError() {
given(helloWorldService.returnHelloWorld())
.willThrow(new RuntimeException());
given(timeLimiter.getTimeLimiterConfig())
.willReturn(toConfig(Duration.ZERO));
TestObserver<?> observer = Completable.fromRunnable(helloWorldService::returnHelloWorld)
.compose(TimeLimiterTransformer.of(timeLimiter))
.test();

testScheduler.advanceTimeBy(1, TimeUnit.MINUTES);

observer.assertError(RuntimeException.class);
then(timeLimiter).should()
.onError(any(RuntimeException.class));
}

@Test
public void timeout() {
given(timeLimiter.getTimeLimiterConfig())
.willReturn(toConfig(Duration.ZERO));
TestObserver<?> observer = Maybe.timer(1, TimeUnit.MINUTES)
.flatMapCompletable(t -> Completable.complete())
.compose(TimeLimiterTransformer.of(timeLimiter))
.test();

testScheduler.advanceTimeBy(1, TimeUnit.MINUTES);

observer.assertError(TimeoutException.class);
then(timeLimiter).should()
.onError(any(TimeoutException.class));
}

@Test
public void doNotTimeout() {
given(helloWorldService.returnHelloWorld())
.willReturn("hello");
given(timeLimiter.getTimeLimiterConfig())
.willReturn(toConfig(Duration.ofMinutes(1)));
TestObserver<?> observer = Completable.fromRunnable(helloWorldService::returnHelloWorld)
.compose(TimeLimiterTransformer.of(timeLimiter))
.test();

testScheduler.advanceTimeBy(1, TimeUnit.MINUTES);

observer.assertComplete();
then(timeLimiter).should()
.onSuccess();
}

private TimeLimiterConfig toConfig(Duration timeout) {
return TimeLimiterConfig.custom()
.timeoutDuration(timeout)
.build();
}

}
Loading

0 comments on commit b8d39d8

Please sign in to comment.