From 4b303a57728021fb59e4f0395780d0368a40114a Mon Sep 17 00:00:00 2001 From: lukaskabc Date: Thu, 5 Sep 2024 15:01:49 +0200 Subject: [PATCH] [Performance #287] Docs for throttle related classes --- .../cz/cvut/kbss/termit/util/Constants.java | 7 ++ .../util/longrunning/LongRunningTask.java | 21 ++++- .../longrunning/LongRunningTaskRegister.java | 9 +++ ...va => SynchronousTransactionExecutor.java} | 4 +- .../termit/util/throttle/ThrottleAspect.java | 77 ++++++++++++++----- .../termit/util/throttle/ThrottledFuture.java | 4 +- .../util/throttle/ThrottleAspectTest.java | 4 +- 7 files changed, 100 insertions(+), 26 deletions(-) rename src/main/java/cz/cvut/kbss/termit/util/throttle/{TransactionExecutor.java => SynchronousTransactionExecutor.java} (76%) diff --git a/src/main/java/cz/cvut/kbss/termit/util/Constants.java b/src/main/java/cz/cvut/kbss/termit/util/Constants.java index 32f309824..b457870a6 100644 --- a/src/main/java/cz/cvut/kbss/termit/util/Constants.java +++ b/src/main/java/cz/cvut/kbss/termit/util/Constants.java @@ -18,6 +18,7 @@ package cz.cvut.kbss.termit.util; import cz.cvut.kbss.jopa.vocabulary.SKOS; +import cz.cvut.kbss.termit.util.throttle.ThrottleAspect; import org.springframework.data.domain.PageRequest; import org.springframework.data.domain.Pageable; @@ -57,6 +58,12 @@ public class Constants { */ public static final Duration THROTTLE_THRESHOLD = Duration.ofSeconds(10); + /** + * After how much time, should complete futures be discarded. + * @see ThrottleAspect#clearOldFutures() + */ + public static final Duration THROTTLE_DISCARD_THRESHOLD = Duration.ofMinutes(1); + /** * The amount of millis used as timeout for async REST tasks (REST controllers returning Callable): * 5 minutes diff --git a/src/main/java/cz/cvut/kbss/termit/util/longrunning/LongRunningTask.java b/src/main/java/cz/cvut/kbss/termit/util/longrunning/LongRunningTask.java index 724541dbf..f5c70bba2 100644 --- a/src/main/java/cz/cvut/kbss/termit/util/longrunning/LongRunningTask.java +++ b/src/main/java/cz/cvut/kbss/termit/util/longrunning/LongRunningTask.java @@ -1,14 +1,31 @@ package cz.cvut.kbss.termit.util.longrunning; +import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; import java.time.Instant; +import java.util.Optional; +/** + * An asynchronously running task that is expected to run for some time. + */ public interface LongRunningTask { + /** + * @return true when the task is being actively executed, false otherwise. + */ boolean isRunning(); + + /** + * @return true when the task has finished, false otherwise. + * Returns true regardless of whether the task succeeded. + */ boolean isCompleted(); - @Nullable - Instant runningSince(); + /** + * @return a timestamp of the task execution start, + * or empty if the task execution has not yet started. + */ + @NotNull + Optional runningSince(); } diff --git a/src/main/java/cz/cvut/kbss/termit/util/longrunning/LongRunningTaskRegister.java b/src/main/java/cz/cvut/kbss/termit/util/longrunning/LongRunningTaskRegister.java index 5fcd7f644..0838df503 100644 --- a/src/main/java/cz/cvut/kbss/termit/util/longrunning/LongRunningTaskRegister.java +++ b/src/main/java/cz/cvut/kbss/termit/util/longrunning/LongRunningTaskRegister.java @@ -3,8 +3,17 @@ import org.jetbrains.annotations.NotNull; import java.util.Collection; +import java.util.concurrent.ConcurrentSkipListSet; +/** + * An object that will schedule a long-running tasks + * @see LongRunningTask + */ public interface LongRunningTaskRegister { + + /** + * @return pending and currently running tasks + */ @NotNull Collection getTasks(); } diff --git a/src/main/java/cz/cvut/kbss/termit/util/throttle/TransactionExecutor.java b/src/main/java/cz/cvut/kbss/termit/util/throttle/SynchronousTransactionExecutor.java similarity index 76% rename from src/main/java/cz/cvut/kbss/termit/util/throttle/TransactionExecutor.java rename to src/main/java/cz/cvut/kbss/termit/util/throttle/SynchronousTransactionExecutor.java index e155db1f4..b6b35dabf 100644 --- a/src/main/java/cz/cvut/kbss/termit/util/throttle/TransactionExecutor.java +++ b/src/main/java/cz/cvut/kbss/termit/util/throttle/SynchronousTransactionExecutor.java @@ -7,12 +7,12 @@ import java.util.concurrent.Executor; /** - * Executes the runnable in a transaction + * Executes the runnable in a transaction synchronously. * * @see Transactional */ @Component -public class TransactionExecutor implements Executor { +public class SynchronousTransactionExecutor implements Executor { @Transactional @Override diff --git a/src/main/java/cz/cvut/kbss/termit/util/throttle/ThrottleAspect.java b/src/main/java/cz/cvut/kbss/termit/util/throttle/ThrottleAspect.java index 2755d9558..a5eef79e5 100644 --- a/src/main/java/cz/cvut/kbss/termit/util/throttle/ThrottleAspect.java +++ b/src/main/java/cz/cvut/kbss/termit/util/throttle/ThrottleAspect.java @@ -3,6 +3,7 @@ import cz.cvut.kbss.termit.TermItApplication; import cz.cvut.kbss.termit.exception.TermItException; import cz.cvut.kbss.termit.exception.ThrottleAspectException; +import cz.cvut.kbss.termit.util.Constants; import cz.cvut.kbss.termit.util.Pair; import cz.cvut.kbss.termit.util.longrunning.LongRunningTask; import cz.cvut.kbss.termit.util.longrunning.LongRunningTaskRegister; @@ -33,6 +34,7 @@ import org.springframework.transaction.annotation.Transactional; import java.time.Clock; +import java.time.Duration; import java.time.Instant; import java.util.Arrays; import java.util.Collection; @@ -45,17 +47,18 @@ import java.util.Set; import java.util.TreeMap; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.Executor; import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Supplier; import java.util.stream.Collectors; import java.util.stream.Stream; +import static cz.cvut.kbss.termit.util.Constants.THROTTLE_DISCARD_THRESHOLD; import static cz.cvut.kbss.termit.util.Constants.THROTTLE_THRESHOLD; import static org.springframework.beans.factory.config.ConfigurableBeanFactory.SCOPE_SINGLETON; /** + * @see Throttle * @implNote The aspect is configured in {@code spring-aop.xml}, this uses Spring AOP instead of AspectJ. */ @Order @@ -67,44 +70,67 @@ public class ThrottleAspect implements LongRunningTaskRegister { private static final Logger LOG = LoggerFactory.getLogger(ThrottleAspect.class); /** - * group, identifier -> future + *

Throttled futures are returned as results of method calls.

+ *

Tasks inside them can be replaced by a newer ones allowing + * to merge multiple (throttled) method calls into a single one while always executing the newest one possible.

+ *

A task inside a throttled future represents + * a heavy/long-running task acquired from the body of an throttled method

* - * @implSpec Synchronize in the field definition order before modification + * @implSpec Synchronize in the field declaration order before modification */ - private final Map> throttledFutures; + private final Map<@NotNull Identifier, @NotNull ThrottledFuture> throttledFutures; /** - * @implSpec Synchronize in the field definition order before modification + * The last run is updated every time a task is finished. + * @implSpec Synchronize in the field declaration order before modification */ - private final Map lastRun; + private final Map<@NotNull Identifier, @NotNull Instant> lastRun; /** - * group, identifier -> future + * Scheduled futures are returned from {@link #taskScheduler}. + * Futures are completed by execution of tasks created in {@link #createRunnableToSchedule}. * - * @implSpec Synchronize in the field definition order before modification + * @implSpec Synchronize in the field declaration order before modification */ - private final NavigableMap> scheduledFutures; + private final NavigableMap> scheduledFutures; /** - * thread safe set holding identifiers of threads - * currently executing a throttled task + * Thread safe set holding identifiers of threads that are + * currently executing a throttled task. */ private final Set throttledThreads = ConcurrentHashMap.newKeySet(); + /** + * Parser for Spring Expression Language + */ private final ExpressionParser parser = new SpelExpressionParser(); private final TaskScheduler taskScheduler; + /** + * A base context for evaluation of SpEL expressions + */ private final StandardEvaluationContext standardEvaluationContext; + /** + * Used for acquiring {@link #lastRun} timestamps. + * @implNote for testing purposes + */ private final Clock clock; - private final TransactionExecutor transactionExecutor; + /** + * Wrapper for executions in a transaction context + */ + private final SynchronousTransactionExecutor transactionExecutor; + /** + * A timestamp of the last time maps were cleaned. + * @see #clearOldFutures() + */ private final @NotNull AtomicReference lastClear; @Autowired - public ThrottleAspect(@Qualifier("threadPoolTaskScheduler") TaskScheduler taskScheduler, TransactionExecutor transactionExecutor) { + public ThrottleAspect(@Qualifier("threadPoolTaskScheduler") TaskScheduler taskScheduler, SynchronousTransactionExecutor transactionExecutor) { this.taskScheduler = taskScheduler; this.transactionExecutor = transactionExecutor; throttledFutures = new HashMap<>(); @@ -118,7 +144,7 @@ public ThrottleAspect(@Qualifier("threadPoolTaskScheduler") TaskScheduler taskSc protected ThrottleAspect(Map> throttledFutures, Map lastRun, NavigableMap> scheduledFutures, TaskScheduler taskScheduler, - Clock clock, TransactionExecutor transactionExecutor) { + Clock clock, SynchronousTransactionExecutor transactionExecutor) { this.throttledFutures = throttledFutures; this.lastRun = lastRun; this.scheduledFutures = scheduledFutures; @@ -346,23 +372,28 @@ private Runnable createRunnableToSchedule(ThrottledFuture throttledFuture, Id }; } + /** + * Discards futures from {@link #throttledFutures}, {@link #lastRun} and {@link #scheduledFutures} maps. + *

Every completed future for which a {@link Constants#THROTTLE_DISCARD_THRESHOLD} expired is discarded.

+ * @see #isThresholdExpired(Identifier) + */ private void clearOldFutures() { // if the last clear was performed less than a threshold ago, skip it for now Instant last = lastClear.get(); - if (last.isAfter(Instant.now(clock).minus(THROTTLE_THRESHOLD))) { + if (last.isAfter(Instant.now(clock).minus(THROTTLE_THRESHOLD).minus(THROTTLE_DISCARD_THRESHOLD))) { return; } if (!lastClear.compareAndSet(last, Instant.now(clock))) { return; } - synchronized (throttledFutures) { + synchronized (throttledFutures) { // synchronize in the filed declaration order synchronized (lastRun) { synchronized (scheduledFutures) { Stream.of(throttledFutures.keySet().stream(), scheduledFutures.keySet().stream(), lastRun.keySet() .stream()) .flatMap(s -> s).distinct().toList() // ensures safe modification of maps .forEach(identifier -> { - if (isThresholdExpired(identifier)) { + if (isThresholdExpiredByMoreThan(identifier, THROTTLE_DISCARD_THRESHOLD)) { Optional.ofNullable(throttledFutures.get(identifier)).ifPresent(throttled -> { if (throttled.isDone() || throttled.isCancelled()) { throttledFutures.remove(identifier); @@ -381,6 +412,16 @@ private void clearOldFutures() { } } + /** + * @param identifier of the task + * @param duration to add to the throttle threshold + * @return Whether the last time when a task with specified {@code identifier} run + * is older than ({@link Constants#THROTTLE_THRESHOLD} + {@code duration}) + */ + private boolean isThresholdExpiredByMoreThan(Identifier identifier, Duration duration) { + return lastRun.getOrDefault(identifier, Instant.MAX).isBefore(Instant.now(clock).minus(THROTTLE_THRESHOLD).minus(duration)); + } + /** * @return Whether the time when the identifier last run is older than the threshold, * true when the task had never run @@ -406,7 +447,7 @@ private void cancelWithHigherGroup(Identifier throttleAnnotation) { if (throttleAnnotation.getGroup().isBlank()) { return; } - synchronized (throttledFutures) { + synchronized (throttledFutures) { // synchronize in the filed declaration order synchronized (scheduledFutures) { // look for any futures with higher group // cancel them and remove from maps diff --git a/src/main/java/cz/cvut/kbss/termit/util/throttle/ThrottledFuture.java b/src/main/java/cz/cvut/kbss/termit/util/throttle/ThrottledFuture.java index de2f28d8f..297754ac7 100644 --- a/src/main/java/cz/cvut/kbss/termit/util/throttle/ThrottledFuture.java +++ b/src/main/java/cz/cvut/kbss/termit/util/throttle/ThrottledFuture.java @@ -190,7 +190,7 @@ public boolean isCompleted() { } @Override - public @Nullable Instant runningSince() { - return completingSince; + public @NotNull Optional runningSince() { + return Optional.ofNullable(completingSince); } } diff --git a/src/test/java/cz/cvut/kbss/termit/util/throttle/ThrottleAspectTest.java b/src/test/java/cz/cvut/kbss/termit/util/throttle/ThrottleAspectTest.java index 5a99e1a82..5931f74db 100644 --- a/src/test/java/cz/cvut/kbss/termit/util/throttle/ThrottleAspectTest.java +++ b/src/test/java/cz/cvut/kbss/termit/util/throttle/ThrottleAspectTest.java @@ -54,7 +54,7 @@ class ThrottleAspectTest { TaskScheduler taskScheduler; - TransactionExecutor transactionExecutor; + SynchronousTransactionExecutor transactionExecutor; OrderedMap taskSchedulerTasks; @@ -141,7 +141,7 @@ void beforeEach() throws Throwable { Clock mockedClock = mock(Clock.class); when(mockedClock.instant()).then(invocation -> getInstant()); - transactionExecutor = mock(TransactionExecutor.class); + transactionExecutor = mock(SynchronousTransactionExecutor.class); doAnswer(invocation -> { invocation.getArgument(0, Runnable.class).run(); return null;