Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix thread leak/invasion #5589

Merged
merged 1 commit into from
Apr 4, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,13 @@
import java.security.Principal;
import java.util.List;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;
import java.util.logging.Logger;

Expand Down Expand Up @@ -88,6 +91,8 @@ public final class JettyHttpContainer extends Handler.Abstract implements Contai
*/
private boolean configSetStatusOverSendError;

private final ScheduledThreadPoolExecutor timeoutScheduler;

/**
* Referencing factory for Jetty request.
*/
Expand Down Expand Up @@ -136,7 +141,7 @@ protected void configure() {
@Override
public boolean handle(Request request, Response response, Callback callback) throws Exception {

final ResponseWriter responseWriter = new ResponseWriter(request, response, callback, configSetStatusOverSendError);
final ResponseWriter responseWriter = new ResponseWriter(timeoutScheduler, request, response, callback, configSetStatusOverSendError);
try {
LOGGER.debugLog(LocalizationMessages.CONTAINER_STARTED());
final URI baseUri = getBaseUri(request);
Expand Down Expand Up @@ -254,32 +259,44 @@ private static final class ResponseWriter implements ContainerResponseWriter {
private final Response response;
private final Callback callback;
private final boolean configSetStatusOverSendError;
private final Timer timer = new Timer();
private final long asyncStartTimeMillis;
private final long asyncStartTimeNanos;
private final ScheduledExecutorService timeoutScheduler;
private final ConcurrentLinkedQueue<TimeoutHandler> timeoutHandlerQueue = new ConcurrentLinkedQueue<>();
private TimerTask currentTimerTask;
private ScheduledFuture<?> currentTimerTask;

ResponseWriter(final Request request, final Response response, final Callback callback,
final boolean configSetStatusOverSendError) {
ResponseWriter(final ScheduledExecutorService timeoutScheduler, final Request request, final Response response,
final Callback callback, final boolean configSetStatusOverSendError) {
this.timeoutScheduler = timeoutScheduler;
this.request = request;
this.response = response;
this.callback = callback;
this.asyncStartTimeMillis = System.currentTimeMillis();
this.asyncStartTimeNanos = System.nanoTime();
this.configSetStatusOverSendError = configSetStatusOverSendError;
}

private synchronized void setNewTimeout(long timeOut, TimeUnit timeUnit) {
long timeOutMillis = timeUnit.toMillis(timeOut);
long timeOutNanos = timeUnit.toNanos(timeOut);
if (currentTimerTask != null) {
currentTimerTask.cancel();
// Do not interrupt, see callTimeoutHandlers()
currentTimerTask.cancel(false);
}
currentTimerTask = new TimerTask() {
@Override
public void run() {
timeoutHandlerQueue.forEach(timeoutHandler -> timeoutHandler.onTimeout(ResponseWriter.this));
// Use System.nanoTime() as the clock source here, because the returned value is not prone to wall-clock
// drift - unlike System.currentTimeMillis().
long delayNanos = Math.max(asyncStartTimeNanos - System.nanoTime() + timeOutNanos, 0L);
currentTimerTask = timeoutScheduler.schedule(this::callTimeoutHandlers, delayNanos, TimeUnit.NANOSECONDS);
}

private void callTimeoutHandlers() {
// Note: Although it might not happen in practice, it is in theory possible that this function is
// called multiple times concurrently. To prevent any timeout handler being called twice, we poll()
// timeout handlers from the queue, instead of iterating over the queue.
while (true) {
TimeoutHandler handler = timeoutHandlerQueue.poll();
if (handler == null) {
break;
}
};
timer.schedule(currentTimerTask, Math.max(0, timeOutMillis + asyncStartTimeMillis - System.currentTimeMillis()));
handler.onTimeout(ResponseWriter.this);
}
}

@Override
Expand Down Expand Up @@ -419,6 +436,41 @@ public void doStop() throws Exception {
super.doStop();
appHandler.onShutdown(this);
appHandler = null;

timeoutScheduler.shutdown();
boolean needInterrupt = false;
while (true) {
try {
if (timeoutScheduler.awaitTermination(1L, TimeUnit.MINUTES)) {
break;
}
} catch (InterruptedException e) {
if (!needInterrupt) {
needInterrupt = true;
timeoutScheduler.shutdownNow();
}
}
}
if (needInterrupt) {
Thread.currentThread().interrupt();
}
}

private static final AtomicInteger TIMEOUT_HANDLER_ID_GEN = new AtomicInteger();

private static ScheduledThreadPoolExecutor createTimeoutScheduler() {
// Note: creating the thread-pool does not start the core-pool threads.
ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1, r -> {
Thread t = new Thread(r, "JettyHttpContainer-Timeout-Handler #" + TIMEOUT_HANDLER_ID_GEN.incrementAndGet());
t.setDaemon(true);
return t;
});
// Limit the number of timeout handling threads to a quarter of the number of CPUs, at least 2.
executor.setMaximumPoolSize(Math.max(2, Runtime.getRuntime().availableProcessors() / 4));
executor.allowCoreThreadTimeOut(true);
// Don't Keep timeout handling threads around "forever".
executor.setKeepAliveTime(100, TimeUnit.MILLISECONDS);
return executor;
}

/**
Expand All @@ -428,6 +480,7 @@ public void doStop() throws Exception {
* @param parentContext DI provider specific context with application's registered bindings.
*/
JettyHttpContainer(final Application application, final Object parentContext) {
this.timeoutScheduler = createTimeoutScheduler();
this.appHandler = new ApplicationHandler(application, new JettyBinder(), parentContext);
}

Expand All @@ -437,6 +490,7 @@ public void doStop() throws Exception {
* @param application JAX-RS / Jersey application to be deployed on Jetty HTTP container.
*/
JettyHttpContainer(final Application application) {
this.timeoutScheduler = createTimeoutScheduler();
this.appHandler = new ApplicationHandler(application, new JettyBinder());

cacheConfigSetStatusOverSendError();
Expand All @@ -448,6 +502,7 @@ public void doStop() throws Exception {
* @param applicationClass JAX-RS / Jersey class of application to be deployed on Jetty HTTP container.
*/
JettyHttpContainer(final Class<? extends Application> applicationClass) {
this.timeoutScheduler = createTimeoutScheduler();
this.appHandler = new ApplicationHandler(applicationClass, new JettyBinder());

cacheConfigSetStatusOverSendError();
Expand Down
Loading