Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Issue #7078 - share inflater/deflater pools for websocket in jetty 9.4 #7079

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@

import java.util.zip.Deflater;

import org.eclipse.jetty.util.component.Container;
import org.eclipse.jetty.util.thread.ThreadPool;

public class DeflaterPool extends CompressionPool<Deflater>
{
private final int compressionLevel;
Expand Down Expand Up @@ -60,4 +63,20 @@ protected void reset(Deflater deflater)
{
deflater.reset();
}

public static DeflaterPool ensurePool(Container container)
{
DeflaterPool pool = container.getBean(DeflaterPool.class);
if (pool != null)
return pool;

int capacity = CompressionPool.INFINITE_CAPACITY;
ThreadPool.SizedThreadPool threadPool = container.getBean(ThreadPool.SizedThreadPool.class);
if (threadPool != null)
capacity = threadPool.getMaxThreads();

pool = new DeflaterPool(capacity, Deflater.DEFAULT_COMPRESSION, true);
container.addBean(pool, true);
return pool;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@

import java.util.zip.Inflater;

import org.eclipse.jetty.util.component.Container;
import org.eclipse.jetty.util.thread.ThreadPool;

public class InflaterPool extends CompressionPool<Inflater>
{
private final boolean nowrap;
Expand Down Expand Up @@ -57,4 +60,20 @@ protected void reset(Inflater inflater)
{
inflater.reset();
}

public static InflaterPool ensurePool(Container container)
{
InflaterPool pool = container.getBean(InflaterPool.class);
if (pool != null)
return pool;

int capacity = CompressionPool.INFINITE_CAPACITY;
ThreadPool.SizedThreadPool threadPool = container.getBean(ThreadPool.SizedThreadPool.class);
if (threadPool != null)
capacity = threadPool.getMaxThreads();

pool = new InflaterPool(capacity, true);
container.addBean(pool, true);
return pool;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
//
// ========================================================================
// Copyright (c) 1995-2021 Mort Bay Consulting Pty Ltd and others.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//

package org.eclipse.jetty.websocket.tests;

import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.server.handler.ContextHandler;
import org.eclipse.jetty.server.handler.ContextHandlerCollection;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.websocket.client.WebSocketClient;
import org.eclipse.jetty.websocket.common.extensions.WebSocketExtensionFactory;
import org.eclipse.jetty.websocket.server.NativeWebSocketConfiguration;
import org.eclipse.jetty.websocket.server.NativeWebSocketServletContainerInitializer;
import org.eclipse.jetty.websocket.server.WebSocketUpgradeFilter;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import static org.eclipse.jetty.websocket.server.NativeWebSocketServletContainerInitializer.ATTR_KEY;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;

public class WebSocketCompressionPoolTest
{
private Server server;
private WebSocketClient client;
private ServletContextHandler context1;
private ServletContextHandler context2;

@BeforeEach
public void start() throws Exception
{
server = new Server();
ServerConnector connector = new ServerConnector(server);
server.addConnector(connector);

context1 = new ServletContextHandler();
context1.setContextPath("/context1");
NativeWebSocketServletContainerInitializer.configure(context1, (context, container) ->
container.addMapping("/", EchoSocket.class));
WebSocketUpgradeFilter.configure(context1);

context2 = new ServletContextHandler();
context2.setContextPath("/context2");
NativeWebSocketServletContainerInitializer.configure(context2, (context, container) ->
container.addMapping("/", EchoSocket.class));
WebSocketUpgradeFilter.configure(context2);

ContextHandlerCollection contextHandlerCollection = new ContextHandlerCollection();
contextHandlerCollection.addHandler(context1);
contextHandlerCollection.addHandler(context2);
server.setHandler(contextHandlerCollection);

client = new WebSocketClient();

server.setDumpAfterStart(true);
server.start();
client.start();
}

@AfterEach
public void stop() throws Exception
{
server.stop();
client.stop();
}

public static WebSocketExtensionFactory getExtensionFactory(ContextHandler contextHandler)
{
NativeWebSocketConfiguration configuration = (NativeWebSocketConfiguration)contextHandler.getAttribute(ATTR_KEY);
assertNotNull(configuration);
WebSocketExtensionFactory extensionFactory = configuration.getFactory().getBean(WebSocketExtensionFactory.class);
assertNotNull(extensionFactory);
return extensionFactory;
}

@Test
public void test() throws Exception
{
// Check the two contexts are sharing the same inflater/deflater pools.
WebSocketExtensionFactory extensionFactory1 = getExtensionFactory(context1);
WebSocketExtensionFactory extensionFactory2 = getExtensionFactory(context2);
assertThat(extensionFactory1.getInflaterPool(), is(extensionFactory2.getInflaterPool()));
assertThat(extensionFactory1.getDeflaterPool(), is(extensionFactory2.getDeflaterPool()));

// The extension factories and the pools have been started.
assertTrue(extensionFactory1.isStarted());
assertTrue(extensionFactory2.isStarted());
assertTrue(extensionFactory1.getInflaterPool().isStarted());
assertTrue(extensionFactory1.getDeflaterPool().isStarted());

// Pools are managed by the server.
assertTrue(server.isManaged(extensionFactory1.getInflaterPool()));
assertTrue(server.isManaged(extensionFactory1.getDeflaterPool()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,18 @@ public class WebSocketExtensionFactory extends ExtensionFactory implements LifeC
{
private final ContainerLifeCycle containerLifeCycle;
private final WebSocketContainerScope container;
private final InflaterPool inflaterPool = new InflaterPool(CompressionPool.INFINITE_CAPACITY, true);
private final DeflaterPool deflaterPool = new DeflaterPool(CompressionPool.INFINITE_CAPACITY, Deflater.DEFAULT_COMPRESSION, true);
private final InflaterPool inflaterPool;
private final DeflaterPool deflaterPool;

public WebSocketExtensionFactory(WebSocketContainerScope container)
{
containerLifeCycle = new ContainerLifeCycle()
this(container, null, null);
}

public WebSocketExtensionFactory(WebSocketContainerScope container, InflaterPool inflaterPool, DeflaterPool deflaterPool)
{
this.container = container;
this.containerLifeCycle = new ContainerLifeCycle()
{
@Override
public String toString()
Expand All @@ -53,9 +59,25 @@ public String toString()
}
};

this.container = container;
containerLifeCycle.addBean(inflaterPool);
containerLifeCycle.addBean(deflaterPool);
this.inflaterPool = (inflaterPool != null) ? inflaterPool : new InflaterPool(CompressionPool.INFINITE_CAPACITY, true);
this.containerLifeCycle.addBean(this.inflaterPool);
this.deflaterPool = (deflaterPool != null) ? deflaterPool : new DeflaterPool(CompressionPool.INFINITE_CAPACITY, Deflater.DEFAULT_COMPRESSION, true);
this.containerLifeCycle.addBean(this.deflaterPool);
}

public void unmanage(Object object)
{
containerLifeCycle.unmanage(object);
}

public InflaterPool getInflaterPool()
{
return inflaterPool;
}

public DeflaterPool getDeflaterPool()
{
return deflaterPool;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,14 @@
import org.eclipse.jetty.server.HttpConfiguration;
import org.eclipse.jetty.server.HttpConnection;
import org.eclipse.jetty.server.HttpConnectionFactory;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.handler.ContextHandler;
import org.eclipse.jetty.util.DecoratedObjectFactory;
import org.eclipse.jetty.util.DeprecationWarning;
import org.eclipse.jetty.util.StringUtil;
import org.eclipse.jetty.util.component.ContainerLifeCycle;
import org.eclipse.jetty.util.compression.DeflaterPool;
import org.eclipse.jetty.util.compression.InflaterPool;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.ssl.SslContextFactory;
Expand Down Expand Up @@ -85,6 +88,8 @@
public class WebSocketServerFactory extends ContainerLifeCycle implements WebSocketCreator, WebSocketContainerScope, WebSocketServletFactory
{
private static final Logger LOG = Log.getLogger(WebSocketServerFactory.class);
private static final String WEBSOCKET_INFLATER_POOL_ATTRIBUTE = "jetty.websocket.inflater";
private static final String WEBSOCKET_DEFLATER_POOL_ATTRIBUTE = "jetty.websocket.deflater";

private final ClassLoader contextClassloader;
private final Map<Integer, WebSocketHandshake> handshakes = new HashMap<>();
Expand Down Expand Up @@ -161,18 +166,44 @@ private WebSocketServerFactory(ServletContext context, WebSocketPolicy policy, D
this.creator = this;
this.contextClassloader = Thread.currentThread().getContextClassLoader();
this.eventDriverFactory = new EventDriverFactory(this);
this.extensionFactory = new WebSocketExtensionFactory(this);

if (context == null)
{
this.extensionFactory = new WebSocketExtensionFactory(this);
}
else
{
// Look for CompressionPools in context attributes, if null try get shared CompressionPools from the server.
DeflaterPool deflaterPool = (DeflaterPool)context.getAttribute(WEBSOCKET_DEFLATER_POOL_ATTRIBUTE);
InflaterPool inflaterPool = (InflaterPool)context.getAttribute(WEBSOCKET_INFLATER_POOL_ATTRIBUTE);
ContextHandler contextHandler = ContextHandler.getContextHandler(context);
Server server = (contextHandler == null) ? null : contextHandler.getServer();
if (server != null)
{
if (deflaterPool == null)
deflaterPool = DeflaterPool.ensurePool(server);
if (inflaterPool == null)
inflaterPool = InflaterPool.ensurePool(server);
}
this.extensionFactory = new WebSocketExtensionFactory(this, inflaterPool, deflaterPool);

// These pools may be managed by the server but not yet started.
// In this case we don't want them to be managed by the extensionFactory as well.
if (server != null)
{
if (server.contains(inflaterPool))
extensionFactory.unmanage(inflaterPool);
if (server.contains(deflaterPool))
extensionFactory.unmanage(deflaterPool);
}
}

this.handshakes.put(HandshakeRFC6455.VERSION, new HandshakeRFC6455());
this.sessionFactories.add(new WebSocketSessionFactory(this));

// Create supportedVersions
List<Integer> versions = new ArrayList<>();
for (int v : handshakes.keySet())
{
versions.add(v);
}
Collections.sort(versions, Collections.reverseOrder()); // newest first
List<Integer> versions = new ArrayList<>(handshakes.keySet());
versions.sort(Collections.reverseOrder()); // newest first
StringBuilder rv = new StringBuilder();
for (int v : versions)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,7 @@ public static WebSocketUpgradeFilter configure(ServletContextHandler context) th
if (filter == null)
{
// Dynamically add filter
NativeWebSocketConfiguration configuration = NativeWebSocketServletContainerInitializer.initialize(context);
filter = new WebSocketUpgradeFilter(configuration);
filter = new WebSocketUpgradeFilter();
filter.setToAttribute(context, ATTR_KEY);

String name = "Jetty_WebSocketUpgradeFilter";
Expand Down Expand Up @@ -109,7 +108,9 @@ public static WebSocketUpgradeFilter configure(ServletContextHandler context) th
@Deprecated
public static WebSocketUpgradeFilter configureContext(ServletContextHandler context) throws ServletException
{
return configure(context);
WebSocketUpgradeFilter upgradeFilter = configure(context);
upgradeFilter.configuration = NativeWebSocketServletContainerInitializer.initialize(context);
return upgradeFilter;
}

/**
Expand All @@ -126,11 +127,10 @@ public static WebSocketUpgradeFilter configureContext(ServletContext context) th
{
throw new ServletException("Not running on Jetty, WebSocket support unavailable");
}
return configure(handler);
return configureContext(handler);
}

private NativeWebSocketConfiguration configuration;
private String instanceKey;
private boolean localConfiguration = false;
private boolean alreadySetToAttribute = false;

Expand All @@ -139,11 +139,13 @@ public WebSocketUpgradeFilter()
// do nothing
}

@Deprecated
public WebSocketUpgradeFilter(WebSocketServerFactory factory)
{
this(new NativeWebSocketConfiguration(factory));
}

@Deprecated
public WebSocketUpgradeFilter(NativeWebSocketConfiguration configuration)
{
this.configuration = configuration;
Expand Down Expand Up @@ -378,7 +380,7 @@ public void init(FilterConfig config) throws ServletException
getFactory().getPolicy().setInputBufferSize(Integer.parseInt(max));
}

instanceKey = config.getInitParameter(CONTEXT_ATTRIBUTE_KEY);
String instanceKey = config.getInitParameter(CONTEXT_ATTRIBUTE_KEY);
if (instanceKey == null)
{
// assume default
Expand Down Expand Up @@ -427,4 +429,4 @@ public String toString()
{
return String.format("%s[configuration=%s]", this.getClass().getSimpleName(), configuration);
}
}
}