diff --git a/integration-tests/cpr/pom.xml b/integration-tests/cpr/pom.xml index 999857bc0dc..55b7c88e983 100644 --- a/integration-tests/cpr/pom.xml +++ b/integration-tests/cpr/pom.xml @@ -56,12 +56,26 @@ - com.sun.grizzly - grizzly-websockets - ${grizzly-version} - test - true - + org.glassfish.grizzly + grizzly-websockets + ${grizzly2-version} + test + true + + + org.glassfish.grizzly + grizzly-comet + ${grizzly2-version} + test + true + + + org.glassfish.grizzly + grizzly-http-servlet + ${grizzly2-version} + test + true + com.sun.grizzly grizzly-http diff --git a/integration-tests/cpr/src/test/java/org/atmosphere/tests/http/Grizzly2CometSupportTest.java b/integration-tests/cpr/src/test/java/org/atmosphere/tests/http/Grizzly2CometSupportTest.java new file mode 100644 index 00000000000..9704db33287 --- /dev/null +++ b/integration-tests/cpr/src/test/java/org/atmosphere/tests/http/Grizzly2CometSupportTest.java @@ -0,0 +1,67 @@ +/* + * Copyright 2012 Jeanfrancois Arcand + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.atmosphere.tests.http; + + +import org.atmosphere.container.Grizzly2CometSupport; +import org.atmosphere.cpr.AtmosphereServlet; +import org.glassfish.grizzly.comet.CometAddOn; +import org.glassfish.grizzly.http.server.HttpServer; +import org.glassfish.grizzly.http.server.NetworkListener; +import org.glassfish.grizzly.servlet.ServletRegistration; +import org.glassfish.grizzly.servlet.WebappContext; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; + +public class Grizzly2CometSupportTest extends BaseTest { + + protected HttpServer ws; + + @BeforeMethod(alwaysRun = true) + public void startServer() throws Exception { + + int port = TestHelper.getEnvVariable("ATMOSPHERE_HTTP_PORT", findFreePort()); + urlTarget = "http://127.0.0.1:" + port + "/invoke"; + + ws = new HttpServer(); + NetworkListener listener = new NetworkListener("listener", "127.0.0.1", port); + ws.addListener(listener); + + listener.registerAddOn(new CometAddOn()); + WebappContext webappContext = new WebappContext("Grizzly 2 Comet Test"); + atmoServlet = new AtmosphereServlet(); + ServletRegistration registration = webappContext.addServlet("AtmosphereServlet", atmoServlet); + registration.addMapping("/*"); + registration.setLoadOnStartup(0); + + configureCometSupport(); + webappContext.deploy(ws); + + ws.start(); + } + + public void configureCometSupport() { + atmoServlet.framework().setAsyncSupport(new Grizzly2CometSupport(atmoServlet.framework().getAtmosphereConfig())); + } + + @AfterMethod(alwaysRun = true) + public void unsetAtmosphereHandler() throws Exception { + atmoServlet.framework().destroy(); + ws.stop(); + } + +} \ No newline at end of file diff --git a/integration-tests/cpr/src/test/java/org/atmosphere/tests/http/GrizzlyCometSupportTest.java b/integration-tests/cpr/src/test/java/org/atmosphere/tests/http/GrizzlyCometSupportTest.java index e06c4242ab5..21c1b677dc3 100644 --- a/integration-tests/cpr/src/test/java/org/atmosphere/tests/http/GrizzlyCometSupportTest.java +++ b/integration-tests/cpr/src/test/java/org/atmosphere/tests/http/GrizzlyCometSupportTest.java @@ -94,9 +94,4 @@ public void unsetAtmosphereHandler() throws Exception { ws.stop(); } - // http://java.net/jira/browse/GRIZZLY-1123 - @Test(timeOut = 60000, enabled = false) - public void testConcurrentBroadcast() { - } - } \ No newline at end of file diff --git a/modules/cpr/pom.xml b/modules/cpr/pom.xml index 06775d4233f..deaadf268d2 100644 --- a/modules/cpr/pom.xml +++ b/modules/cpr/pom.xml @@ -75,6 +75,13 @@ + + org.glassfish.grizzly + grizzly-comet + ${grizzly2-version} + provided + true + com.sun.grizzly grizzly-websockets diff --git a/modules/cpr/src/main/java/org/atmosphere/container/Grizzly2CometSupport.java b/modules/cpr/src/main/java/org/atmosphere/container/Grizzly2CometSupport.java new file mode 100644 index 00000000000..73db4a30bb3 --- /dev/null +++ b/modules/cpr/src/main/java/org/atmosphere/container/Grizzly2CometSupport.java @@ -0,0 +1,256 @@ +/* + * Copyright 2012 Jeanfrancois Arcand + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.atmosphere.container; + +import org.glassfish.grizzly.comet.CometContext; +import org.glassfish.grizzly.comet.CometEngine; +import org.glassfish.grizzly.comet.CometEvent; +import org.glassfish.grizzly.comet.CometHandler; +import org.atmosphere.cpr.Action; +import org.atmosphere.cpr.ApplicationConfig; +import org.atmosphere.cpr.AsynchronousProcessor; +import org.atmosphere.cpr.AtmosphereConfig; +import org.atmosphere.cpr.AtmosphereRequest; +import org.atmosphere.cpr.AtmosphereResourceImpl; +import org.atmosphere.cpr.AtmosphereResponse; +import org.glassfish.grizzly.http.server.Response; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.servlet.ServletConfig; +import javax.servlet.ServletException; +import java.io.IOException; +import java.util.List; + +import static org.atmosphere.cpr.ApplicationConfig.MAX_INACTIVE; + +/** + * Comet Portable Runtime implementation on top of Grizzly 1.5 and up. + * + * @author Jeanfrancois Arcand + */ +public class Grizzly2CometSupport extends AsynchronousProcessor { + + private static final Logger logger = LoggerFactory.getLogger(Grizzly2CometSupport.class); + + private static final String ATMOSPHERE = "/atmosphere"; + + private String atmosphereCtx = ""; + + public Grizzly2CometSupport(AtmosphereConfig config) { + super(config); + } + + /** + * Init Grizzly's {@link org.glassfish.grizzly.comet.CometContext} that will be used to suspend and + * resume the response. + * + * @param sc the {@link javax.servlet.ServletContext} + * @throws javax.servlet.ServletException + */ + @Override + public void init(ServletConfig sc) throws ServletException { + super.init(sc); + + atmosphereCtx = sc.getServletContext().getContextPath() + ATMOSPHERE; + + CometEngine cometEngine = CometEngine.getEngine(); + CometContext context = cometEngine.register(atmosphereCtx); + context.setExpirationDelay(-1); + logger.debug("Created CometContext for atmosphere context: {}", atmosphereCtx); + } + + /** + * {@inheritDoc} + */ + public Action service(AtmosphereRequest req, AtmosphereResponse res) + throws IOException, ServletException { + + CometContext ctx = CometEngine.getEngine().getCometContext(atmosphereCtx); + Action action = suspended(req, res); + if (action.type() == Action.TYPE.SUSPEND) { + suspend(ctx, action, req, res); + } else if (action.type() == Action.TYPE.RESUME) { + resume(req, ctx); + } + return action; + } + + /** + * Suspend the response + * + * @param ctx + * @param action + * @param req + * @param res + */ + private void suspend(CometContext ctx, Action action, AtmosphereRequest req, AtmosphereResponse res) { + VoidCometHandler c = new VoidCometHandler(req, res); + ctx.setExpirationDelay(action.timeout()); + ctx.addCometHandler(c); + req.setAttribute(ATMOSPHERE, c.hashCode()); + ctx.addAttribute("Time", System.currentTimeMillis()); + + if (supportSession()) { + // Store as well in the session in case the resume operation + // happens outside the AtmosphereHandler.onStateChange scope. + req.getSession().setAttribute(ATMOSPHERE, c.hashCode()); + } + } + + /** + * Resume the underlying response, + * + * @param req an {@link org.atmosphere.cpr.AtmosphereRequest} + * @param ctx a {@link org.glassfish.grizzly.comet.CometContext} + */ + private void resume(AtmosphereRequest req, CometContext ctx) { + + if (req.getAttribute(ATMOSPHERE) == null) { + return; + } + + CometHandler handler = getCometHandler(ctx, (Integer) req.getAttribute(ATMOSPHERE)); + req.removeAttribute(ATMOSPHERE); + + if (handler == null && supportSession() && req.getSession(false) != null) { + handler = getCometHandler(ctx, (Integer) req.getSession(false).getAttribute(ATMOSPHERE)); + req.getSession().removeAttribute(ATMOSPHERE); + } + + if (handler != null && (config.getInitParameter(ApplicationConfig.RESUME_AND_KEEPALIVE) == null + || config.getInitParameter(ApplicationConfig.RESUME_AND_KEEPALIVE).equalsIgnoreCase("false"))) { + try { + ctx.resumeCometHandler(handler); + } catch (IOException e) { + logger.error("Error resuming CometHandler", e); + } + } + } + + /** + * {@inheritDoc} + */ + @Override + public void action(AtmosphereResourceImpl r) { + super.action(r); + if (r.action().type() == Action.TYPE.RESUME && r.isInScope()) { + CometContext ctx = CometEngine.getEngine().getCometContext(atmosphereCtx); + resume(r.getRequest(), ctx); + } + } + + @Override + public Action cancelled(AtmosphereRequest req, AtmosphereResponse res) + throws IOException, ServletException { + + Action action = super.cancelled(req, res); + if (req.getAttribute(MAX_INACTIVE) != null && Long.class.cast(req.getAttribute(MAX_INACTIVE)) == -1) { + resume(req, CometEngine.getEngine().getCometContext(atmosphereCtx)); + } + return action; + } + + private static CometHandler getCometHandler(CometContext context, + int handlerId) { + List handlers = context.getCometHandlers(); + for (CometHandler handler : handlers) { + if (handler.hashCode() == handlerId) { + return handler; + } + } + return null; + } + + /** + * Void {@link org.glassfish.grizzly.comet.CometHandler}, which delegate the processing of the + * {@link org.atmosphere.cpr.AtmosphereRequest} to an {@link org.atmosphere.cpr.AtmosphereResourceImpl}. + */ + private class VoidCometHandler implements CometHandler { + + AtmosphereRequest req; + AtmosphereResponse res; + private Response grizzlyResponse; + CometContext cometContext; + + public VoidCometHandler(AtmosphereRequest req, AtmosphereResponse res) { + this.req = req; + this.res = res; + } + + /** + * {@inheritDoc} + */ + public void attach(Object o) { + } + + /** + * {@inheritDoc} + */ + public void onEvent(CometEvent ce) throws IOException { + } + + /** + * {@inheritDoc} + */ + public void onInitialize(CometEvent ce) throws IOException { + } + + /** + * {@inheritDoc} + */ + public void onTerminate(CometEvent ce) throws IOException { + } + + /** + * {@inheritDoc} + */ + public synchronized void onInterrupt(CometEvent ce) throws IOException { + long timeStamp = (Long) ce.getCometContext().getAttribute("Time"); + try { + if (ce.getCometContext().getExpirationDelay() > 0 + && (System.currentTimeMillis() - timeStamp) >= ce.getCometContext().getExpirationDelay()) { + timedout(req, res); + } else { + cancelled(req, res); + } + } catch (ServletException ex) { + logger.warn("onInterrupt() encountered exception", ex); + } + } + + @Override + public Response getResponse() { + return grizzlyResponse; + } + + @Override + public void setResponse(Response response) { + grizzlyResponse = response; + } + + @Override + public CometContext getCometContext() { + return cometContext; + } + + @Override + public void setCometContext(CometContext cometContext) { + this.cometContext = cometContext; + } + } +} \ No newline at end of file diff --git a/modules/cpr/src/main/java/org/atmosphere/cpr/DefaultAsyncSupportResolver.java b/modules/cpr/src/main/java/org/atmosphere/cpr/DefaultAsyncSupportResolver.java index 08f2a0c39cb..2e094c2c3b0 100644 --- a/modules/cpr/src/main/java/org/atmosphere/cpr/DefaultAsyncSupportResolver.java +++ b/modules/cpr/src/main/java/org/atmosphere/cpr/DefaultAsyncSupportResolver.java @@ -56,6 +56,7 @@ import org.atmosphere.container.BlockingIOCometSupport; import org.atmosphere.container.GlassFishWebSocketSupport; import org.atmosphere.container.GlassFishv2CometSupport; +import org.atmosphere.container.Grizzly2CometSupport; import org.atmosphere.container.GrizzlyCometSupport; import org.atmosphere.container.JBossWebCometSupport; import org.atmosphere.container.Jetty7CometSupport; @@ -93,6 +94,7 @@ public class DefaultAsyncSupportResolver implements AsyncSupportResolver { public final static String JETTY_7 = "org.eclipse.jetty.servlet.ServletContextHandler"; public final static String JETTY_8 = "org.eclipse.jetty.continuation.Servlet3Continuation"; public final static String GRIZZLY = "com.sun.grizzly.http.servlet.ServletAdapter"; + public final static String GRIZZLY2 = "org.glassfish.grizzly.http.servlet.ServletHandler"; public final static String JBOSSWEB = "org.apache.catalina.connector.HttpEventImpl"; public final static String GRIZZLY_WEBSOCKET = "com.sun.grizzly.websockets.WebSocketEngine"; public final static String NETTY = "org.jboss.netty.channel.Channel"; @@ -152,6 +154,9 @@ public List> detectContainersPresent() { if (testClassExists(GRIZZLY)) add(GrizzlyCometSupport.class); + if (testClassExists(GRIZZLY2)) + add(Grizzly2CometSupport.class); + if (testClassExists(NETTY)) add(NettyCometSupport.class); } diff --git a/pom.xml b/pom.xml index 3b6398533e3..dfe76478cfb 100755 --- a/pom.xml +++ b/pom.xml @@ -471,7 +471,8 @@ 3.8.1 1.0.0 3.0-b74b - 1.9.39 + 1.9.50 + 2.2.10 2.1.1.GA 1.0.31 7.6.0.v20120127