Skip to content

Commit

Permalink
fix AsyncTimeout in presence of retries
Browse files Browse the repository at this point in the history
Previously, `AsyncTimeout` used the `InvocationContext` event
system to communicate with the `Timeout` strategy. This had
2 flaws in presence of retries:

- the set of event handlers grew up with each retry, resulting
  in unnecessary attempts to complete a `Future` that has already
  been completed (silly, but harmless);
- a delayed (due to thread scheduling) timeout event from one retry
  iteration could trigger immediate timeout for subsequent retry
  iteration (big problem!).

With this commit, `AsyncTimeout` and `Timeout` use
the `InvocationContext` contextual data system and don't suffer
from these issues. This is because the `Timeout` class picks up
the contextual data item that's used for communicating with
`AsyncTimeout` synchronously, so that it's always the one belonging
to the "current" retry iteration, and even deletes it immediately
after picking it up.
  • Loading branch information
Ladicek committed May 11, 2021
1 parent 72fff5d commit a8fb87d
Show file tree
Hide file tree
Showing 6 changed files with 53 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@ public <T> void set(Class<T> clazz, T object) {
data.put(clazz, object);
}

public <T> void remove(Class<T> clazz) {
data.remove(clazz);
}

public <T> T get(Class<T> clazz) {
return clazz.cast(data.get(clazz));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,17 +10,19 @@
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;

import org.eclipse.microprofile.faulttolerance.exceptions.TimeoutException;

import io.smallrye.faulttolerance.core.FaultToleranceStrategy;
import io.smallrye.faulttolerance.core.InvocationContext;

/**
* The next strategy in the chain must be {@link Timeout}, and it is invoked on an extra thread.
* Communication then happens using {@link TimeoutEvents.AsyncTimedOut}.
* Communication then happens using {@link AsyncTimeoutNotification}.
* <p>
* Note that the {@code TimeoutException} thrown from this strategy might come from two places:
* the {@code Timeout} strategy throwing, or {@code AsyncTimeoutTask#timedOut} setting an exception
* as a result of {@code TimeoutEvent}. Both might happen, and whichever happens first gets to decide.
* That's why we take extra care to make sure that there's only one place where the exception is created.
* as a result of {@code AsyncTimeoutNotification}. Both might happen, and whichever happens first
* gets to decide.
*/
public class AsyncTimeout<V> implements FaultToleranceStrategy<Future<V>> {
private final FaultToleranceStrategy<Future<V>> delegate;
Expand All @@ -43,11 +45,14 @@ public Future<V> apply(InvocationContext<Future<V>> ctx) throws Exception {

private Future<V> doApply(InvocationContext<Future<V>> ctx) throws Exception {
AsyncTimeoutTask<Future<V>> task = new AsyncTimeoutTask<>(() -> delegate.apply(ctx));
ctx.registerEventHandler(TimeoutEvents.AsyncTimedOut.class, task::timedOut);
LOG.asyncTimeoutTaskCreated(task);
ctx.set(AsyncTimeoutNotification.class, task::timedOut);

executor.execute(task);
try {
return task.get();
} catch (ExecutionException e) {
LOG.asyncTimeoutRethrowing(e.getCause());
throw sneakyThrow(e.getCause());
}
}
Expand All @@ -58,8 +63,9 @@ private static class AsyncTimeoutTask<T> extends FutureTask<T> {
super(callable);
}

public void timedOut(TimeoutEvents.AsyncTimedOut event) {
super.setException(event.timeoutException());
public void timedOut(TimeoutException exception) {
LOG.asyncTimeoutTaskCompleting(this, exception);
super.setException(exception);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package io.smallrye.faulttolerance.core.timeout;

import org.eclipse.microprofile.faulttolerance.exceptions.TimeoutException;

@FunctionalInterface
interface AsyncTimeoutNotification {
void accept(TimeoutException timeoutException);
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,17 @@ public V apply(InvocationContext<V> ctx) throws Exception {
}

private V doApply(InvocationContext<V> ctx) throws Exception {
TimeoutExecution execution = new TimeoutExecution(Thread.currentThread(), timeoutInMillis,
() -> ctx.fireEvent(new TimeoutEvents.AsyncTimedOut(() -> timeoutException(description))));
// must extract `AsyncTimeoutNotification` synchronously, because if retries are present,
// a different `AsyncTimeoutNotification` may be present in the `InvocationContext`
// by the time the timeout callback is invoked
AsyncTimeoutNotification notification = ctx.get(AsyncTimeoutNotification.class);
ctx.remove(AsyncTimeoutNotification.class);

TimeoutExecution execution = new TimeoutExecution(Thread.currentThread(), timeoutInMillis, () -> {
if (notification != null) {
notification.accept(timeoutException(description));
}
});
TimeoutWatch watch = watcher.schedule(execution);
ctx.fireEvent(TimeoutEvents.Started.INSTANCE);

Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,5 @@
package io.smallrye.faulttolerance.core.timeout;

import java.util.function.Supplier;

import org.eclipse.microprofile.faulttolerance.exceptions.TimeoutException;

import io.smallrye.faulttolerance.core.InvocationContextEvent;

public class TimeoutEvents {
Expand All @@ -22,16 +18,4 @@ public enum Finished implements InvocationContextEvent {
this.timedOut = timedOut;
}
}

public static class AsyncTimedOut implements InvocationContextEvent {
private final Supplier<TimeoutException> timeoutException;

AsyncTimedOut(Supplier<TimeoutException> timeoutException) {
this.timeoutException = timeoutException;
}

TimeoutException timeoutException() {
return timeoutException.get();
}
}
}
Original file line number Diff line number Diff line change
@@ -1,10 +1,28 @@
package io.smallrye.faulttolerance.core.timeout;

import static org.jboss.logging.annotations.Message.NONE;

import org.eclipse.microprofile.faulttolerance.exceptions.TimeoutException;
import org.jboss.logging.BasicLogger;
import org.jboss.logging.Logger;
import org.jboss.logging.annotations.LogMessage;
import org.jboss.logging.annotations.Message;
import org.jboss.logging.annotations.MessageLogger;
import org.jboss.logging.annotations.Transform;

@MessageLogger(projectCode = "SRFTL", length = 5)
interface TimeoutLogger extends BasicLogger {
TimeoutLogger LOG = Logger.getMessageLogger(TimeoutLogger.class, TimeoutLogger.class.getPackage().getName());

@Message(id = NONE, value = "AsyncTimeoutTask %s created")
@LogMessage(level = Logger.Level.TRACE)
void asyncTimeoutTaskCreated(@Transform(Transform.TransformType.IDENTITY_HASH_CODE) Object task);

@Message(id = NONE, value = "AsyncTimeoutTask %s completing with %s")
@LogMessage(level = Logger.Level.TRACE)
void asyncTimeoutTaskCompleting(@Transform(Transform.TransformType.IDENTITY_HASH_CODE) Object task, TimeoutException e);

@Message(id = NONE, value = "AsyncTimeout rethrowing %s")
@LogMessage(level = Logger.Level.TRACE)
void asyncTimeoutRethrowing(Throwable e);
}

0 comments on commit a8fb87d

Please sign in to comment.