Skip to content

Commit

Permalink
Merge pull request #927 from Ladicek/timeout-improvements
Browse files Browse the repository at this point in the history
timeout improvements
  • Loading branch information
Ladicek authored Nov 20, 2023
2 parents 4c2a5c0 + 2860100 commit 40cf278
Show file tree
Hide file tree
Showing 16 changed files with 212 additions and 460 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@
import io.smallrye.faulttolerance.core.stopwatch.SystemStopwatch;
import io.smallrye.faulttolerance.core.timeout.CompletionStageTimeout;
import io.smallrye.faulttolerance.core.timeout.Timeout;
import io.smallrye.faulttolerance.core.timeout.TimerTimeoutWatcher;
import io.smallrye.faulttolerance.core.util.DirectExecutor;
import io.smallrye.faulttolerance.core.util.ExceptionDecision;
import io.smallrye.faulttolerance.core.util.Preconditions;
Expand Down Expand Up @@ -246,7 +245,7 @@ private FaultToleranceStrategy<T> buildSyncStrategy(BuilderLazyDependencies lazy

if (lazyDependencies.ftEnabled() && timeoutBuilder != null) {
result = new Timeout<>(result, description, timeoutBuilder.durationInMillis,
new TimerTimeoutWatcher(lazyDependencies.timer()));
lazyDependencies.timer());
}

if (lazyDependencies.ftEnabled() && rateLimitBuilder != null) {
Expand Down Expand Up @@ -309,7 +308,7 @@ private <V> FaultToleranceStrategy<CompletionStage<V>> buildAsyncStrategy(Builde

if (lazyDependencies.ftEnabled() && timeoutBuilder != null) {
result = new CompletionStageTimeout<>(result, description, timeoutBuilder.durationInMillis,
new TimerTimeoutWatcher(lazyDependencies.timer()));
lazyDependencies.timer());
}

if (lazyDependencies.ftEnabled() && rateLimitBuilder != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,13 @@

import io.smallrye.faulttolerance.core.FaultToleranceStrategy;
import io.smallrye.faulttolerance.core.InvocationContext;
import io.smallrye.faulttolerance.core.timer.Timer;
import io.smallrye.faulttolerance.core.timer.TimerTask;

public class CompletionStageTimeout<V> extends Timeout<CompletionStage<V>> {
public CompletionStageTimeout(FaultToleranceStrategy<CompletionStage<V>> delegate, String description, long timeoutInMillis,
TimeoutWatcher watcher) {
super(delegate, description, timeoutInMillis, watcher);
Timer timer) {
super(delegate, description, timeoutInMillis, timer);
}

@Override
Expand All @@ -40,8 +42,8 @@ private CompletionStage<V> doApply(InvocationContext<CompletionStage<V>> ctx) {
}
};

TimeoutExecution timeoutExecution = new TimeoutExecution(null, timeoutInMillis, onTimeout);
TimeoutWatch watch = watcher.schedule(timeoutExecution);
TimeoutExecution execution = new TimeoutExecution(null, timeoutInMillis, onTimeout);
TimerTask task = timer.schedule(execution.timeoutInMillis(), execution::timeoutAndInterrupt);

CompletionStage<V> originalResult;
try {
Expand All @@ -55,9 +57,9 @@ private CompletionStage<V> doApply(InvocationContext<CompletionStage<V>> ctx) {
//
// this comes first, so that when the future is completed, the timeout watcher is already cancelled
// (this isn't exactly needed, but makes tests easier to write)
timeoutExecution.finish(watch::cancel);
execution.finish(task::cancel);

if (timeoutExecution.hasTimedOut()) {
if (execution.hasTimedOut()) {
onTimeout.run();
} else if (exception != null) {
ctx.fireEvent(TimeoutEvents.Finished.NORMALLY);
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -8,19 +8,21 @@

import io.smallrye.faulttolerance.core.FaultToleranceStrategy;
import io.smallrye.faulttolerance.core.InvocationContext;
import io.smallrye.faulttolerance.core.timer.Timer;
import io.smallrye.faulttolerance.core.timer.TimerTask;

public class Timeout<V> implements FaultToleranceStrategy<V> {
final FaultToleranceStrategy<V> delegate;
final String description;

final long timeoutInMillis;
final TimeoutWatcher watcher;
final Timer timer;

public Timeout(FaultToleranceStrategy<V> delegate, String description, long timeoutInMillis, TimeoutWatcher watcher) {
public Timeout(FaultToleranceStrategy<V> delegate, String description, long timeoutInMillis, Timer timer) {
this.delegate = checkNotNull(delegate, "Timeout delegate must be set");
this.description = checkNotNull(description, "Timeout description must be set");
this.timeoutInMillis = check(timeoutInMillis, timeoutInMillis > 0, "Timeout must be > 0");
this.watcher = checkNotNull(watcher, "Timeout watcher must be set");
this.timer = checkNotNull(timer, "Timer must be set");
}

@Override
Expand All @@ -45,7 +47,7 @@ private V doApply(InvocationContext<V> ctx) throws Exception {
notification.accept(timeoutException(description));
}
});
TimeoutWatch watch = watcher.schedule(execution);
TimerTask task = timer.schedule(execution.timeoutInMillis(), execution::timeoutAndInterrupt);
ctx.fireEvent(TimeoutEvents.Started.INSTANCE);

V result = null;
Expand All @@ -59,7 +61,7 @@ private V doApply(InvocationContext<V> ctx) throws Exception {
exception = e;
} finally {
// if the execution already timed out, this will be a noop
execution.finish(watch::cancel);
execution.finish(task::cancel);
}

if (Thread.interrupted()) {
Expand Down

This file was deleted.

This file was deleted.

This file was deleted.

Loading

0 comments on commit 40cf278

Please sign in to comment.