From 1103c9284a8c6975453f109fdf1e6b038d7e8636 Mon Sep 17 00:00:00 2001 From: Ladislav Thon Date: Mon, 20 Nov 2023 12:55:32 +0100 Subject: [PATCH 1/2] delete unused TimeoutWatcher implementation --- .../ScheduledExecutorTimeoutWatcher.java | 30 ------ .../ScheduledExecutorTimeoutWatcherTest.java | 94 ------------------- 2 files changed, 124 deletions(-) delete mode 100644 implementation/core/src/main/java/io/smallrye/faulttolerance/core/timeout/ScheduledExecutorTimeoutWatcher.java delete mode 100644 implementation/core/src/test/java/io/smallrye/faulttolerance/core/timeout/ScheduledExecutorTimeoutWatcherTest.java diff --git a/implementation/core/src/main/java/io/smallrye/faulttolerance/core/timeout/ScheduledExecutorTimeoutWatcher.java b/implementation/core/src/main/java/io/smallrye/faulttolerance/core/timeout/ScheduledExecutorTimeoutWatcher.java deleted file mode 100644 index bdcf8e7e..00000000 --- a/implementation/core/src/main/java/io/smallrye/faulttolerance/core/timeout/ScheduledExecutorTimeoutWatcher.java +++ /dev/null @@ -1,30 +0,0 @@ -package io.smallrye.faulttolerance.core.timeout; - -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.TimeUnit; - -public class ScheduledExecutorTimeoutWatcher implements TimeoutWatcher { - private final ScheduledExecutorService executor; - - public ScheduledExecutorTimeoutWatcher(ScheduledExecutorService executor) { - this.executor = executor; - } - - @Override - public TimeoutWatch schedule(TimeoutExecution execution) { - ScheduledFuture future = executor.schedule(execution::timeoutAndInterrupt, execution.timeoutInMillis(), - TimeUnit.MILLISECONDS); - return new TimeoutWatch() { - @Override - public boolean isRunning() { - return !future.isDone(); - } - - @Override - public void cancel() { - future.cancel(true); - } - }; - } -} diff --git a/implementation/core/src/test/java/io/smallrye/faulttolerance/core/timeout/ScheduledExecutorTimeoutWatcherTest.java b/implementation/core/src/test/java/io/smallrye/faulttolerance/core/timeout/ScheduledExecutorTimeoutWatcherTest.java deleted file mode 100644 index 7aac2636..00000000 --- a/implementation/core/src/test/java/io/smallrye/faulttolerance/core/timeout/ScheduledExecutorTimeoutWatcherTest.java +++ /dev/null @@ -1,94 +0,0 @@ -package io.smallrye.faulttolerance.core.timeout; - -import static org.assertj.core.api.Assertions.assertThat; -import static org.awaitility.Awaitility.await; - -import java.time.Duration; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; - -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.condition.EnabledOnOs; -import org.junit.jupiter.api.condition.OS; - -@EnabledOnOs(OS.LINUX) -public class ScheduledExecutorTimeoutWatcherTest { - private ScheduledExecutorService executor; - private ScheduledExecutorTimeoutWatcher watcher; - - @BeforeEach - public void setUp() { - executor = Executors.newSingleThreadScheduledExecutor(); - watcher = new ScheduledExecutorTimeoutWatcher(executor); - } - - @AfterEach - public void tearDown() throws InterruptedException { - executor.shutdownNow(); - executor.awaitTermination(1, TimeUnit.SECONDS); - } - - @Test - public void timedOut() throws InterruptedException { - AtomicBoolean wasInterrupted = new AtomicBoolean(false); - - Thread thread = run(wasInterrupted); - TimeoutExecution execution = new TimeoutExecution(thread, 100L); - TimeoutWatch watch = watcher.schedule(execution); - - assertThat(execution.isRunning()).isTrue(); - assertThat(watch.isRunning()).isTrue(); - - thread.join(); - assertThat(wasInterrupted).isTrue(); - - assertThat(execution.isRunning()).isFalse(); - assertThat(execution.hasFinished()).isFalse(); - assertThat(execution.hasTimedOut()).isTrue(); - - await("watch not running") - .atMost(Duration.ofMillis(100)) - .pollInterval(Duration.ofMillis(50)) - .until(() -> !watch.isRunning()); - } - - @Test - public void notTimedOut() throws InterruptedException { - AtomicBoolean wasInterrupted = new AtomicBoolean(false); - - Thread thread = run(wasInterrupted); - TimeoutExecution execution = new TimeoutExecution(thread, 300L); - TimeoutWatch watch = watcher.schedule(execution); - - assertThat(execution.isRunning()).isTrue(); - assertThat(watch.isRunning()).isTrue(); - - thread.join(); - execution.finish(watch::cancel); // execution.finish() needs to be called explicitly - assertThat(wasInterrupted).isFalse(); - - assertThat(execution.isRunning()).isFalse(); - assertThat(execution.hasFinished()).isTrue(); - assertThat(execution.hasTimedOut()).isFalse(); - await("watch not running") - .atMost(Duration.ofMillis(100)) - .pollInterval(Duration.ofMillis(50)) - .until(() -> !watch.isRunning()); - } - - private Thread run(AtomicBoolean interruptionFlag) { - Thread thread = new Thread(() -> { - try { - Thread.sleep(200L); - } catch (InterruptedException e) { - interruptionFlag.set(true); - } - }); - thread.start(); - return thread; - } -} From 286010079b5337afdfa3cc236873b5ed715631bd Mon Sep 17 00:00:00 2001 From: Ladislav Thon Date: Mon, 20 Nov 2023 13:40:09 +0100 Subject: [PATCH 2/2] remove the concept of TimeoutWatcher The `TimeoutWatcher` predates and can easily be implemented in terms of the `Timer`. This commit removes the `TimeoutWatcher` concept entirely, because the `Timer` can be used as a replacement rather easily. There's no need to have the extra layer of abstraction. --- .../core/apiimpl/FaultToleranceImpl.java | 5 +- .../core/timeout/CompletionStageTimeout.java | 14 +-- .../faulttolerance/core/timeout/Timeout.java | 12 ++- .../core/timeout/TimeoutWatch.java | 7 -- .../core/timeout/TimeoutWatcher.java | 5 - .../core/timeout/TimerTimeoutWatcher.java | 28 ----- .../timeout/CompletionStageTimeoutTest.java | 84 +++++++-------- .../core/timeout/FutureTimeoutTest.java | 49 ++++----- .../RealWorldCompletionStageTimeoutTest.java | 10 +- .../core/timeout/TestTimeoutWatcher.java | 66 ------------ .../core/timeout/TestTimer.java | 84 +++++++++++++++ .../core/timeout/TimeoutTest.java | 74 ++++++------- .../core/timeout/TimerTimeoutWatcherTest.java | 100 ------------------ .../FaultToleranceInterceptor.java | 10 +- 14 files changed, 212 insertions(+), 336 deletions(-) delete mode 100644 implementation/core/src/main/java/io/smallrye/faulttolerance/core/timeout/TimeoutWatch.java delete mode 100644 implementation/core/src/main/java/io/smallrye/faulttolerance/core/timeout/TimeoutWatcher.java delete mode 100644 implementation/core/src/main/java/io/smallrye/faulttolerance/core/timeout/TimerTimeoutWatcher.java delete mode 100644 implementation/core/src/test/java/io/smallrye/faulttolerance/core/timeout/TestTimeoutWatcher.java create mode 100644 implementation/core/src/test/java/io/smallrye/faulttolerance/core/timeout/TestTimer.java delete mode 100644 implementation/core/src/test/java/io/smallrye/faulttolerance/core/timeout/TimerTimeoutWatcherTest.java diff --git a/implementation/core/src/main/java/io/smallrye/faulttolerance/core/apiimpl/FaultToleranceImpl.java b/implementation/core/src/main/java/io/smallrye/faulttolerance/core/apiimpl/FaultToleranceImpl.java index f5ec902d..2fba2ee8 100644 --- a/implementation/core/src/main/java/io/smallrye/faulttolerance/core/apiimpl/FaultToleranceImpl.java +++ b/implementation/core/src/main/java/io/smallrye/faulttolerance/core/apiimpl/FaultToleranceImpl.java @@ -53,7 +53,6 @@ import io.smallrye.faulttolerance.core.stopwatch.SystemStopwatch; import io.smallrye.faulttolerance.core.timeout.CompletionStageTimeout; import io.smallrye.faulttolerance.core.timeout.Timeout; -import io.smallrye.faulttolerance.core.timeout.TimerTimeoutWatcher; import io.smallrye.faulttolerance.core.util.DirectExecutor; import io.smallrye.faulttolerance.core.util.ExceptionDecision; import io.smallrye.faulttolerance.core.util.Preconditions; @@ -246,7 +245,7 @@ private FaultToleranceStrategy buildSyncStrategy(BuilderLazyDependencies lazy if (lazyDependencies.ftEnabled() && timeoutBuilder != null) { result = new Timeout<>(result, description, timeoutBuilder.durationInMillis, - new TimerTimeoutWatcher(lazyDependencies.timer())); + lazyDependencies.timer()); } if (lazyDependencies.ftEnabled() && rateLimitBuilder != null) { @@ -309,7 +308,7 @@ private FaultToleranceStrategy> buildAsyncStrategy(Builde if (lazyDependencies.ftEnabled() && timeoutBuilder != null) { result = new CompletionStageTimeout<>(result, description, timeoutBuilder.durationInMillis, - new TimerTimeoutWatcher(lazyDependencies.timer())); + lazyDependencies.timer()); } if (lazyDependencies.ftEnabled() && rateLimitBuilder != null) { diff --git a/implementation/core/src/main/java/io/smallrye/faulttolerance/core/timeout/CompletionStageTimeout.java b/implementation/core/src/main/java/io/smallrye/faulttolerance/core/timeout/CompletionStageTimeout.java index 5195318b..4dfa6f04 100644 --- a/implementation/core/src/main/java/io/smallrye/faulttolerance/core/timeout/CompletionStageTimeout.java +++ b/implementation/core/src/main/java/io/smallrye/faulttolerance/core/timeout/CompletionStageTimeout.java @@ -9,11 +9,13 @@ import io.smallrye.faulttolerance.core.FaultToleranceStrategy; import io.smallrye.faulttolerance.core.InvocationContext; +import io.smallrye.faulttolerance.core.timer.Timer; +import io.smallrye.faulttolerance.core.timer.TimerTask; public class CompletionStageTimeout extends Timeout> { public CompletionStageTimeout(FaultToleranceStrategy> delegate, String description, long timeoutInMillis, - TimeoutWatcher watcher) { - super(delegate, description, timeoutInMillis, watcher); + Timer timer) { + super(delegate, description, timeoutInMillis, timer); } @Override @@ -40,8 +42,8 @@ private CompletionStage doApply(InvocationContext> ctx) { } }; - TimeoutExecution timeoutExecution = new TimeoutExecution(null, timeoutInMillis, onTimeout); - TimeoutWatch watch = watcher.schedule(timeoutExecution); + TimeoutExecution execution = new TimeoutExecution(null, timeoutInMillis, onTimeout); + TimerTask task = timer.schedule(execution.timeoutInMillis(), execution::timeoutAndInterrupt); CompletionStage originalResult; try { @@ -55,9 +57,9 @@ private CompletionStage doApply(InvocationContext> ctx) { // // this comes first, so that when the future is completed, the timeout watcher is already cancelled // (this isn't exactly needed, but makes tests easier to write) - timeoutExecution.finish(watch::cancel); + execution.finish(task::cancel); - if (timeoutExecution.hasTimedOut()) { + if (execution.hasTimedOut()) { onTimeout.run(); } else if (exception != null) { ctx.fireEvent(TimeoutEvents.Finished.NORMALLY); diff --git a/implementation/core/src/main/java/io/smallrye/faulttolerance/core/timeout/Timeout.java b/implementation/core/src/main/java/io/smallrye/faulttolerance/core/timeout/Timeout.java index 9c8f204a..d72eb2fd 100644 --- a/implementation/core/src/main/java/io/smallrye/faulttolerance/core/timeout/Timeout.java +++ b/implementation/core/src/main/java/io/smallrye/faulttolerance/core/timeout/Timeout.java @@ -8,19 +8,21 @@ import io.smallrye.faulttolerance.core.FaultToleranceStrategy; import io.smallrye.faulttolerance.core.InvocationContext; +import io.smallrye.faulttolerance.core.timer.Timer; +import io.smallrye.faulttolerance.core.timer.TimerTask; public class Timeout implements FaultToleranceStrategy { final FaultToleranceStrategy delegate; final String description; final long timeoutInMillis; - final TimeoutWatcher watcher; + final Timer timer; - public Timeout(FaultToleranceStrategy delegate, String description, long timeoutInMillis, TimeoutWatcher watcher) { + public Timeout(FaultToleranceStrategy delegate, String description, long timeoutInMillis, Timer timer) { this.delegate = checkNotNull(delegate, "Timeout delegate must be set"); this.description = checkNotNull(description, "Timeout description must be set"); this.timeoutInMillis = check(timeoutInMillis, timeoutInMillis > 0, "Timeout must be > 0"); - this.watcher = checkNotNull(watcher, "Timeout watcher must be set"); + this.timer = checkNotNull(timer, "Timer must be set"); } @Override @@ -45,7 +47,7 @@ private V doApply(InvocationContext ctx) throws Exception { notification.accept(timeoutException(description)); } }); - TimeoutWatch watch = watcher.schedule(execution); + TimerTask task = timer.schedule(execution.timeoutInMillis(), execution::timeoutAndInterrupt); ctx.fireEvent(TimeoutEvents.Started.INSTANCE); V result = null; @@ -59,7 +61,7 @@ private V doApply(InvocationContext ctx) throws Exception { exception = e; } finally { // if the execution already timed out, this will be a noop - execution.finish(watch::cancel); + execution.finish(task::cancel); } if (Thread.interrupted()) { diff --git a/implementation/core/src/main/java/io/smallrye/faulttolerance/core/timeout/TimeoutWatch.java b/implementation/core/src/main/java/io/smallrye/faulttolerance/core/timeout/TimeoutWatch.java deleted file mode 100644 index 3659761a..00000000 --- a/implementation/core/src/main/java/io/smallrye/faulttolerance/core/timeout/TimeoutWatch.java +++ /dev/null @@ -1,7 +0,0 @@ -package io.smallrye.faulttolerance.core.timeout; - -interface TimeoutWatch { - boolean isRunning(); - - void cancel(); -} diff --git a/implementation/core/src/main/java/io/smallrye/faulttolerance/core/timeout/TimeoutWatcher.java b/implementation/core/src/main/java/io/smallrye/faulttolerance/core/timeout/TimeoutWatcher.java deleted file mode 100644 index d5163987..00000000 --- a/implementation/core/src/main/java/io/smallrye/faulttolerance/core/timeout/TimeoutWatcher.java +++ /dev/null @@ -1,5 +0,0 @@ -package io.smallrye.faulttolerance.core.timeout; - -public interface TimeoutWatcher { - TimeoutWatch schedule(TimeoutExecution execution); -} diff --git a/implementation/core/src/main/java/io/smallrye/faulttolerance/core/timeout/TimerTimeoutWatcher.java b/implementation/core/src/main/java/io/smallrye/faulttolerance/core/timeout/TimerTimeoutWatcher.java deleted file mode 100644 index 2418f9ce..00000000 --- a/implementation/core/src/main/java/io/smallrye/faulttolerance/core/timeout/TimerTimeoutWatcher.java +++ /dev/null @@ -1,28 +0,0 @@ -package io.smallrye.faulttolerance.core.timeout; - -import io.smallrye.faulttolerance.core.timer.Timer; -import io.smallrye.faulttolerance.core.timer.TimerTask; - -public class TimerTimeoutWatcher implements TimeoutWatcher { - private final Timer timer; - - public TimerTimeoutWatcher(Timer timer) { - this.timer = timer; - } - - @Override - public TimeoutWatch schedule(TimeoutExecution execution) { - TimerTask task = timer.schedule(execution.timeoutInMillis(), execution::timeoutAndInterrupt); - return new TimeoutWatch() { - @Override - public boolean isRunning() { - return !task.isDone(); - } - - @Override - public void cancel() { - task.cancel(); - } - }; - } -} diff --git a/implementation/core/src/test/java/io/smallrye/faulttolerance/core/timeout/CompletionStageTimeoutTest.java b/implementation/core/src/test/java/io/smallrye/faulttolerance/core/timeout/CompletionStageTimeoutTest.java index 117bf555..a5a459f9 100644 --- a/implementation/core/src/test/java/io/smallrye/faulttolerance/core/timeout/CompletionStageTimeoutTest.java +++ b/implementation/core/src/test/java/io/smallrye/faulttolerance/core/timeout/CompletionStageTimeoutTest.java @@ -23,19 +23,19 @@ import io.smallrye.faulttolerance.core.util.party.Party; public class CompletionStageTimeoutTest { - private Barrier watcherTimeoutElapsedBarrier; - private Barrier watcherExecutionInterruptedBarrier; + private Barrier timerElapsedBarrier; + private Barrier timerTaskFinishedBarrier; - private TestTimeoutWatcher timeoutWatcher; + private TestTimer timer; private TestExecutor executor; @BeforeEach public void setUp() { - watcherTimeoutElapsedBarrier = Barrier.interruptible(); - watcherExecutionInterruptedBarrier = Barrier.interruptible(); + timerElapsedBarrier = Barrier.interruptible(); + timerTaskFinishedBarrier = Barrier.interruptible(); - timeoutWatcher = new TestTimeoutWatcher(watcherTimeoutElapsedBarrier, watcherExecutionInterruptedBarrier); + timer = new TestTimer(timerElapsedBarrier, timerTaskFinishedBarrier); executor = new TestExecutor(); } @@ -43,14 +43,14 @@ public void setUp() { @AfterEach public void tearDown() throws InterruptedException { executor.shutdown(); - timeoutWatcher.shutdown(); + timer.shutdown(); } @Test public void negativeTimeout() { TestInvocation> invocation = TestInvocation.of(() -> completedFuture("foobar")); CompletionStageExecution execution = new CompletionStageExecution<>(invocation, executor); - assertThatThrownBy(() -> new CompletionStageTimeout<>(execution, "test invocation", -1, timeoutWatcher)) + assertThatThrownBy(() -> new CompletionStageTimeout<>(execution, "test invocation", -1, timer)) .isExactlyInstanceOf(IllegalArgumentException.class); } @@ -58,7 +58,7 @@ public void negativeTimeout() { public void zeroTimeout() { TestInvocation> invocation = TestInvocation.of(() -> completedFuture("foobar")); CompletionStageExecution execution = new CompletionStageExecution<>(invocation, executor); - assertThatThrownBy(() -> new CompletionStageTimeout<>(execution, "test invocation", 0, timeoutWatcher)) + assertThatThrownBy(() -> new CompletionStageTimeout<>(execution, "test invocation", 0, timer)) .isExactlyInstanceOf(IllegalArgumentException.class); } @@ -67,10 +67,10 @@ public void immediatelyReturning_value() throws Exception { TestInvocation> invocation = TestInvocation.of(() -> completedFuture("foobar")); CompletionStageExecution execution = new CompletionStageExecution<>(invocation, executor); CompletionStageTimeout timeout = new CompletionStageTimeout<>(execution, - "test invocation", 1000, timeoutWatcher); + "test invocation", 1000, timer); CompletionStage result = timeout.apply(new InvocationContext<>(null)); assertThat(result.toCompletableFuture().get()).isEqualTo("foobar"); - assertThat(timeoutWatcher.timeoutWatchWasCancelled()).isTrue(); + assertThat(timer.timerTaskCancelled()).isTrue(); } @Test @@ -78,12 +78,12 @@ public void immediatelyReturning_directException() { TestInvocation> invocation = TestInvocation.of(TestException::doThrow); CompletionStageExecution execution = new CompletionStageExecution<>(invocation, executor); CompletionStageTimeout timeout = new CompletionStageTimeout<>(execution, - "test invocation", 1000, timeoutWatcher); + "test invocation", 1000, timer); CompletionStage result = timeout.apply(new InvocationContext<>(null)); assertThatThrownBy(result.toCompletableFuture()::get) .isExactlyInstanceOf(ExecutionException.class) .hasCauseExactlyInstanceOf(TestException.class); - assertThat(timeoutWatcher.timeoutWatchWasCancelled()).isTrue(); + assertThat(timer.timerTaskCancelled()).isTrue(); } @Test @@ -91,12 +91,12 @@ public void immediatelyReturning_completionStageException() { TestInvocation> invocation = TestInvocation.of(() -> failedFuture(new TestException())); CompletionStageExecution execution = new CompletionStageExecution<>(invocation, executor); CompletionStageTimeout timeout = new CompletionStageTimeout<>(execution, - "test invocation", 1000, timeoutWatcher); + "test invocation", 1000, timer); CompletionStage result = timeout.apply(new InvocationContext<>(null)); assertThatThrownBy(result.toCompletableFuture()::get) .isExactlyInstanceOf(ExecutionException.class) .hasCauseExactlyInstanceOf(TestException.class); - assertThat(timeoutWatcher.timeoutWatchWasCancelled()).isTrue(); + assertThat(timer.timerTaskCancelled()).isTrue(); } @Test @@ -109,11 +109,11 @@ public void delayed_value_notTimedOut() throws Exception { }); CompletionStageExecution execution = new CompletionStageExecution<>(invocation, executor); CompletionStageTimeout timeout = new CompletionStageTimeout<>(execution, - "test invocation", 1000, timeoutWatcher); + "test invocation", 1000, timer); CompletionStage result = timeout.apply(new InvocationContext<>(null)); delayBarrier.open(); assertThat(result.toCompletableFuture().get()).isEqualTo("foobar"); - assertThat(timeoutWatcher.timeoutWatchWasCancelled()).isTrue(); + assertThat(timer.timerTaskCancelled()).isTrue(); } @Test @@ -126,15 +126,15 @@ public void delayed_value_timedOut() throws Exception { }); CompletionStageExecution execution = new CompletionStageExecution<>(invocation, executor); CompletionStageTimeout timeout = new CompletionStageTimeout<>(execution, - "test invocation", 1000, timeoutWatcher); + "test invocation", 1000, timer); CompletionStage result = timeout.apply(new InvocationContext<>(null)); - watcherTimeoutElapsedBarrier.open(); - watcherExecutionInterruptedBarrier.await(); + timerElapsedBarrier.open(); + timerTaskFinishedBarrier.await(); assertThatThrownBy(result.toCompletableFuture()::get) .isExactlyInstanceOf(ExecutionException.class) .hasCauseExactlyInstanceOf(TimeoutException.class) .hasMessageContaining("test invocation timed out"); - assertThat(timeoutWatcher.timeoutWatchWasCancelled()).isFalse(); + assertThat(timer.timerTaskCancelled()).isFalse(); } @Test @@ -147,16 +147,16 @@ public void delayed_value_timedOutNoninterruptibly() throws Exception { }); CompletionStageExecution execution = new CompletionStageExecution<>(invocation, executor); CompletionStageTimeout timeout = new CompletionStageTimeout<>(execution, - "test invocation", 1000, timeoutWatcher); + "test invocation", 1000, timer); CompletionStage result = timeout.apply(new InvocationContext<>(null)); - watcherTimeoutElapsedBarrier.open(); - watcherExecutionInterruptedBarrier.await(); + timerElapsedBarrier.open(); + timerTaskFinishedBarrier.await(); delayBarrier.open(); assertThatThrownBy(result.toCompletableFuture()::get) .isExactlyInstanceOf(ExecutionException.class) .hasCauseExactlyInstanceOf(TimeoutException.class) .hasMessageContaining("test invocation timed out"); - assertThat(timeoutWatcher.timeoutWatchWasCancelled()).isFalse(); + assertThat(timer.timerTaskCancelled()).isFalse(); } @Test @@ -169,14 +169,14 @@ public void delayed_value_interruptedEarly() throws Exception { }); CompletionStageExecution execution = new CompletionStageExecution<>(invocation, executor); CompletionStageTimeout timeout = new CompletionStageTimeout<>(execution, - "test invocation", 1000, timeoutWatcher); + "test invocation", 1000, timer); CompletionStage result = timeout.apply(new InvocationContext<>(null)); party.organizer().waitForAll(); executor.interruptExecutingThread(); assertThatThrownBy(result.toCompletableFuture()::get) .isExactlyInstanceOf(ExecutionException.class) .hasCauseExactlyInstanceOf(InterruptedException.class); - assertThat(timeoutWatcher.timeoutWatchWasCancelled()).isTrue(); + assertThat(timer.timerTaskCancelled()).isTrue(); } @Test @@ -189,13 +189,13 @@ public void delayed_exception_notTimedOut() { }); CompletionStageExecution execution = new CompletionStageExecution<>(invocation, executor); CompletionStageTimeout timeout = new CompletionStageTimeout<>(execution, - "test invocation", 1000, timeoutWatcher); + "test invocation", 1000, timer); CompletionStage result = timeout.apply(new InvocationContext<>(null)); delayBarrier.open(); assertThatThrownBy(result.toCompletableFuture()::get) .isExactlyInstanceOf(ExecutionException.class) .hasCauseExactlyInstanceOf(TestException.class); - assertThat(timeoutWatcher.timeoutWatchWasCancelled()).isTrue(); + assertThat(timer.timerTaskCancelled()).isTrue(); } @Test @@ -208,15 +208,15 @@ public void delayed_exception_timedOut() throws Exception { }); CompletionStageExecution execution = new CompletionStageExecution<>(invocation, executor); CompletionStageTimeout timeout = new CompletionStageTimeout<>(execution, - "test invocation", 1000, timeoutWatcher); + "test invocation", 1000, timer); CompletionStage result = timeout.apply(new InvocationContext<>(null)); - watcherTimeoutElapsedBarrier.open(); - watcherExecutionInterruptedBarrier.await(); + timerElapsedBarrier.open(); + timerTaskFinishedBarrier.await(); assertThatThrownBy(result.toCompletableFuture()::get) .isExactlyInstanceOf(ExecutionException.class) .hasCauseExactlyInstanceOf(TimeoutException.class) .hasMessageContaining("test invocation timed out"); - assertThat(timeoutWatcher.timeoutWatchWasCancelled()).isFalse(); + assertThat(timer.timerTaskCancelled()).isFalse(); } @Test @@ -229,16 +229,16 @@ public void delayed_exception_timedOutNoninterruptibly() throws Exception { }); CompletionStageExecution execution = new CompletionStageExecution<>(invocation, executor); CompletionStageTimeout timeout = new CompletionStageTimeout<>(execution, - "test invocation", 1000, timeoutWatcher); + "test invocation", 1000, timer); CompletionStage result = timeout.apply(new InvocationContext<>(null)); - watcherTimeoutElapsedBarrier.open(); - watcherExecutionInterruptedBarrier.await(); + timerElapsedBarrier.open(); + timerTaskFinishedBarrier.await(); delayBarrier.open(); assertThatThrownBy(result.toCompletableFuture()::get) .isExactlyInstanceOf(ExecutionException.class) .hasCauseExactlyInstanceOf(TimeoutException.class) .hasMessageContaining("test invocation timed out"); - assertThat(timeoutWatcher.timeoutWatchWasCancelled()).isFalse(); + assertThat(timer.timerTaskCancelled()).isFalse(); } @Test @@ -251,14 +251,14 @@ public void delayed_exception_interruptedEarly() throws Exception { }); CompletionStageExecution execution = new CompletionStageExecution<>(invocation, executor); CompletionStageTimeout timeout = new CompletionStageTimeout<>(execution, - "test invocation", 1000, timeoutWatcher); + "test invocation", 1000, timer); CompletionStage result = timeout.apply(new InvocationContext<>(null)); party.organizer().waitForAll(); executor.interruptExecutingThread(); assertThatThrownBy(result.toCompletableFuture()::get) .isExactlyInstanceOf(ExecutionException.class) .hasCauseExactlyInstanceOf(InterruptedException.class); - assertThat(timeoutWatcher.timeoutWatchWasCancelled()).isTrue(); + assertThat(timer.timerTaskCancelled()).isTrue(); } @Test @@ -276,10 +276,10 @@ public void immediatelyReturning_completionStageTimedOut() throws Exception { })); CompletionStageExecution execution = new CompletionStageExecution<>(invocation, executor); CompletionStageTimeout timeout = new CompletionStageTimeout<>(execution, - "test invocation", 1000, timeoutWatcher); + "test invocation", 1000, timer); CompletionStage result = timeout.apply(new InvocationContext<>(null)); - watcherTimeoutElapsedBarrier.open(); - watcherExecutionInterruptedBarrier.await(); + timerElapsedBarrier.open(); + timerTaskFinishedBarrier.await(); assertThatThrownBy(result.toCompletableFuture()::get) .isExactlyInstanceOf(ExecutionException.class) .hasCauseExactlyInstanceOf(TimeoutException.class); diff --git a/implementation/core/src/test/java/io/smallrye/faulttolerance/core/timeout/FutureTimeoutTest.java b/implementation/core/src/test/java/io/smallrye/faulttolerance/core/timeout/FutureTimeoutTest.java index 3a43d561..c01889c4 100644 --- a/implementation/core/src/test/java/io/smallrye/faulttolerance/core/timeout/FutureTimeoutTest.java +++ b/implementation/core/src/test/java/io/smallrye/faulttolerance/core/timeout/FutureTimeoutTest.java @@ -27,18 +27,19 @@ import io.smallrye.faulttolerance.core.util.party.Party; public class FutureTimeoutTest { - private Barrier watcherTimeoutElapsedBarrier; - private Barrier watcherExecutionInterruptedBarrier; + private Barrier timerElapsedBarrier; + private Barrier timerTaskFinishedBarrier; + + private TestTimer timer; - private TestTimeoutWatcher timeoutWatcher; private ExecutorService asyncExecutor; @BeforeEach public void setUp() { - watcherTimeoutElapsedBarrier = Barrier.interruptible(); - watcherExecutionInterruptedBarrier = Barrier.interruptible(); + timerElapsedBarrier = Barrier.interruptible(); + timerTaskFinishedBarrier = Barrier.interruptible(); - timeoutWatcher = new TestTimeoutWatcher(watcherTimeoutElapsedBarrier, watcherExecutionInterruptedBarrier); + timer = new TestTimer(timerElapsedBarrier, timerTaskFinishedBarrier); asyncExecutor = Executors.newFixedThreadPool(4); } @@ -48,14 +49,14 @@ public void tearDown() throws InterruptedException { asyncExecutor.shutdown(); asyncExecutor.awaitTermination(10, TimeUnit.SECONDS); - timeoutWatcher.shutdown(); + timer.shutdown(); } @Test public void failOnLackOfExecutor() { TestInvocation> invocation = TestInvocation.of(() -> completedFuture("foobar")); Timeout> timeout = new Timeout<>(invocation, "test invocation", 1000, - timeoutWatcher); + timer); assertThatThrownBy(() -> new AsyncTimeout<>(timeout, null)) .isExactlyInstanceOf(IllegalArgumentException.class) .hasMessage("Executor must be set"); @@ -66,14 +67,14 @@ public void immediatelyReturning_value() throws Exception { TestThread> testThread = runAsyncTimeoutImmediately(() -> completedFuture("foobar")); Future future = testThread.await(); assertThat(future.get()).isEqualTo("foobar"); - assertThat(timeoutWatcher.timeoutWatchWasCancelled()).isTrue(); + assertThat(timer.timerTaskCancelled()).isTrue(); } @Test public void immediatelyReturning_exception() { TestThread> testThread = runAsyncTimeoutImmediately(TestException::doThrow); assertThatThrownBy(testThread::await).isExactlyInstanceOf(TestException.class); - assertThat(timeoutWatcher.timeoutWatchWasCancelled()).isTrue(); + assertThat(timer.timerTaskCancelled()).isTrue(); } @Test @@ -85,14 +86,14 @@ public void delayed_value_notTimedOut() throws Exception { return completedFuture("foobar"); }); Timeout> timeout = new Timeout<>(invocation, "test invocation", 1000, - timeoutWatcher); + timer); TestThread> testThread = runOnTestThread(new AsyncTimeout<>(timeout, asyncExecutor)); delayBarrier.open(); Future future = testThread.await(); assertThat(future.get()).isEqualTo("foobar"); assertThat(future.isDone()).isEqualTo(true); - assertThat(timeoutWatcher.timeoutWatchWasCancelled()).isTrue(); + assertThat(timer.timerTaskCancelled()).isTrue(); } @Test @@ -104,15 +105,15 @@ public void delayed_value_timedOut() throws InterruptedException { return completedFuture("foobar"); }); Timeout> timeout = new Timeout<>(invocation, "test invocation", 1000, - timeoutWatcher); + timer); TestThread> testThread = runOnTestThread(new AsyncTimeout<>(timeout, asyncExecutor)); - watcherTimeoutElapsedBarrier.open(); - watcherExecutionInterruptedBarrier.await(); + timerElapsedBarrier.open(); + timerTaskFinishedBarrier.await(); assertThatThrownBy(testThread::await) .isExactlyInstanceOf(TimeoutException.class) .hasMessage("test invocation timed out"); - assertThat(timeoutWatcher.timeoutWatchWasCancelled()).isFalse(); + assertThat(timer.timerTaskCancelled()).isFalse(); } @Test @@ -125,15 +126,15 @@ public void delayed_value_timedOutNoninterruptibly() throws InterruptedException }); Timeout> timeout = new Timeout<>(invocation, "test invocation", 1000, - timeoutWatcher); + timer); TestThread> testThread = runOnTestThread(new AsyncTimeout<>(timeout, asyncExecutor)); - watcherTimeoutElapsedBarrier.open(); - watcherExecutionInterruptedBarrier.await(); + timerElapsedBarrier.open(); + timerTaskFinishedBarrier.await(); assertThatThrownBy(testThread::await) .isExactlyInstanceOf(TimeoutException.class) .hasMessage("test invocation timed out"); - assertThat(timeoutWatcher.timeoutWatchWasCancelled()).isFalse(); // watcher should not be canceled if it caused the stop + assertThat(timer.timerTaskCancelled()).isFalse(); // watcher should not be canceled if it caused the stop delayBarrier.open(); } @@ -148,7 +149,7 @@ public void delayed_value_cancelled() throws InterruptedException { }); Timeout> timeout = new Timeout<>(invocation, "test invocation", 1000, - timeoutWatcher); + timer); TestThread> testThread = runOnTestThread(new AsyncTimeout<>(timeout, asyncExecutor)); party.organizer().waitForAll(); @@ -156,7 +157,7 @@ public void delayed_value_cancelled() throws InterruptedException { assertThatThrownBy(testThread::await) .isExactlyInstanceOf(InterruptedException.class); - assertThat(timeoutWatcher.timeoutWatchWasCancelled()).isFalse(); // watcher should not be canceled if it caused the stop + assertThat(timer.timerTaskCancelled()).isFalse(); // watcher should not be canceled if it caused the stop party.organizer().disband(); // not stricly necessary, but would cause tearDown to wait } @@ -175,7 +176,7 @@ public void delayed_value_selfInterrupted() { delayBarrier.open(); assertThatThrownBy(testThread::await).isExactlyInstanceOf(InterruptedException.class); - assertThat(timeoutWatcher.timeoutWatchWasCancelled()).isTrue(); + assertThat(timer.timerTaskCancelled()).isTrue(); } @Test @@ -259,7 +260,7 @@ public void delayed_value_getRethrowsError() throws Exception { private TestThread> runAsyncTimeoutImmediately(Callable> action) { TestInvocation> invocation = TestInvocation.of(action); Timeout> timeout = new Timeout<>(invocation, "test invocation", 1000, - timeoutWatcher); + timer); return runOnTestThread(new AsyncTimeout<>(timeout, asyncExecutor)); } } diff --git a/implementation/core/src/test/java/io/smallrye/faulttolerance/core/timeout/RealWorldCompletionStageTimeoutTest.java b/implementation/core/src/test/java/io/smallrye/faulttolerance/core/timeout/RealWorldCompletionStageTimeoutTest.java index a82ebfce..9296fcfa 100644 --- a/implementation/core/src/test/java/io/smallrye/faulttolerance/core/timeout/RealWorldCompletionStageTimeoutTest.java +++ b/implementation/core/src/test/java/io/smallrye/faulttolerance/core/timeout/RealWorldCompletionStageTimeoutTest.java @@ -46,7 +46,6 @@ public class RealWorldCompletionStageTimeoutTest { private ExecutorService timerExecutor; private Timer timer; - private TimerTimeoutWatcher timerWatcher; private Stopwatch stopwatch = SystemStopwatch.INSTANCE; @@ -56,7 +55,6 @@ public void setUp() { timerExecutor = Executors.newSingleThreadExecutor(); timer = ThreadTimer.create(timerExecutor); - timerWatcher = new TimerTimeoutWatcher(timer); } @AfterEach @@ -75,7 +73,7 @@ public void shouldReturnRightAway() throws Exception { CompletionStageExecution execution = new CompletionStageExecution<>(invocation(), executor); FaultToleranceStrategy> timeout = new CompletionStageTimeout<>(execution, - "completion stage timeout", TIMEOUT, timerWatcher); + "completion stage timeout", TIMEOUT, timer); assertThat(timeout.apply(new InvocationContext<>(() -> { Thread.sleep(SLEEP_TIME); @@ -90,7 +88,7 @@ public void shouldPropagateMethodError() throws Exception { CompletionStageExecution execution = new CompletionStageExecution<>(invocation(), executor); FaultToleranceStrategy> timeout = new CompletionStageTimeout<>(execution, - "completion stage timeout", TIMEOUT, timerWatcher); + "completion stage timeout", TIMEOUT, timer); assertThatThrownBy(timeout.apply(new InvocationContext<>(() -> { Thread.sleep(SLEEP_TIME); @@ -107,7 +105,7 @@ public void shouldPropagateCompletionStageError() throws Exception { CompletionStageExecution execution = new CompletionStageExecution<>(invocation(), executor); FaultToleranceStrategy> timeout = new CompletionStageTimeout<>(execution, - "completion stage timeout", TIMEOUT, timerWatcher); + "completion stage timeout", TIMEOUT, timer); assertThatThrownBy(timeout.apply(new InvocationContext<>(() -> { Thread.sleep(SLEEP_TIME); @@ -124,7 +122,7 @@ public void shouldTimeOut() throws Exception { CompletionStageExecution execution = new CompletionStageExecution<>(invocation(), executor); FaultToleranceStrategy> timeout = new CompletionStageTimeout<>(execution, - "completion stage timeout", SLEEP_TIME, timerWatcher); + "completion stage timeout", SLEEP_TIME, timer); assertThatThrownBy(timeout.apply(new InvocationContext<>(() -> { Thread.sleep(TIMEOUT); diff --git a/implementation/core/src/test/java/io/smallrye/faulttolerance/core/timeout/TestTimeoutWatcher.java b/implementation/core/src/test/java/io/smallrye/faulttolerance/core/timeout/TestTimeoutWatcher.java deleted file mode 100644 index 9863e99b..00000000 --- a/implementation/core/src/test/java/io/smallrye/faulttolerance/core/timeout/TestTimeoutWatcher.java +++ /dev/null @@ -1,66 +0,0 @@ -package io.smallrye.faulttolerance.core.timeout; - -import java.util.concurrent.atomic.AtomicBoolean; - -import io.smallrye.faulttolerance.core.util.barrier.Barrier; - -/** - * Can only be used once; subsequent usages will throw an exception during {@code schedule}. - */ -public final class TestTimeoutWatcher implements TimeoutWatcher { - private final AtomicBoolean alreadyUsed = new AtomicBoolean(false); - - private final Barrier timeoutElapsedBarrier; - private final Barrier executionInterruptedBarrier; - - private final AtomicBoolean timeoutWatchCancelled = new AtomicBoolean(false); - - private volatile Thread executingThread; - - public TestTimeoutWatcher(Barrier timeoutElapsedBarrier, Barrier executionInterruptedBarrier) { - this.timeoutElapsedBarrier = timeoutElapsedBarrier; - this.executionInterruptedBarrier = executionInterruptedBarrier; - } - - boolean timeoutWatchWasCancelled() { - return timeoutWatchCancelled.get(); - } - - @Override - public TimeoutWatch schedule(TimeoutExecution execution) { - if (alreadyUsed.compareAndSet(false, true)) { - executingThread = new Thread(() -> { - try { - timeoutElapsedBarrier.await(); - execution.timeoutAndInterrupt(); - executionInterruptedBarrier.open(); - } catch (InterruptedException e) { - // this is expected in case the watched code doesn't timeout (and so the watch is cancelled) - // see also the return value of this method - } - }, "TestTimeoutWatcher thread"); - executingThread.start(); - return new TimeoutWatch() { - @Override - public boolean isRunning() { - return executingThread.isAlive(); - } - - @Override - public void cancel() { - timeoutWatchCancelled.set(true); - executingThread.interrupt(); - } - }; - } else { - throw new IllegalStateException("TestTimeoutWatcher cannot be reused"); - } - } - - public void shutdown() throws InterruptedException { - if (executingThread != null) { - executingThread.interrupt(); - executingThread.join(); - } - } -} diff --git a/implementation/core/src/test/java/io/smallrye/faulttolerance/core/timeout/TestTimer.java b/implementation/core/src/test/java/io/smallrye/faulttolerance/core/timeout/TestTimer.java new file mode 100644 index 00000000..e55e5664 --- /dev/null +++ b/implementation/core/src/test/java/io/smallrye/faulttolerance/core/timeout/TestTimer.java @@ -0,0 +1,84 @@ +package io.smallrye.faulttolerance.core.timeout; + +import java.util.concurrent.Executor; +import java.util.concurrent.atomic.AtomicBoolean; + +import io.smallrye.faulttolerance.core.timer.Timer; +import io.smallrye.faulttolerance.core.timer.TimerTask; +import io.smallrye.faulttolerance.core.util.barrier.Barrier; + +/** + * Can only be used once; subsequent usages will throw an exception during {@code schedule}. + */ +public final class TestTimer implements Timer { + private final AtomicBoolean alreadyUsed = new AtomicBoolean(false); + + private final Barrier timerElapsedBarrier; + private final Barrier timerTaskFinishedBarrier; + + private final AtomicBoolean timerTaskCancelled = new AtomicBoolean(false); + + private volatile Thread executingThread; + + public TestTimer(Barrier timerElapsedBarrier, Barrier timerTaskFinishedBarrier) { + this.timerElapsedBarrier = timerElapsedBarrier; + this.timerTaskFinishedBarrier = timerTaskFinishedBarrier; + } + + boolean timerTaskCancelled() { + return timerTaskCancelled.get(); + } + + @Override + public TimerTask schedule(long delayInMillis, Runnable task) { + if (alreadyUsed.compareAndSet(false, true)) { + executingThread = new Thread(() -> { + try { + timerElapsedBarrier.await(); + task.run(); + timerTaskFinishedBarrier.open(); + } catch (InterruptedException e) { + // this is expected in case the watched code doesn't timeout (and so the watch is cancelled) + // see also the return value of this method + } + }, "TestTimer thread"); + executingThread.start(); + return new TimerTask() { + @Override + public boolean isDone() { + return !executingThread.isAlive(); + } + + @Override + public boolean cancel() { + if (timerTaskCancelled.compareAndSet(false, true)) { + executingThread.interrupt(); + return true; + } + return false; + } + }; + } else { + throw new IllegalStateException("TestTimer cannot be reused"); + } + } + + @Override + public TimerTask schedule(long delayInMillis, Runnable task, Executor executor) { + // not used in `Timeout` / `CompletionStageTimeout` + throw new UnsupportedOperationException(); + } + + @Override + public int countScheduledTasks() { + // not used in `Timeout` / `CompletionStageTimeout` + throw new UnsupportedOperationException(); + } + + public void shutdown() throws InterruptedException { + if (executingThread != null) { + executingThread.interrupt(); + executingThread.join(); + } + } +} diff --git a/implementation/core/src/test/java/io/smallrye/faulttolerance/core/timeout/TimeoutTest.java b/implementation/core/src/test/java/io/smallrye/faulttolerance/core/timeout/TimeoutTest.java index 1839cce8..be0b74c9 100644 --- a/implementation/core/src/test/java/io/smallrye/faulttolerance/core/timeout/TimeoutTest.java +++ b/implementation/core/src/test/java/io/smallrye/faulttolerance/core/timeout/TimeoutTest.java @@ -16,35 +16,35 @@ import io.smallrye.faulttolerance.core.util.party.Party; public class TimeoutTest { - private Barrier watcherTimeoutElapsedBarrier; - private Barrier watcherExecutionInterruptedBarrier; + private Barrier timerElapsedBarrier; + private Barrier timerTaskFinishedBarrier; - private TestTimeoutWatcher timeoutWatcher; + private TestTimer timer; @BeforeEach public void setUp() { - watcherTimeoutElapsedBarrier = Barrier.interruptible(); - watcherExecutionInterruptedBarrier = Barrier.interruptible(); + timerElapsedBarrier = Barrier.interruptible(); + timerTaskFinishedBarrier = Barrier.interruptible(); - timeoutWatcher = new TestTimeoutWatcher(watcherTimeoutElapsedBarrier, watcherExecutionInterruptedBarrier); + timer = new TestTimer(timerElapsedBarrier, timerTaskFinishedBarrier); } @AfterEach public void tearDown() throws InterruptedException { - timeoutWatcher.shutdown(); + timer.shutdown(); } @Test public void negativeTimeout() { TestInvocation invocation = TestInvocation.of(() -> "foobar"); - assertThatThrownBy(() -> new Timeout<>(invocation, "test invocation", -1, timeoutWatcher)) + assertThatThrownBy(() -> new Timeout<>(invocation, "test invocation", -1, timer)) .isExactlyInstanceOf(IllegalArgumentException.class); } @Test public void zeroTimeout() { TestInvocation invocation = TestInvocation.of(() -> "foobar"); - assertThatThrownBy(() -> new Timeout<>(invocation, "test invocation", 0, timeoutWatcher)) + assertThatThrownBy(() -> new Timeout<>(invocation, "test invocation", 0, timer)) .isExactlyInstanceOf(IllegalArgumentException.class); } @@ -52,17 +52,17 @@ public void zeroTimeout() { public void immediatelyReturning_value() throws Exception { TestInvocation invocation = TestInvocation.of(() -> "foobar"); TestThread result = runOnTestThread( - new Timeout<>(invocation, "test invocation", 1000, timeoutWatcher)); + new Timeout<>(invocation, "test invocation", 1000, timer)); assertThat(result.await()).isEqualTo("foobar"); - assertThat(timeoutWatcher.timeoutWatchWasCancelled()).isTrue(); + assertThat(timer.timerTaskCancelled()).isTrue(); } @Test public void immediatelyReturning_exception() { TestInvocation invocation = TestInvocation.of(TestException::doThrow); - TestThread result = runOnTestThread(new Timeout<>(invocation, "test invocation", 1000, timeoutWatcher)); + TestThread result = runOnTestThread(new Timeout<>(invocation, "test invocation", 1000, timer)); assertThatThrownBy(result::await).isExactlyInstanceOf(TestException.class); - assertThat(timeoutWatcher.timeoutWatchWasCancelled()).isTrue(); + assertThat(timer.timerTaskCancelled()).isTrue(); } @Test @@ -74,10 +74,10 @@ public void delayed_value_notTimedOut() throws Exception { return "foobar"; }); TestThread result = runOnTestThread( - new Timeout<>(invocation, "test invocation", 1000, timeoutWatcher)); + new Timeout<>(invocation, "test invocation", 1000, timer)); delayBarrier.open(); assertThat(result.await()).isEqualTo("foobar"); - assertThat(timeoutWatcher.timeoutWatchWasCancelled()).isTrue(); + assertThat(timer.timerTaskCancelled()).isTrue(); } @Test @@ -89,13 +89,13 @@ public void delayed_value_timedOut() throws InterruptedException { return "foobar"; }); TestThread result = runOnTestThread( - new Timeout<>(invocation, "test invocation", 1000, timeoutWatcher)); - watcherTimeoutElapsedBarrier.open(); - watcherExecutionInterruptedBarrier.await(); + new Timeout<>(invocation, "test invocation", 1000, timer)); + timerElapsedBarrier.open(); + timerTaskFinishedBarrier.await(); assertThatThrownBy(result::await) .isExactlyInstanceOf(TimeoutException.class) .hasMessage("test invocation timed out"); - assertThat(timeoutWatcher.timeoutWatchWasCancelled()).isFalse(); + assertThat(timer.timerTaskCancelled()).isFalse(); } @Test @@ -107,14 +107,14 @@ public void delayed_value_timedOutNoninterruptibly() throws InterruptedException return "foobar"; }); TestThread result = runOnTestThread( - new Timeout<>(invocation, "test invocation", 1000, timeoutWatcher)); - watcherTimeoutElapsedBarrier.open(); - watcherExecutionInterruptedBarrier.await(); + new Timeout<>(invocation, "test invocation", 1000, timer)); + timerElapsedBarrier.open(); + timerTaskFinishedBarrier.await(); delayBarrier.open(); assertThatThrownBy(result::await) .isExactlyInstanceOf(TimeoutException.class) .hasMessage("test invocation timed out"); - assertThat(timeoutWatcher.timeoutWatchWasCancelled()).isFalse(); + assertThat(timer.timerTaskCancelled()).isFalse(); } @Test @@ -126,11 +126,11 @@ public void delayed_value_interruptedEarly() throws InterruptedException { return "foobar"; }); TestThread executingThread = runOnTestThread( - new Timeout<>(invocation, "test invocation", 1000, timeoutWatcher)); + new Timeout<>(invocation, "test invocation", 1000, timer)); party.organizer().waitForAll(); executingThread.interrupt(); assertThatThrownBy(executingThread::await).isExactlyInstanceOf(InterruptedException.class); - assertThat(timeoutWatcher.timeoutWatchWasCancelled()).isTrue(); + assertThat(timer.timerTaskCancelled()).isTrue(); } @Test @@ -141,10 +141,10 @@ public void delayed_exception_notTimedOut() { delayBarrier.await(); throw new TestException(); }); - TestThread result = runOnTestThread(new Timeout<>(invocation, "test invocation", 1000, timeoutWatcher)); + TestThread result = runOnTestThread(new Timeout<>(invocation, "test invocation", 1000, timer)); delayBarrier.open(); assertThatThrownBy(result::await).isExactlyInstanceOf(TestException.class); - assertThat(timeoutWatcher.timeoutWatchWasCancelled()).isTrue(); + assertThat(timer.timerTaskCancelled()).isTrue(); } @Test @@ -155,13 +155,13 @@ public void delayed_exception_timedOut() throws InterruptedException { delayBarrier.await(); throw new TestException(); }); - TestThread result = runOnTestThread(new Timeout<>(invocation, "test invocation", 1000, timeoutWatcher)); - watcherTimeoutElapsedBarrier.open(); - watcherExecutionInterruptedBarrier.await(); + TestThread result = runOnTestThread(new Timeout<>(invocation, "test invocation", 1000, timer)); + timerElapsedBarrier.open(); + timerTaskFinishedBarrier.await(); assertThatThrownBy(result::await) .isExactlyInstanceOf(TimeoutException.class) .hasMessage("test invocation timed out"); - assertThat(timeoutWatcher.timeoutWatchWasCancelled()).isFalse(); + assertThat(timer.timerTaskCancelled()).isFalse(); } @Test @@ -172,14 +172,14 @@ public void delayed_exception_timedOutNoninterruptibly() throws InterruptedExcep delayBarrier.await(); throw new TestException(); }); - TestThread result = runOnTestThread(new Timeout<>(invocation, "test invocation", 1000, timeoutWatcher)); - watcherTimeoutElapsedBarrier.open(); - watcherExecutionInterruptedBarrier.await(); + TestThread result = runOnTestThread(new Timeout<>(invocation, "test invocation", 1000, timer)); + timerElapsedBarrier.open(); + timerTaskFinishedBarrier.await(); delayBarrier.open(); assertThatThrownBy(result::await) .isExactlyInstanceOf(TimeoutException.class) .hasMessage("test invocation timed out"); - assertThat(timeoutWatcher.timeoutWatchWasCancelled()).isFalse(); + assertThat(timer.timerTaskCancelled()).isFalse(); } @Test @@ -191,10 +191,10 @@ public void delayed_exception_interruptedEarly() throws InterruptedException { throw new TestException(); }); TestThread executingThread = runOnTestThread( - new Timeout<>(invocation, "test invocation", 1000, timeoutWatcher)); + new Timeout<>(invocation, "test invocation", 1000, timer)); party.organizer().waitForAll(); executingThread.interrupt(); assertThatThrownBy(executingThread::await).isExactlyInstanceOf(InterruptedException.class); - assertThat(timeoutWatcher.timeoutWatchWasCancelled()).isTrue(); + assertThat(timer.timerTaskCancelled()).isTrue(); } } diff --git a/implementation/core/src/test/java/io/smallrye/faulttolerance/core/timeout/TimerTimeoutWatcherTest.java b/implementation/core/src/test/java/io/smallrye/faulttolerance/core/timeout/TimerTimeoutWatcherTest.java deleted file mode 100644 index e3ce8118..00000000 --- a/implementation/core/src/test/java/io/smallrye/faulttolerance/core/timeout/TimerTimeoutWatcherTest.java +++ /dev/null @@ -1,100 +0,0 @@ -package io.smallrye.faulttolerance.core.timeout; - -import static org.assertj.core.api.Assertions.assertThat; -import static org.awaitility.Awaitility.await; - -import java.time.Duration; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; - -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.condition.EnabledOnOs; -import org.junit.jupiter.api.condition.OS; - -import io.smallrye.faulttolerance.core.timer.ThreadTimer; -import io.smallrye.faulttolerance.core.timer.Timer; - -@EnabledOnOs(OS.LINUX) -public class TimerTimeoutWatcherTest { - private ExecutorService executor; - private Timer timer; - private TimerTimeoutWatcher watcher; - - @BeforeEach - public void setUp() { - executor = Executors.newSingleThreadExecutor(); - timer = ThreadTimer.create(executor); - watcher = new TimerTimeoutWatcher(timer); - } - - @AfterEach - public void tearDown() throws InterruptedException { - timer.shutdown(); - executor.shutdownNow(); - executor.awaitTermination(1, TimeUnit.SECONDS); - } - - @Test - public void timedOut() throws InterruptedException { - AtomicBoolean wasInterrupted = new AtomicBoolean(false); - - Thread thread = run(wasInterrupted); - TimeoutExecution execution = new TimeoutExecution(thread, 100L); - TimeoutWatch watch = watcher.schedule(execution); - - assertThat(execution.isRunning()).isTrue(); - assertThat(watch.isRunning()).isTrue(); - - thread.join(); - assertThat(wasInterrupted).isTrue(); - - assertThat(execution.isRunning()).isFalse(); - assertThat(execution.hasFinished()).isFalse(); - assertThat(execution.hasTimedOut()).isTrue(); - - await("watch not running") - .atMost(Duration.ofMillis(100)) - .pollInterval(Duration.ofMillis(50)) - .until(() -> !watch.isRunning()); - } - - @Test - public void notTimedOut() throws InterruptedException { - AtomicBoolean wasInterrupted = new AtomicBoolean(false); - - Thread thread = run(wasInterrupted); - TimeoutExecution execution = new TimeoutExecution(thread, 300L); - TimeoutWatch watch = watcher.schedule(execution); - - assertThat(execution.isRunning()).isTrue(); - assertThat(watch.isRunning()).isTrue(); - - thread.join(); - execution.finish(watch::cancel); // execution.finish() needs to be called explicitly - assertThat(wasInterrupted).isFalse(); - - assertThat(execution.isRunning()).isFalse(); - assertThat(execution.hasFinished()).isTrue(); - assertThat(execution.hasTimedOut()).isFalse(); - await("watch not running") - .atMost(Duration.ofMillis(100)) - .pollInterval(Duration.ofMillis(50)) - .until(() -> !watch.isRunning()); - } - - private Thread run(AtomicBoolean interruptionFlag) { - Thread thread = new Thread(() -> { - try { - Thread.sleep(200L); - } catch (InterruptedException e) { - interruptionFlag.set(true); - } - }); - thread.start(); - return thread; - } -} diff --git a/implementation/fault-tolerance/src/main/java/io/smallrye/faulttolerance/FaultToleranceInterceptor.java b/implementation/fault-tolerance/src/main/java/io/smallrye/faulttolerance/FaultToleranceInterceptor.java index 0f92f485..332408f9 100644 --- a/implementation/fault-tolerance/src/main/java/io/smallrye/faulttolerance/FaultToleranceInterceptor.java +++ b/implementation/fault-tolerance/src/main/java/io/smallrye/faulttolerance/FaultToleranceInterceptor.java @@ -90,7 +90,6 @@ import io.smallrye.faulttolerance.core.timeout.AsyncTimeout; import io.smallrye.faulttolerance.core.timeout.CompletionStageTimeout; import io.smallrye.faulttolerance.core.timeout.Timeout; -import io.smallrye.faulttolerance.core.timeout.TimerTimeoutWatcher; import io.smallrye.faulttolerance.core.timer.Timer; import io.smallrye.faulttolerance.core.util.DirectExecutor; import io.smallrye.faulttolerance.core.util.ExceptionDecision; @@ -298,8 +297,7 @@ private FaultToleranceStrategy> prepareAsyncStrategy(Faul if (operation.hasTimeout()) { long timeoutMs = getTimeInMs(operation.getTimeout().value(), operation.getTimeout().unit()); - result = new CompletionStageTimeout<>(result, point.toString(), timeoutMs, - new TimerTimeoutWatcher(timer)); + result = new CompletionStageTimeout<>(result, point.toString(), timeoutMs, timer); } if (operation.hasRateLimit()) { @@ -369,8 +367,7 @@ private FaultToleranceStrategy prepareSyncStrategy(FaultToleranceOperatio if (operation.hasTimeout()) { long timeoutMs = getTimeInMs(operation.getTimeout().value(), operation.getTimeout().unit()); - result = new Timeout<>(result, point.toString(), timeoutMs, - new TimerTimeoutWatcher(timer)); + result = new Timeout<>(result, point.toString(), timeoutMs, timer); } if (operation.hasRateLimit()) { @@ -441,8 +438,7 @@ private FaultToleranceStrategy> prepareFutureStrategy(FaultToleran if (operation.hasTimeout()) { long timeoutMs = getTimeInMs(operation.getTimeout().value(), operation.getTimeout().unit()); - Timeout> timeout = new Timeout<>(result, point.toString(), timeoutMs, - new TimerTimeoutWatcher(timer)); + Timeout> timeout = new Timeout<>(result, point.toString(), timeoutMs, timer); result = new AsyncTimeout<>(timeout, asyncExecutor); }