diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/compression/DeflaterPool.java b/jetty-util/src/main/java/org/eclipse/jetty/util/compression/DeflaterPool.java index a14f9aa3efdb..6759f9b7cfb9 100644 --- a/jetty-util/src/main/java/org/eclipse/jetty/util/compression/DeflaterPool.java +++ b/jetty-util/src/main/java/org/eclipse/jetty/util/compression/DeflaterPool.java @@ -20,6 +20,9 @@ import java.util.zip.Deflater; +import org.eclipse.jetty.util.component.Container; +import org.eclipse.jetty.util.thread.ThreadPool; + public class DeflaterPool extends CompressionPool { private final int compressionLevel; @@ -60,4 +63,20 @@ protected void reset(Deflater deflater) { deflater.reset(); } + + public static DeflaterPool ensurePool(Container container) + { + DeflaterPool pool = container.getBean(DeflaterPool.class); + if (pool != null) + return pool; + + int capacity = CompressionPool.INFINITE_CAPACITY; + ThreadPool.SizedThreadPool threadPool = container.getBean(ThreadPool.SizedThreadPool.class); + if (threadPool != null) + capacity = threadPool.getMaxThreads(); + + pool = new DeflaterPool(capacity, Deflater.DEFAULT_COMPRESSION, true); + container.addBean(pool, true); + return pool; + } } diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/compression/InflaterPool.java b/jetty-util/src/main/java/org/eclipse/jetty/util/compression/InflaterPool.java index b79244aa22a5..85d9be0d410e 100644 --- a/jetty-util/src/main/java/org/eclipse/jetty/util/compression/InflaterPool.java +++ b/jetty-util/src/main/java/org/eclipse/jetty/util/compression/InflaterPool.java @@ -20,6 +20,9 @@ import java.util.zip.Inflater; +import org.eclipse.jetty.util.component.Container; +import org.eclipse.jetty.util.thread.ThreadPool; + public class InflaterPool extends CompressionPool { private final boolean nowrap; @@ -57,4 +60,20 @@ protected void reset(Inflater inflater) { inflater.reset(); } + + public static InflaterPool ensurePool(Container container) + { + InflaterPool pool = container.getBean(InflaterPool.class); + if (pool != null) + return pool; + + int capacity = CompressionPool.INFINITE_CAPACITY; + ThreadPool.SizedThreadPool threadPool = container.getBean(ThreadPool.SizedThreadPool.class); + if (threadPool != null) + capacity = threadPool.getMaxThreads(); + + pool = new InflaterPool(capacity, true); + container.addBean(pool, true); + return pool; + } } diff --git a/jetty-websocket/jetty-websocket-tests/src/test/java/org/eclipse/jetty/websocket/tests/WebSocketCompressionPoolTest.java b/jetty-websocket/jetty-websocket-tests/src/test/java/org/eclipse/jetty/websocket/tests/WebSocketCompressionPoolTest.java new file mode 100644 index 000000000000..b134b0a728d4 --- /dev/null +++ b/jetty-websocket/jetty-websocket-tests/src/test/java/org/eclipse/jetty/websocket/tests/WebSocketCompressionPoolTest.java @@ -0,0 +1,114 @@ +// +// ======================================================================== +// Copyright (c) 1995-2021 Mort Bay Consulting Pty Ltd and others. +// ------------------------------------------------------------------------ +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Eclipse Public License v1.0 +// and Apache License v2.0 which accompanies this distribution. +// +// The Eclipse Public License is available at +// http://www.eclipse.org/legal/epl-v10.html +// +// The Apache License v2.0 is available at +// http://www.opensource.org/licenses/apache2.0.php +// +// You may elect to redistribute this code under either of these licenses. +// ======================================================================== +// + +package org.eclipse.jetty.websocket.tests; + +import org.eclipse.jetty.server.Server; +import org.eclipse.jetty.server.ServerConnector; +import org.eclipse.jetty.server.handler.ContextHandler; +import org.eclipse.jetty.server.handler.ContextHandlerCollection; +import org.eclipse.jetty.servlet.ServletContextHandler; +import org.eclipse.jetty.websocket.client.WebSocketClient; +import org.eclipse.jetty.websocket.common.extensions.WebSocketExtensionFactory; +import org.eclipse.jetty.websocket.server.NativeWebSocketConfiguration; +import org.eclipse.jetty.websocket.server.NativeWebSocketServletContainerInitializer; +import org.eclipse.jetty.websocket.server.WebSocketUpgradeFilter; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import static org.eclipse.jetty.websocket.server.NativeWebSocketServletContainerInitializer.ATTR_KEY; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.is; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class WebSocketCompressionPoolTest +{ + private Server server; + private WebSocketClient client; + private ServletContextHandler context1; + private ServletContextHandler context2; + + @BeforeEach + public void start() throws Exception + { + server = new Server(); + ServerConnector connector = new ServerConnector(server); + server.addConnector(connector); + + context1 = new ServletContextHandler(); + context1.setContextPath("/context1"); + NativeWebSocketServletContainerInitializer.configure(context1, (context, container) -> + container.addMapping("/", EchoSocket.class)); + WebSocketUpgradeFilter.configure(context1); + + context2 = new ServletContextHandler(); + context2.setContextPath("/context2"); + NativeWebSocketServletContainerInitializer.configure(context2, (context, container) -> + container.addMapping("/", EchoSocket.class)); + WebSocketUpgradeFilter.configure(context2); + + ContextHandlerCollection contextHandlerCollection = new ContextHandlerCollection(); + contextHandlerCollection.addHandler(context1); + contextHandlerCollection.addHandler(context2); + server.setHandler(contextHandlerCollection); + + client = new WebSocketClient(); + + server.setDumpAfterStart(true); + server.start(); + client.start(); + } + + @AfterEach + public void stop() throws Exception + { + server.stop(); + client.stop(); + } + + public static WebSocketExtensionFactory getExtensionFactory(ContextHandler contextHandler) + { + NativeWebSocketConfiguration configuration = (NativeWebSocketConfiguration)contextHandler.getAttribute(ATTR_KEY); + assertNotNull(configuration); + WebSocketExtensionFactory extensionFactory = configuration.getFactory().getBean(WebSocketExtensionFactory.class); + assertNotNull(extensionFactory); + return extensionFactory; + } + + @Test + public void test() throws Exception + { + // Check the two contexts are sharing the same inflater/deflater pools. + WebSocketExtensionFactory extensionFactory1 = getExtensionFactory(context1); + WebSocketExtensionFactory extensionFactory2 = getExtensionFactory(context2); + assertThat(extensionFactory1.getInflaterPool(), is(extensionFactory2.getInflaterPool())); + assertThat(extensionFactory1.getDeflaterPool(), is(extensionFactory2.getDeflaterPool())); + + // The extension factories and the pools have been started. + assertTrue(extensionFactory1.isStarted()); + assertTrue(extensionFactory2.isStarted()); + assertTrue(extensionFactory1.getInflaterPool().isStarted()); + assertTrue(extensionFactory1.getDeflaterPool().isStarted()); + + // Pools are managed by the server. + assertTrue(server.isManaged(extensionFactory1.getInflaterPool())); + assertTrue(server.isManaged(extensionFactory1.getDeflaterPool())); + } +} diff --git a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/WebSocketExtensionFactory.java b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/WebSocketExtensionFactory.java index 8c59ed2652cb..85b7b1f91e6e 100644 --- a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/WebSocketExtensionFactory.java +++ b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/WebSocketExtensionFactory.java @@ -39,12 +39,18 @@ public class WebSocketExtensionFactory extends ExtensionFactory implements LifeC { private final ContainerLifeCycle containerLifeCycle; private final WebSocketContainerScope container; - private final InflaterPool inflaterPool = new InflaterPool(CompressionPool.INFINITE_CAPACITY, true); - private final DeflaterPool deflaterPool = new DeflaterPool(CompressionPool.INFINITE_CAPACITY, Deflater.DEFAULT_COMPRESSION, true); + private final InflaterPool inflaterPool; + private final DeflaterPool deflaterPool; public WebSocketExtensionFactory(WebSocketContainerScope container) { - containerLifeCycle = new ContainerLifeCycle() + this(container, null, null); + } + + public WebSocketExtensionFactory(WebSocketContainerScope container, InflaterPool inflaterPool, DeflaterPool deflaterPool) + { + this.container = container; + this.containerLifeCycle = new ContainerLifeCycle() { @Override public String toString() @@ -53,9 +59,25 @@ public String toString() } }; - this.container = container; - containerLifeCycle.addBean(inflaterPool); - containerLifeCycle.addBean(deflaterPool); + this.inflaterPool = (inflaterPool != null) ? inflaterPool : new InflaterPool(CompressionPool.INFINITE_CAPACITY, true); + this.containerLifeCycle.addBean(this.inflaterPool); + this.deflaterPool = (deflaterPool != null) ? deflaterPool : new DeflaterPool(CompressionPool.INFINITE_CAPACITY, Deflater.DEFAULT_COMPRESSION, true); + this.containerLifeCycle.addBean(this.deflaterPool); + } + + public void unmanage(Object object) + { + containerLifeCycle.unmanage(object); + } + + public InflaterPool getInflaterPool() + { + return inflaterPool; + } + + public DeflaterPool getDeflaterPool() + { + return deflaterPool; } @Override diff --git a/jetty-websocket/websocket-server/src/main/java/org/eclipse/jetty/websocket/server/WebSocketServerFactory.java b/jetty-websocket/websocket-server/src/main/java/org/eclipse/jetty/websocket/server/WebSocketServerFactory.java index 47dff798d8f4..0fb43e93450e 100644 --- a/jetty-websocket/websocket-server/src/main/java/org/eclipse/jetty/websocket/server/WebSocketServerFactory.java +++ b/jetty-websocket/websocket-server/src/main/java/org/eclipse/jetty/websocket/server/WebSocketServerFactory.java @@ -46,11 +46,14 @@ import org.eclipse.jetty.server.HttpConfiguration; import org.eclipse.jetty.server.HttpConnection; import org.eclipse.jetty.server.HttpConnectionFactory; +import org.eclipse.jetty.server.Server; import org.eclipse.jetty.server.handler.ContextHandler; import org.eclipse.jetty.util.DecoratedObjectFactory; import org.eclipse.jetty.util.DeprecationWarning; import org.eclipse.jetty.util.StringUtil; import org.eclipse.jetty.util.component.ContainerLifeCycle; +import org.eclipse.jetty.util.compression.DeflaterPool; +import org.eclipse.jetty.util.compression.InflaterPool; import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Logger; import org.eclipse.jetty.util.ssl.SslContextFactory; @@ -85,6 +88,8 @@ public class WebSocketServerFactory extends ContainerLifeCycle implements WebSocketCreator, WebSocketContainerScope, WebSocketServletFactory { private static final Logger LOG = Log.getLogger(WebSocketServerFactory.class); + private static final String WEBSOCKET_INFLATER_POOL_ATTRIBUTE = "jetty.websocket.inflater"; + private static final String WEBSOCKET_DEFLATER_POOL_ATTRIBUTE = "jetty.websocket.deflater"; private final ClassLoader contextClassloader; private final Map handshakes = new HashMap<>(); @@ -161,18 +166,44 @@ private WebSocketServerFactory(ServletContext context, WebSocketPolicy policy, D this.creator = this; this.contextClassloader = Thread.currentThread().getContextClassLoader(); this.eventDriverFactory = new EventDriverFactory(this); - this.extensionFactory = new WebSocketExtensionFactory(this); + + if (context == null) + { + this.extensionFactory = new WebSocketExtensionFactory(this); + } + else + { + // Look for CompressionPools in context attributes, if null try get shared CompressionPools from the server. + DeflaterPool deflaterPool = (DeflaterPool)context.getAttribute(WEBSOCKET_DEFLATER_POOL_ATTRIBUTE); + InflaterPool inflaterPool = (InflaterPool)context.getAttribute(WEBSOCKET_INFLATER_POOL_ATTRIBUTE); + ContextHandler contextHandler = ContextHandler.getContextHandler(context); + Server server = (contextHandler == null) ? null : contextHandler.getServer(); + if (server != null) + { + if (deflaterPool == null) + deflaterPool = DeflaterPool.ensurePool(server); + if (inflaterPool == null) + inflaterPool = InflaterPool.ensurePool(server); + } + this.extensionFactory = new WebSocketExtensionFactory(this, inflaterPool, deflaterPool); + + // These pools may be managed by the server but not yet started. + // In this case we don't want them to be managed by the extensionFactory as well. + if (server != null) + { + if (server.contains(inflaterPool)) + extensionFactory.unmanage(inflaterPool); + if (server.contains(deflaterPool)) + extensionFactory.unmanage(deflaterPool); + } + } this.handshakes.put(HandshakeRFC6455.VERSION, new HandshakeRFC6455()); this.sessionFactories.add(new WebSocketSessionFactory(this)); // Create supportedVersions - List versions = new ArrayList<>(); - for (int v : handshakes.keySet()) - { - versions.add(v); - } - Collections.sort(versions, Collections.reverseOrder()); // newest first + List versions = new ArrayList<>(handshakes.keySet()); + versions.sort(Collections.reverseOrder()); // newest first StringBuilder rv = new StringBuilder(); for (int v : versions) { diff --git a/jetty-websocket/websocket-server/src/main/java/org/eclipse/jetty/websocket/server/WebSocketUpgradeFilter.java b/jetty-websocket/websocket-server/src/main/java/org/eclipse/jetty/websocket/server/WebSocketUpgradeFilter.java index b19bd6188989..5cf5970baf73 100644 --- a/jetty-websocket/websocket-server/src/main/java/org/eclipse/jetty/websocket/server/WebSocketUpgradeFilter.java +++ b/jetty-websocket/websocket-server/src/main/java/org/eclipse/jetty/websocket/server/WebSocketUpgradeFilter.java @@ -77,8 +77,7 @@ public static WebSocketUpgradeFilter configure(ServletContextHandler context) th if (filter == null) { // Dynamically add filter - NativeWebSocketConfiguration configuration = NativeWebSocketServletContainerInitializer.initialize(context); - filter = new WebSocketUpgradeFilter(configuration); + filter = new WebSocketUpgradeFilter(); filter.setToAttribute(context, ATTR_KEY); String name = "Jetty_WebSocketUpgradeFilter"; @@ -109,7 +108,9 @@ public static WebSocketUpgradeFilter configure(ServletContextHandler context) th @Deprecated public static WebSocketUpgradeFilter configureContext(ServletContextHandler context) throws ServletException { - return configure(context); + WebSocketUpgradeFilter upgradeFilter = configure(context); + upgradeFilter.configuration = NativeWebSocketServletContainerInitializer.initialize(context); + return upgradeFilter; } /** @@ -126,11 +127,10 @@ public static WebSocketUpgradeFilter configureContext(ServletContext context) th { throw new ServletException("Not running on Jetty, WebSocket support unavailable"); } - return configure(handler); + return configureContext(handler); } private NativeWebSocketConfiguration configuration; - private String instanceKey; private boolean localConfiguration = false; private boolean alreadySetToAttribute = false; @@ -139,11 +139,13 @@ public WebSocketUpgradeFilter() // do nothing } + @Deprecated public WebSocketUpgradeFilter(WebSocketServerFactory factory) { this(new NativeWebSocketConfiguration(factory)); } + @Deprecated public WebSocketUpgradeFilter(NativeWebSocketConfiguration configuration) { this.configuration = configuration; @@ -378,7 +380,7 @@ public void init(FilterConfig config) throws ServletException getFactory().getPolicy().setInputBufferSize(Integer.parseInt(max)); } - instanceKey = config.getInitParameter(CONTEXT_ATTRIBUTE_KEY); + String instanceKey = config.getInitParameter(CONTEXT_ATTRIBUTE_KEY); if (instanceKey == null) { // assume default @@ -427,4 +429,4 @@ public String toString() { return String.format("%s[configuration=%s]", this.getClass().getSimpleName(), configuration); } -} +} \ No newline at end of file