Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Concept: delete deprecated scheduler #923

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ private CompletionStage<V> doApply(InvocationContext<CompletionStage<V>> ctx) {
};

TimeoutExecution timeoutExecution = new TimeoutExecution(null, timeoutInMillis, onTimeout);
TimeoutWatch watch = watcher.schedule(timeoutExecution);
var watch = watcher.schedule(timeoutExecution);

CompletionStage<V> originalResult;
try {
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ private V doApply(InvocationContext<V> ctx) throws Exception {
notification.accept(timeoutException(description));
}
});
TimeoutWatch watch = watcher.schedule(execution);
var watch = watcher.schedule(execution);
ctx.fireEvent(TimeoutEvents.Started.INSTANCE);

V result = null;
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package io.smallrye.faulttolerance.core.timeout;

import io.smallrye.faulttolerance.core.timer.TimerTask;

public interface TimeoutWatcher {
TimeoutWatch schedule(TimeoutExecution execution);
TimerTask schedule(TimeoutExecution execution);
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,18 +11,7 @@ public TimerTimeoutWatcher(Timer timer) {
}

@Override
public TimeoutWatch schedule(TimeoutExecution execution) {
TimerTask task = timer.schedule(execution.timeoutInMillis(), execution::timeoutAndInterrupt);
return new TimeoutWatch() {
@Override
public boolean isRunning() {
return !task.isDone();
}

@Override
public void cancel() {
task.cancel();
}
};
public TimerTask schedule(TimeoutExecution execution) {
return timer.schedule(execution.timeoutInMillis(), execution::timeoutAndInterrupt);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,14 @@
import static org.awaitility.Awaitility.await;

import java.time.Duration;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import io.smallrye.faulttolerance.core.timer.ThreadTimer;
import io.smallrye.faulttolerance.core.timer.Timer;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
Expand All @@ -17,13 +20,15 @@

@EnabledOnOs(OS.LINUX)
public class ScheduledExecutorTimeoutWatcherTest {
private ScheduledExecutorService executor;
private ScheduledExecutorTimeoutWatcher watcher;
private ExecutorService executor;
private Timer timer;
private TimerTimeoutWatcher watcher;

@BeforeEach
public void setUp() {
executor = Executors.newSingleThreadScheduledExecutor();
watcher = new ScheduledExecutorTimeoutWatcher(executor);
executor = Executors.newSingleThreadExecutor();
timer = ThreadTimer.create(executor);
watcher = new TimerTimeoutWatcher(timer);
}

@AfterEach
Expand All @@ -38,10 +43,10 @@ public void timedOut() throws InterruptedException {

Thread thread = run(wasInterrupted);
TimeoutExecution execution = new TimeoutExecution(thread, 100L);
TimeoutWatch watch = watcher.schedule(execution);
var watch = watcher.schedule(execution);

assertThat(execution.isRunning()).isTrue();
assertThat(watch.isRunning()).isTrue();
assertThat(watch.isDone()).isFalse();// isRunning isTrue

thread.join();
assertThat(wasInterrupted).isTrue();
Expand All @@ -53,7 +58,7 @@ public void timedOut() throws InterruptedException {
await("watch not running")
.atMost(Duration.ofMillis(100))
.pollInterval(Duration.ofMillis(50))
.until(() -> !watch.isRunning());
.until(watch::isDone);// !watch.isRunning()
}

@Test
Expand All @@ -62,10 +67,10 @@ public void notTimedOut() throws InterruptedException {

Thread thread = run(wasInterrupted);
TimeoutExecution execution = new TimeoutExecution(thread, 300L);
TimeoutWatch watch = watcher.schedule(execution);
var watch = watcher.schedule(execution);

assertThat(execution.isRunning()).isTrue();
assertThat(watch.isRunning()).isTrue();
assertThat(watch.isDone()).isFalse();

thread.join();
execution.finish(watch::cancel); // execution.finish() needs to be called explicitly
Expand All @@ -77,7 +82,7 @@ public void notTimedOut() throws InterruptedException {
await("watch not running")
.atMost(Duration.ofMillis(100))
.pollInterval(Duration.ofMillis(50))
.until(() -> !watch.isRunning());
.until(watch::isDone);
}

private Thread run(AtomicBoolean interruptionFlag) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import java.util.concurrent.atomic.AtomicBoolean;

import io.smallrye.faulttolerance.core.timer.TimerTask;
import io.smallrye.faulttolerance.core.util.barrier.Barrier;

/**
Expand All @@ -27,7 +28,7 @@ boolean timeoutWatchWasCancelled() {
}

@Override
public TimeoutWatch schedule(TimeoutExecution execution) {
public TimerTask schedule(TimeoutExecution execution) {
if (alreadyUsed.compareAndSet(false, true)) {
executingThread = new Thread(() -> {
try {
Expand All @@ -40,16 +41,16 @@ public TimeoutWatch schedule(TimeoutExecution execution) {
}
}, "TestTimeoutWatcher thread");
executingThread.start();
return new TimeoutWatch() {
@Override
public boolean isRunning() {
return executingThread.isAlive();
return new TimerTask() {
@Override public boolean isDone (){
return !executingThread.isAlive();
}

@Override
public void cancel() {
public boolean cancel() {
timeoutWatchCancelled.set(true);
executingThread.interrupt();
return true;
}
};
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,10 @@ public void timedOut() throws InterruptedException {

Thread thread = run(wasInterrupted);
TimeoutExecution execution = new TimeoutExecution(thread, 100L);
TimeoutWatch watch = watcher.schedule(execution);
var watch = watcher.schedule(execution);

assertThat(execution.isRunning()).isTrue();
assertThat(watch.isRunning()).isTrue();
assertThat(watch.isDone()).isFalse();// isRunning isTrue

thread.join();
assertThat(wasInterrupted).isTrue();
Expand All @@ -59,7 +59,7 @@ public void timedOut() throws InterruptedException {
await("watch not running")
.atMost(Duration.ofMillis(100))
.pollInterval(Duration.ofMillis(50))
.until(() -> !watch.isRunning());
.until(watch::isDone);// !watch.isRunning()
}

@Test
Expand All @@ -68,10 +68,10 @@ public void notTimedOut() throws InterruptedException {

Thread thread = run(wasInterrupted);
TimeoutExecution execution = new TimeoutExecution(thread, 300L);
TimeoutWatch watch = watcher.schedule(execution);
var watch = watcher.schedule(execution);

assertThat(execution.isRunning()).isTrue();
assertThat(watch.isRunning()).isTrue();
assertThat(watch.isDone()).isFalse();

thread.join();
execution.finish(watch::cancel); // execution.finish() needs to be called explicitly
Expand All @@ -83,7 +83,7 @@ public void notTimedOut() throws InterruptedException {
await("watch not running")
.atMost(Duration.ofMillis(100))
.pollInterval(Duration.ofMillis(50))
.until(() -> !watch.isRunning());
.until(watch::isDone);
}

private Thread run(AtomicBoolean interruptionFlag) {
Expand Down