Skip to content

Commit

Permalink
Merge pull request #7079 from eclipse/jetty-9.4.x-7078-WebSocketCompr…
Browse files Browse the repository at this point in the history
…essionPools

Issue #7078 - share inflater/deflater pools for websocket in jetty 9.4
  • Loading branch information
lachlan-roberts authored Nov 16, 2021
2 parents cd7782c + 1281655 commit 4a497cd
Show file tree
Hide file tree
Showing 6 changed files with 227 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@

import java.util.zip.Deflater;

import org.eclipse.jetty.util.component.Container;
import org.eclipse.jetty.util.thread.ThreadPool;

public class DeflaterPool extends CompressionPool<Deflater>
{
private final int compressionLevel;
Expand Down Expand Up @@ -60,4 +63,20 @@ protected void reset(Deflater deflater)
{
deflater.reset();
}

public static DeflaterPool ensurePool(Container container)
{
DeflaterPool pool = container.getBean(DeflaterPool.class);
if (pool != null)
return pool;

int capacity = CompressionPool.INFINITE_CAPACITY;
ThreadPool.SizedThreadPool threadPool = container.getBean(ThreadPool.SizedThreadPool.class);
if (threadPool != null)
capacity = threadPool.getMaxThreads();

pool = new DeflaterPool(capacity, Deflater.DEFAULT_COMPRESSION, true);
container.addBean(pool, true);
return pool;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@

import java.util.zip.Inflater;

import org.eclipse.jetty.util.component.Container;
import org.eclipse.jetty.util.thread.ThreadPool;

public class InflaterPool extends CompressionPool<Inflater>
{
private final boolean nowrap;
Expand Down Expand Up @@ -57,4 +60,20 @@ protected void reset(Inflater inflater)
{
inflater.reset();
}

public static InflaterPool ensurePool(Container container)
{
InflaterPool pool = container.getBean(InflaterPool.class);
if (pool != null)
return pool;

int capacity = CompressionPool.INFINITE_CAPACITY;
ThreadPool.SizedThreadPool threadPool = container.getBean(ThreadPool.SizedThreadPool.class);
if (threadPool != null)
capacity = threadPool.getMaxThreads();

pool = new InflaterPool(capacity, true);
container.addBean(pool, true);
return pool;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
//
// ========================================================================
// Copyright (c) 1995-2021 Mort Bay Consulting Pty Ltd and others.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//

package org.eclipse.jetty.websocket.tests;

import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.server.handler.ContextHandler;
import org.eclipse.jetty.server.handler.ContextHandlerCollection;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.websocket.client.WebSocketClient;
import org.eclipse.jetty.websocket.common.extensions.WebSocketExtensionFactory;
import org.eclipse.jetty.websocket.server.NativeWebSocketConfiguration;
import org.eclipse.jetty.websocket.server.NativeWebSocketServletContainerInitializer;
import org.eclipse.jetty.websocket.server.WebSocketUpgradeFilter;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import static org.eclipse.jetty.websocket.server.NativeWebSocketServletContainerInitializer.ATTR_KEY;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;

public class WebSocketCompressionPoolTest
{
private Server server;
private WebSocketClient client;
private ServletContextHandler context1;
private ServletContextHandler context2;

@BeforeEach
public void start() throws Exception
{
server = new Server();
ServerConnector connector = new ServerConnector(server);
server.addConnector(connector);

context1 = new ServletContextHandler();
context1.setContextPath("/context1");
NativeWebSocketServletContainerInitializer.configure(context1, (context, container) ->
container.addMapping("/", EchoSocket.class));
WebSocketUpgradeFilter.configure(context1);

context2 = new ServletContextHandler();
context2.setContextPath("/context2");
NativeWebSocketServletContainerInitializer.configure(context2, (context, container) ->
container.addMapping("/", EchoSocket.class));
WebSocketUpgradeFilter.configure(context2);

ContextHandlerCollection contextHandlerCollection = new ContextHandlerCollection();
contextHandlerCollection.addHandler(context1);
contextHandlerCollection.addHandler(context2);
server.setHandler(contextHandlerCollection);

client = new WebSocketClient();

server.setDumpAfterStart(true);
server.start();
client.start();
}

@AfterEach
public void stop() throws Exception
{
server.stop();
client.stop();
}

public static WebSocketExtensionFactory getExtensionFactory(ContextHandler contextHandler)
{
NativeWebSocketConfiguration configuration = (NativeWebSocketConfiguration)contextHandler.getAttribute(ATTR_KEY);
assertNotNull(configuration);
WebSocketExtensionFactory extensionFactory = configuration.getFactory().getBean(WebSocketExtensionFactory.class);
assertNotNull(extensionFactory);
return extensionFactory;
}

@Test
public void test() throws Exception
{
// Check the two contexts are sharing the same inflater/deflater pools.
WebSocketExtensionFactory extensionFactory1 = getExtensionFactory(context1);
WebSocketExtensionFactory extensionFactory2 = getExtensionFactory(context2);
assertThat(extensionFactory1.getInflaterPool(), is(extensionFactory2.getInflaterPool()));
assertThat(extensionFactory1.getDeflaterPool(), is(extensionFactory2.getDeflaterPool()));

// The extension factories and the pools have been started.
assertTrue(extensionFactory1.isStarted());
assertTrue(extensionFactory2.isStarted());
assertTrue(extensionFactory1.getInflaterPool().isStarted());
assertTrue(extensionFactory1.getDeflaterPool().isStarted());

// Pools are managed by the server.
assertTrue(server.isManaged(extensionFactory1.getInflaterPool()));
assertTrue(server.isManaged(extensionFactory1.getDeflaterPool()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,18 @@ public class WebSocketExtensionFactory extends ExtensionFactory implements LifeC
{
private final ContainerLifeCycle containerLifeCycle;
private final WebSocketContainerScope container;
private final InflaterPool inflaterPool = new InflaterPool(CompressionPool.INFINITE_CAPACITY, true);
private final DeflaterPool deflaterPool = new DeflaterPool(CompressionPool.INFINITE_CAPACITY, Deflater.DEFAULT_COMPRESSION, true);
private final InflaterPool inflaterPool;
private final DeflaterPool deflaterPool;

public WebSocketExtensionFactory(WebSocketContainerScope container)
{
containerLifeCycle = new ContainerLifeCycle()
this(container, null, null);
}

public WebSocketExtensionFactory(WebSocketContainerScope container, InflaterPool inflaterPool, DeflaterPool deflaterPool)
{
this.container = container;
this.containerLifeCycle = new ContainerLifeCycle()
{
@Override
public String toString()
Expand All @@ -53,9 +59,25 @@ public String toString()
}
};

this.container = container;
containerLifeCycle.addBean(inflaterPool);
containerLifeCycle.addBean(deflaterPool);
this.inflaterPool = (inflaterPool != null) ? inflaterPool : new InflaterPool(CompressionPool.INFINITE_CAPACITY, true);
this.containerLifeCycle.addBean(this.inflaterPool);
this.deflaterPool = (deflaterPool != null) ? deflaterPool : new DeflaterPool(CompressionPool.INFINITE_CAPACITY, Deflater.DEFAULT_COMPRESSION, true);
this.containerLifeCycle.addBean(this.deflaterPool);
}

public void unmanage(Object object)
{
containerLifeCycle.unmanage(object);
}

public InflaterPool getInflaterPool()
{
return inflaterPool;
}

public DeflaterPool getDeflaterPool()
{
return deflaterPool;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,14 @@
import org.eclipse.jetty.server.HttpConfiguration;
import org.eclipse.jetty.server.HttpConnection;
import org.eclipse.jetty.server.HttpConnectionFactory;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.handler.ContextHandler;
import org.eclipse.jetty.util.DecoratedObjectFactory;
import org.eclipse.jetty.util.DeprecationWarning;
import org.eclipse.jetty.util.StringUtil;
import org.eclipse.jetty.util.component.ContainerLifeCycle;
import org.eclipse.jetty.util.compression.DeflaterPool;
import org.eclipse.jetty.util.compression.InflaterPool;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.ssl.SslContextFactory;
Expand Down Expand Up @@ -85,6 +88,8 @@
public class WebSocketServerFactory extends ContainerLifeCycle implements WebSocketCreator, WebSocketContainerScope, WebSocketServletFactory
{
private static final Logger LOG = Log.getLogger(WebSocketServerFactory.class);
private static final String WEBSOCKET_INFLATER_POOL_ATTRIBUTE = "jetty.websocket.inflater";
private static final String WEBSOCKET_DEFLATER_POOL_ATTRIBUTE = "jetty.websocket.deflater";

private final ClassLoader contextClassloader;
private final Map<Integer, WebSocketHandshake> handshakes = new HashMap<>();
Expand Down Expand Up @@ -161,18 +166,44 @@ private WebSocketServerFactory(ServletContext context, WebSocketPolicy policy, D
this.creator = this;
this.contextClassloader = Thread.currentThread().getContextClassLoader();
this.eventDriverFactory = new EventDriverFactory(this);
this.extensionFactory = new WebSocketExtensionFactory(this);

if (context == null)
{
this.extensionFactory = new WebSocketExtensionFactory(this);
}
else
{
// Look for CompressionPools in context attributes, if null try get shared CompressionPools from the server.
DeflaterPool deflaterPool = (DeflaterPool)context.getAttribute(WEBSOCKET_DEFLATER_POOL_ATTRIBUTE);
InflaterPool inflaterPool = (InflaterPool)context.getAttribute(WEBSOCKET_INFLATER_POOL_ATTRIBUTE);
ContextHandler contextHandler = ContextHandler.getContextHandler(context);
Server server = (contextHandler == null) ? null : contextHandler.getServer();
if (server != null)
{
if (deflaterPool == null)
deflaterPool = DeflaterPool.ensurePool(server);
if (inflaterPool == null)
inflaterPool = InflaterPool.ensurePool(server);
}
this.extensionFactory = new WebSocketExtensionFactory(this, inflaterPool, deflaterPool);

// These pools may be managed by the server but not yet started.
// In this case we don't want them to be managed by the extensionFactory as well.
if (server != null)
{
if (server.contains(inflaterPool))
extensionFactory.unmanage(inflaterPool);
if (server.contains(deflaterPool))
extensionFactory.unmanage(deflaterPool);
}
}

this.handshakes.put(HandshakeRFC6455.VERSION, new HandshakeRFC6455());
this.sessionFactories.add(new WebSocketSessionFactory(this));

// Create supportedVersions
List<Integer> versions = new ArrayList<>();
for (int v : handshakes.keySet())
{
versions.add(v);
}
Collections.sort(versions, Collections.reverseOrder()); // newest first
List<Integer> versions = new ArrayList<>(handshakes.keySet());
versions.sort(Collections.reverseOrder()); // newest first
StringBuilder rv = new StringBuilder();
for (int v : versions)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,7 @@ public static WebSocketUpgradeFilter configure(ServletContextHandler context) th
if (filter == null)
{
// Dynamically add filter
NativeWebSocketConfiguration configuration = NativeWebSocketServletContainerInitializer.initialize(context);
filter = new WebSocketUpgradeFilter(configuration);
filter = new WebSocketUpgradeFilter();
filter.setToAttribute(context, ATTR_KEY);

String name = "Jetty_WebSocketUpgradeFilter";
Expand Down Expand Up @@ -109,7 +108,9 @@ public static WebSocketUpgradeFilter configure(ServletContextHandler context) th
@Deprecated
public static WebSocketUpgradeFilter configureContext(ServletContextHandler context) throws ServletException
{
return configure(context);
WebSocketUpgradeFilter upgradeFilter = configure(context);
upgradeFilter.configuration = NativeWebSocketServletContainerInitializer.initialize(context);
return upgradeFilter;
}

/**
Expand All @@ -126,11 +127,10 @@ public static WebSocketUpgradeFilter configureContext(ServletContext context) th
{
throw new ServletException("Not running on Jetty, WebSocket support unavailable");
}
return configure(handler);
return configureContext(handler);
}

private NativeWebSocketConfiguration configuration;
private String instanceKey;
private boolean localConfiguration = false;
private boolean alreadySetToAttribute = false;

Expand All @@ -139,11 +139,13 @@ public WebSocketUpgradeFilter()
// do nothing
}

@Deprecated
public WebSocketUpgradeFilter(WebSocketServerFactory factory)
{
this(new NativeWebSocketConfiguration(factory));
}

@Deprecated
public WebSocketUpgradeFilter(NativeWebSocketConfiguration configuration)
{
this.configuration = configuration;
Expand Down Expand Up @@ -378,7 +380,7 @@ public void init(FilterConfig config) throws ServletException
getFactory().getPolicy().setInputBufferSize(Integer.parseInt(max));
}

instanceKey = config.getInitParameter(CONTEXT_ATTRIBUTE_KEY);
String instanceKey = config.getInitParameter(CONTEXT_ATTRIBUTE_KEY);
if (instanceKey == null)
{
// assume default
Expand Down Expand Up @@ -427,4 +429,4 @@ public String toString()
{
return String.format("%s[configuration=%s]", this.getClass().getSimpleName(), configuration);
}
}
}

0 comments on commit 4a497cd

Please sign in to comment.