Skip to content

Commit

Permalink
[Performance #287] Docs for throttle related classes
Browse files Browse the repository at this point in the history
  • Loading branch information
lukaskabc authored and ledsoft committed Sep 13, 2024
1 parent 3a86e8b commit 4b303a5
Show file tree
Hide file tree
Showing 7 changed files with 100 additions and 26 deletions.
7 changes: 7 additions & 0 deletions src/main/java/cz/cvut/kbss/termit/util/Constants.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package cz.cvut.kbss.termit.util;

import cz.cvut.kbss.jopa.vocabulary.SKOS;
import cz.cvut.kbss.termit.util.throttle.ThrottleAspect;
import org.springframework.data.domain.PageRequest;
import org.springframework.data.domain.Pageable;

Expand Down Expand Up @@ -57,6 +58,12 @@ public class Constants {
*/
public static final Duration THROTTLE_THRESHOLD = Duration.ofSeconds(10);

/**
* After how much time, should complete futures be discarded.
* @see ThrottleAspect#clearOldFutures()
*/
public static final Duration THROTTLE_DISCARD_THRESHOLD = Duration.ofMinutes(1);

/**
* The amount of millis used as timeout for async REST tasks (REST controllers returning Callable):
* 5 minutes
Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,31 @@
package cz.cvut.kbss.termit.util.longrunning;

import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

import java.time.Instant;
import java.util.Optional;

/**
* An asynchronously running task that is expected to run for some time.
*/
public interface LongRunningTask {

/**
* @return true when the task is being actively executed, false otherwise.
*/
boolean isRunning();

/**
* @return true when the task has finished, false otherwise.
* Returns true regardless of whether the task succeeded.
*/
boolean isCompleted();

@Nullable
Instant runningSince();
/**
* @return a timestamp of the task execution start,
* or empty if the task execution has not yet started.
*/
@NotNull
Optional<Instant> runningSince();
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,17 @@
import org.jetbrains.annotations.NotNull;

import java.util.Collection;
import java.util.concurrent.ConcurrentSkipListSet;

/**
* An object that will schedule a long-running tasks
* @see LongRunningTask
*/
public interface LongRunningTaskRegister {

/**
* @return pending and currently running tasks
*/
@NotNull
Collection<LongRunningTask> getTasks();
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,12 @@
import java.util.concurrent.Executor;

/**
* Executes the runnable in a transaction
* Executes the runnable in a transaction synchronously.
*
* @see Transactional
*/
@Component
public class TransactionExecutor implements Executor {
public class SynchronousTransactionExecutor implements Executor {

@Transactional
@Override
Expand Down
77 changes: 59 additions & 18 deletions src/main/java/cz/cvut/kbss/termit/util/throttle/ThrottleAspect.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import cz.cvut.kbss.termit.TermItApplication;
import cz.cvut.kbss.termit.exception.TermItException;
import cz.cvut.kbss.termit.exception.ThrottleAspectException;
import cz.cvut.kbss.termit.util.Constants;
import cz.cvut.kbss.termit.util.Pair;
import cz.cvut.kbss.termit.util.longrunning.LongRunningTask;
import cz.cvut.kbss.termit.util.longrunning.LongRunningTaskRegister;
Expand Down Expand Up @@ -33,6 +34,7 @@
import org.springframework.transaction.annotation.Transactional;

import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.util.Arrays;
import java.util.Collection;
Expand All @@ -45,17 +47,18 @@
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static cz.cvut.kbss.termit.util.Constants.THROTTLE_DISCARD_THRESHOLD;
import static cz.cvut.kbss.termit.util.Constants.THROTTLE_THRESHOLD;
import static org.springframework.beans.factory.config.ConfigurableBeanFactory.SCOPE_SINGLETON;

/**
* @see Throttle
* @implNote The aspect is configured in {@code spring-aop.xml}, this uses Spring AOP instead of AspectJ.
*/
@Order
Expand All @@ -67,44 +70,67 @@ public class ThrottleAspect implements LongRunningTaskRegister {
private static final Logger LOG = LoggerFactory.getLogger(ThrottleAspect.class);

/**
* group, identifier -> future
* <p>Throttled futures are returned as results of method calls.</p>
* <p>Tasks inside them can be replaced by a newer ones allowing
* to merge multiple (throttled) method calls into a single one while always executing the newest one possible.</p>
* <p>A task inside a throttled future represents
* a heavy/long-running task acquired from the body of an throttled method</p>
*
* @implSpec Synchronize in the field definition order before modification
* @implSpec Synchronize in the field declaration order before modification
*/
private final Map<Identifier, ThrottledFuture<Object>> throttledFutures;
private final Map<@NotNull Identifier, @NotNull ThrottledFuture<Object>> throttledFutures;

/**
* @implSpec Synchronize in the field definition order before modification
* The last run is updated every time a task is finished.
* @implSpec Synchronize in the field declaration order before modification
*/
private final Map<Identifier, Instant> lastRun;
private final Map<@NotNull Identifier, @NotNull Instant> lastRun;

/**
* group, identifier -> future
* Scheduled futures are returned from {@link #taskScheduler}.
* Futures are completed by execution of tasks created in {@link #createRunnableToSchedule}.
*
* @implSpec Synchronize in the field definition order before modification
* @implSpec Synchronize in the field declaration order before modification
*/
private final NavigableMap<Identifier, Future<Object>> scheduledFutures;
private final NavigableMap<Identifier, @NotNull Future<Object>> scheduledFutures;

/**
* thread safe set holding identifiers of threads
* currently executing a throttled task
* Thread safe set holding identifiers of threads that are
* currently executing a throttled task.
*/
private final Set<Long> throttledThreads = ConcurrentHashMap.newKeySet();

/**
* Parser for Spring Expression Language
*/
private final ExpressionParser parser = new SpelExpressionParser();

private final TaskScheduler taskScheduler;

/**
* A base context for evaluation of SpEL expressions
*/
private final StandardEvaluationContext standardEvaluationContext;

/**
* Used for acquiring {@link #lastRun} timestamps.
* @implNote for testing purposes
*/
private final Clock clock;

private final TransactionExecutor transactionExecutor;
/**
* Wrapper for executions in a transaction context
*/
private final SynchronousTransactionExecutor transactionExecutor;

/**
* A timestamp of the last time maps were cleaned.
* @see #clearOldFutures()
*/
private final @NotNull AtomicReference<Instant> lastClear;

@Autowired
public ThrottleAspect(@Qualifier("threadPoolTaskScheduler") TaskScheduler taskScheduler, TransactionExecutor transactionExecutor) {
public ThrottleAspect(@Qualifier("threadPoolTaskScheduler") TaskScheduler taskScheduler, SynchronousTransactionExecutor transactionExecutor) {
this.taskScheduler = taskScheduler;
this.transactionExecutor = transactionExecutor;
throttledFutures = new HashMap<>();
Expand All @@ -118,7 +144,7 @@ public ThrottleAspect(@Qualifier("threadPoolTaskScheduler") TaskScheduler taskSc
protected ThrottleAspect(Map<Identifier, ThrottledFuture<Object>> throttledFutures,
Map<Identifier, Instant> lastRun,
NavigableMap<Identifier, Future<Object>> scheduledFutures, TaskScheduler taskScheduler,
Clock clock, TransactionExecutor transactionExecutor) {
Clock clock, SynchronousTransactionExecutor transactionExecutor) {
this.throttledFutures = throttledFutures;
this.lastRun = lastRun;
this.scheduledFutures = scheduledFutures;
Expand Down Expand Up @@ -346,23 +372,28 @@ private Runnable createRunnableToSchedule(ThrottledFuture<?> throttledFuture, Id
};
}

/**
* Discards futures from {@link #throttledFutures}, {@link #lastRun} and {@link #scheduledFutures} maps.
* <p>Every completed future for which a {@link Constants#THROTTLE_DISCARD_THRESHOLD} expired is discarded.</p>
* @see #isThresholdExpired(Identifier)
*/
private void clearOldFutures() {
// if the last clear was performed less than a threshold ago, skip it for now
Instant last = lastClear.get();
if (last.isAfter(Instant.now(clock).minus(THROTTLE_THRESHOLD))) {
if (last.isAfter(Instant.now(clock).minus(THROTTLE_THRESHOLD).minus(THROTTLE_DISCARD_THRESHOLD))) {
return;
}
if (!lastClear.compareAndSet(last, Instant.now(clock))) {
return;
}
synchronized (throttledFutures) {
synchronized (throttledFutures) { // synchronize in the filed declaration order
synchronized (lastRun) {
synchronized (scheduledFutures) {
Stream.of(throttledFutures.keySet().stream(), scheduledFutures.keySet().stream(), lastRun.keySet()
.stream())
.flatMap(s -> s).distinct().toList() // ensures safe modification of maps
.forEach(identifier -> {
if (isThresholdExpired(identifier)) {
if (isThresholdExpiredByMoreThan(identifier, THROTTLE_DISCARD_THRESHOLD)) {
Optional.ofNullable(throttledFutures.get(identifier)).ifPresent(throttled -> {
if (throttled.isDone() || throttled.isCancelled()) {
throttledFutures.remove(identifier);
Expand All @@ -381,6 +412,16 @@ private void clearOldFutures() {
}
}

/**
* @param identifier of the task
* @param duration to add to the throttle threshold
* @return Whether the last time when a task with specified {@code identifier} run
* is older than ({@link Constants#THROTTLE_THRESHOLD} + {@code duration})
*/
private boolean isThresholdExpiredByMoreThan(Identifier identifier, Duration duration) {
return lastRun.getOrDefault(identifier, Instant.MAX).isBefore(Instant.now(clock).minus(THROTTLE_THRESHOLD).minus(duration));
}

/**
* @return Whether the time when the identifier last run is older than the threshold,
* true when the task had never run
Expand All @@ -406,7 +447,7 @@ private void cancelWithHigherGroup(Identifier throttleAnnotation) {
if (throttleAnnotation.getGroup().isBlank()) {
return;
}
synchronized (throttledFutures) {
synchronized (throttledFutures) { // synchronize in the filed declaration order
synchronized (scheduledFutures) {
// look for any futures with higher group
// cancel them and remove from maps
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ public boolean isCompleted() {
}

@Override
public @Nullable Instant runningSince() {
return completingSince;
public @NotNull Optional<Instant> runningSince() {
return Optional.ofNullable(completingSince);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ class ThrottleAspectTest {

TaskScheduler taskScheduler;

TransactionExecutor transactionExecutor;
SynchronousTransactionExecutor transactionExecutor;

OrderedMap<Runnable, Instant> taskSchedulerTasks;

Expand Down Expand Up @@ -141,7 +141,7 @@ void beforeEach() throws Throwable {
Clock mockedClock = mock(Clock.class);
when(mockedClock.instant()).then(invocation -> getInstant());

transactionExecutor = mock(TransactionExecutor.class);
transactionExecutor = mock(SynchronousTransactionExecutor.class);
doAnswer(invocation -> {
invocation.getArgument(0, Runnable.class).run();
return null;
Expand Down

0 comments on commit 4b303a5

Please sign in to comment.