From 2f201057e18f16e3cb77ef657952ad3bb91b2ccf Mon Sep 17 00:00:00 2001 From: Robert Stupp Date: Tue, 2 Apr 2024 20:20:48 +0200 Subject: [PATCH] Fix thread leak/invasion Jersey/Jetty, at least in the 3.1 version line, creates one thread for each HTTP request. This behavior was introduced with #5372 and seems not present in the 2.x or 3.x versions of Jersey. From the javadoc of `java.util.Timer`: ``` Implementation note: All constructors start a timer thread. ... After the last live reference to a Timer object goes away and all outstanding tasks have completed execution, the timer's task execution thread terminates gracefully (and becomes subject to garbage collection). However, this can take arbitrarily long to occur. ``` It is fair to assume that "arbitrarily long" may also mean _never_, in case GC never runs. This change replaces the timer & thread per request with a `ScheduledExecutorService` instance per `JettyHttpContainer`. Also changed the set-timeout mechanism to use `System.nanoTime()` instead of `System.currentTimeMillis()`, because the latter is prone to wall-clock drift and can result into wrong timeout values. Fixes #5588 Signed-off-by: Robert Stupp --- .../jersey/jetty/JettyHttpContainer.java | 89 +++++++++++++++---- 1 file changed, 72 insertions(+), 17 deletions(-) diff --git a/containers/jetty-http/src/main/java17/org/glassfish/jersey/jetty/JettyHttpContainer.java b/containers/jetty-http/src/main/java17/org/glassfish/jersey/jetty/JettyHttpContainer.java index e43816b7ce..6a3805f805 100644 --- a/containers/jetty-http/src/main/java17/org/glassfish/jersey/jetty/JettyHttpContainer.java +++ b/containers/jetty-http/src/main/java17/org/glassfish/jersey/jetty/JettyHttpContainer.java @@ -23,10 +23,13 @@ import java.security.Principal; import java.util.List; import java.util.Map; -import java.util.Timer; -import java.util.TimerTask; import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import java.util.logging.Level; import java.util.logging.Logger; @@ -88,6 +91,8 @@ public final class JettyHttpContainer extends Handler.Abstract implements Contai */ private boolean configSetStatusOverSendError; + private final ScheduledThreadPoolExecutor timeoutScheduler; + /** * Referencing factory for Jetty request. */ @@ -136,7 +141,7 @@ protected void configure() { @Override public boolean handle(Request request, Response response, Callback callback) throws Exception { - final ResponseWriter responseWriter = new ResponseWriter(request, response, callback, configSetStatusOverSendError); + final ResponseWriter responseWriter = new ResponseWriter(timeoutScheduler, request, response, callback, configSetStatusOverSendError); try { LOGGER.debugLog(LocalizationMessages.CONTAINER_STARTED()); final URI baseUri = getBaseUri(request); @@ -254,32 +259,44 @@ private static final class ResponseWriter implements ContainerResponseWriter { private final Response response; private final Callback callback; private final boolean configSetStatusOverSendError; - private final Timer timer = new Timer(); - private final long asyncStartTimeMillis; + private final long asyncStartTimeNanos; + private final ScheduledExecutorService timeoutScheduler; private final ConcurrentLinkedQueue timeoutHandlerQueue = new ConcurrentLinkedQueue<>(); - private TimerTask currentTimerTask; + private ScheduledFuture currentTimerTask; - ResponseWriter(final Request request, final Response response, final Callback callback, - final boolean configSetStatusOverSendError) { + ResponseWriter(final ScheduledExecutorService timeoutScheduler, final Request request, final Response response, + final Callback callback, final boolean configSetStatusOverSendError) { + this.timeoutScheduler = timeoutScheduler; this.request = request; this.response = response; this.callback = callback; - this.asyncStartTimeMillis = System.currentTimeMillis(); + this.asyncStartTimeNanos = System.nanoTime(); this.configSetStatusOverSendError = configSetStatusOverSendError; } private synchronized void setNewTimeout(long timeOut, TimeUnit timeUnit) { - long timeOutMillis = timeUnit.toMillis(timeOut); + long timeOutNanos = timeUnit.toNanos(timeOut); if (currentTimerTask != null) { - currentTimerTask.cancel(); + // Do not interrupt, see callTimeoutHandlers() + currentTimerTask.cancel(false); } - currentTimerTask = new TimerTask() { - @Override - public void run() { - timeoutHandlerQueue.forEach(timeoutHandler -> timeoutHandler.onTimeout(ResponseWriter.this)); + // Use System.nanoTime() as the clock source here, because the returned value is not prone to wall-clock + // drift - unlike System.currentTimeMillis(). + long delayNanos = Math.max(asyncStartTimeNanos - System.nanoTime() + timeOutNanos, 0L); + currentTimerTask = timeoutScheduler.schedule(this::callTimeoutHandlers, delayNanos, TimeUnit.NANOSECONDS); + } + + private void callTimeoutHandlers() { + // Note: Although it might not happen in practice, it is in theory possible that this function is + // called multiple times concurrently. To prevent any timeout handler being called twice, we poll() + // timeout handlers from the queue, instead of iterating over the queue. + while (true) { + TimeoutHandler handler = timeoutHandlerQueue.poll(); + if (handler == null) { + break; } - }; - timer.schedule(currentTimerTask, Math.max(0, timeOutMillis + asyncStartTimeMillis - System.currentTimeMillis())); + handler.onTimeout(ResponseWriter.this); + } } @Override @@ -419,6 +436,41 @@ public void doStop() throws Exception { super.doStop(); appHandler.onShutdown(this); appHandler = null; + + timeoutScheduler.shutdown(); + boolean needInterrupt = false; + while (true) { + try { + if (timeoutScheduler.awaitTermination(1L, TimeUnit.MINUTES)) { + break; + } + } catch (InterruptedException e) { + if (!needInterrupt) { + needInterrupt = true; + timeoutScheduler.shutdownNow(); + } + } + } + if (needInterrupt) { + Thread.currentThread().interrupt(); + } + } + + private static final AtomicInteger TIMEOUT_HANDLER_ID_GEN = new AtomicInteger(); + + private static ScheduledThreadPoolExecutor createTimeoutScheduler() { + // Note: creating the thread-pool does not start the core-pool threads. + ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1, r -> { + Thread t = new Thread(r, "JettyHttpContainer-Timeout-Handler #" + TIMEOUT_HANDLER_ID_GEN.incrementAndGet()); + t.setDaemon(true); + return t; + }); + // Limit the number of timeout handling threads to a quarter of the number of CPUs, at least 2. + executor.setMaximumPoolSize(Math.max(2, Runtime.getRuntime().availableProcessors() / 4)); + executor.allowCoreThreadTimeOut(true); + // Don't Keep timeout handling threads around "forever". + executor.setKeepAliveTime(100, TimeUnit.MILLISECONDS); + return executor; } /** @@ -428,6 +480,7 @@ public void doStop() throws Exception { * @param parentContext DI provider specific context with application's registered bindings. */ JettyHttpContainer(final Application application, final Object parentContext) { + this.timeoutScheduler = createTimeoutScheduler(); this.appHandler = new ApplicationHandler(application, new JettyBinder(), parentContext); } @@ -437,6 +490,7 @@ public void doStop() throws Exception { * @param application JAX-RS / Jersey application to be deployed on Jetty HTTP container. */ JettyHttpContainer(final Application application) { + this.timeoutScheduler = createTimeoutScheduler(); this.appHandler = new ApplicationHandler(application, new JettyBinder()); cacheConfigSetStatusOverSendError(); @@ -448,6 +502,7 @@ public void doStop() throws Exception { * @param applicationClass JAX-RS / Jersey class of application to be deployed on Jetty HTTP container. */ JettyHttpContainer(final Class applicationClass) { + this.timeoutScheduler = createTimeoutScheduler(); this.appHandler = new ApplicationHandler(applicationClass, new JettyBinder()); cacheConfigSetStatusOverSendError();