Skip to content

Commit

Permalink
Merge pull request #501 from rlubke/master
Browse files Browse the repository at this point in the history
Integrate Grizzly 2 Comet Support
  • Loading branch information
jfarcand committed Jul 12, 2012
2 parents 5ba29c9 + 6ee44a7 commit 77a255c
Show file tree
Hide file tree
Showing 7 changed files with 357 additions and 12 deletions.
26 changes: 20 additions & 6 deletions integration-tests/cpr/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -56,12 +56,26 @@
</exclusions>
</dependency>
<dependency>
<groupId>com.sun.grizzly</groupId>
<artifactId>grizzly-websockets</artifactId>
<version>${grizzly-version}</version>
<scope>test</scope>
<optional>true</optional>
</dependency>
<groupId>org.glassfish.grizzly</groupId>
<artifactId>grizzly-websockets</artifactId>
<version>${grizzly2-version}</version>
<scope>test</scope>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.glassfish.grizzly</groupId>
<artifactId>grizzly-comet</artifactId>
<version>${grizzly2-version}</version>
<scope>test</scope>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.glassfish.grizzly</groupId>
<artifactId>grizzly-http-servlet</artifactId>
<version>${grizzly2-version}</version>
<scope>test</scope>
<optional>true</optional>
</dependency>
<dependency>
<groupId>com.sun.grizzly</groupId>
<artifactId>grizzly-http</artifactId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
* Copyright 2012 Jeanfrancois Arcand
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package org.atmosphere.tests.http;


import org.atmosphere.container.Grizzly2CometSupport;
import org.atmosphere.cpr.AtmosphereServlet;
import org.glassfish.grizzly.comet.CometAddOn;
import org.glassfish.grizzly.http.server.HttpServer;
import org.glassfish.grizzly.http.server.NetworkListener;
import org.glassfish.grizzly.servlet.ServletRegistration;
import org.glassfish.grizzly.servlet.WebappContext;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;

public class Grizzly2CometSupportTest extends BaseTest {

protected HttpServer ws;

@BeforeMethod(alwaysRun = true)
public void startServer() throws Exception {

int port = TestHelper.getEnvVariable("ATMOSPHERE_HTTP_PORT", findFreePort());
urlTarget = "http://127.0.0.1:" + port + "/invoke";

ws = new HttpServer();
NetworkListener listener = new NetworkListener("listener", "127.0.0.1", port);
ws.addListener(listener);

listener.registerAddOn(new CometAddOn());
WebappContext webappContext = new WebappContext("Grizzly 2 Comet Test");
atmoServlet = new AtmosphereServlet();
ServletRegistration registration = webappContext.addServlet("AtmosphereServlet", atmoServlet);
registration.addMapping("/*");
registration.setLoadOnStartup(0);

configureCometSupport();
webappContext.deploy(ws);

ws.start();
}

public void configureCometSupport() {
atmoServlet.framework().setAsyncSupport(new Grizzly2CometSupport(atmoServlet.framework().getAtmosphereConfig()));
}

@AfterMethod(alwaysRun = true)
public void unsetAtmosphereHandler() throws Exception {
atmoServlet.framework().destroy();
ws.stop();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -94,9 +94,4 @@ public void unsetAtmosphereHandler() throws Exception {
ws.stop();
}

// http://java.net/jira/browse/GRIZZLY-1123
@Test(timeOut = 60000, enabled = false)
public void testConcurrentBroadcast() {
}

}
7 changes: 7 additions & 0 deletions modules/cpr/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,13 @@
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.glassfish.grizzly</groupId>
<artifactId>grizzly-comet</artifactId>
<version>${grizzly2-version}</version>
<scope>provided</scope>
<optional>true</optional>
</dependency>
<dependency>
<groupId>com.sun.grizzly</groupId>
<artifactId>grizzly-websockets</artifactId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,256 @@
/*
* Copyright 2012 Jeanfrancois Arcand
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package org.atmosphere.container;

import org.glassfish.grizzly.comet.CometContext;
import org.glassfish.grizzly.comet.CometEngine;
import org.glassfish.grizzly.comet.CometEvent;
import org.glassfish.grizzly.comet.CometHandler;
import org.atmosphere.cpr.Action;
import org.atmosphere.cpr.ApplicationConfig;
import org.atmosphere.cpr.AsynchronousProcessor;
import org.atmosphere.cpr.AtmosphereConfig;
import org.atmosphere.cpr.AtmosphereRequest;
import org.atmosphere.cpr.AtmosphereResourceImpl;
import org.atmosphere.cpr.AtmosphereResponse;
import org.glassfish.grizzly.http.server.Response;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.servlet.ServletConfig;
import javax.servlet.ServletException;
import java.io.IOException;
import java.util.List;

import static org.atmosphere.cpr.ApplicationConfig.MAX_INACTIVE;

/**
* Comet Portable Runtime implementation on top of Grizzly 1.5 and up.
*
* @author Jeanfrancois Arcand
*/
public class Grizzly2CometSupport extends AsynchronousProcessor {

private static final Logger logger = LoggerFactory.getLogger(Grizzly2CometSupport.class);

private static final String ATMOSPHERE = "/atmosphere";

private String atmosphereCtx = "";

public Grizzly2CometSupport(AtmosphereConfig config) {
super(config);
}

/**
* Init Grizzly's {@link org.glassfish.grizzly.comet.CometContext} that will be used to suspend and
* resume the response.
*
* @param sc the {@link javax.servlet.ServletContext}
* @throws javax.servlet.ServletException
*/
@Override
public void init(ServletConfig sc) throws ServletException {
super.init(sc);

atmosphereCtx = sc.getServletContext().getContextPath() + ATMOSPHERE;

CometEngine cometEngine = CometEngine.getEngine();
CometContext context = cometEngine.register(atmosphereCtx);
context.setExpirationDelay(-1);
logger.debug("Created CometContext for atmosphere context: {}", atmosphereCtx);
}

/**
* {@inheritDoc}
*/
public Action service(AtmosphereRequest req, AtmosphereResponse res)
throws IOException, ServletException {

CometContext ctx = CometEngine.getEngine().getCometContext(atmosphereCtx);
Action action = suspended(req, res);
if (action.type() == Action.TYPE.SUSPEND) {
suspend(ctx, action, req, res);
} else if (action.type() == Action.TYPE.RESUME) {
resume(req, ctx);
}
return action;
}

/**
* Suspend the response
*
* @param ctx
* @param action
* @param req
* @param res
*/
private void suspend(CometContext ctx, Action action, AtmosphereRequest req, AtmosphereResponse res) {
VoidCometHandler c = new VoidCometHandler(req, res);
ctx.setExpirationDelay(action.timeout());
ctx.addCometHandler(c);
req.setAttribute(ATMOSPHERE, c.hashCode());
ctx.addAttribute("Time", System.currentTimeMillis());

if (supportSession()) {
// Store as well in the session in case the resume operation
// happens outside the AtmosphereHandler.onStateChange scope.
req.getSession().setAttribute(ATMOSPHERE, c.hashCode());
}
}

/**
* Resume the underlying response,
*
* @param req an {@link org.atmosphere.cpr.AtmosphereRequest}
* @param ctx a {@link org.glassfish.grizzly.comet.CometContext}
*/
private void resume(AtmosphereRequest req, CometContext ctx) {

if (req.getAttribute(ATMOSPHERE) == null) {
return;
}

CometHandler handler = getCometHandler(ctx, (Integer) req.getAttribute(ATMOSPHERE));
req.removeAttribute(ATMOSPHERE);

if (handler == null && supportSession() && req.getSession(false) != null) {
handler = getCometHandler(ctx, (Integer) req.getSession(false).getAttribute(ATMOSPHERE));
req.getSession().removeAttribute(ATMOSPHERE);
}

if (handler != null && (config.getInitParameter(ApplicationConfig.RESUME_AND_KEEPALIVE) == null
|| config.getInitParameter(ApplicationConfig.RESUME_AND_KEEPALIVE).equalsIgnoreCase("false"))) {
try {
ctx.resumeCometHandler(handler);
} catch (IOException e) {
logger.error("Error resuming CometHandler", e);
}
}
}

/**
* {@inheritDoc}
*/
@Override
public void action(AtmosphereResourceImpl r) {
super.action(r);
if (r.action().type() == Action.TYPE.RESUME && r.isInScope()) {
CometContext ctx = CometEngine.getEngine().getCometContext(atmosphereCtx);
resume(r.getRequest(), ctx);
}
}

@Override
public Action cancelled(AtmosphereRequest req, AtmosphereResponse res)
throws IOException, ServletException {

Action action = super.cancelled(req, res);
if (req.getAttribute(MAX_INACTIVE) != null && Long.class.cast(req.getAttribute(MAX_INACTIVE)) == -1) {
resume(req, CometEngine.getEngine().getCometContext(atmosphereCtx));
}
return action;
}

private static CometHandler getCometHandler(CometContext context,
int handlerId) {
List<CometHandler> handlers = context.getCometHandlers();
for (CometHandler handler : handlers) {
if (handler.hashCode() == handlerId) {
return handler;
}
}
return null;
}

/**
* Void {@link org.glassfish.grizzly.comet.CometHandler}, which delegate the processing of the
* {@link org.atmosphere.cpr.AtmosphereRequest} to an {@link org.atmosphere.cpr.AtmosphereResourceImpl}.
*/
private class VoidCometHandler implements CometHandler {

AtmosphereRequest req;
AtmosphereResponse res;
private Response grizzlyResponse;
CometContext cometContext;

public VoidCometHandler(AtmosphereRequest req, AtmosphereResponse res) {
this.req = req;
this.res = res;
}

/**
* {@inheritDoc}
*/
public void attach(Object o) {
}

/**
* {@inheritDoc}
*/
public void onEvent(CometEvent ce) throws IOException {
}

/**
* {@inheritDoc}
*/
public void onInitialize(CometEvent ce) throws IOException {
}

/**
* {@inheritDoc}
*/
public void onTerminate(CometEvent ce) throws IOException {
}

/**
* {@inheritDoc}
*/
public synchronized void onInterrupt(CometEvent ce) throws IOException {
long timeStamp = (Long) ce.getCometContext().getAttribute("Time");
try {
if (ce.getCometContext().getExpirationDelay() > 0
&& (System.currentTimeMillis() - timeStamp) >= ce.getCometContext().getExpirationDelay()) {
timedout(req, res);
} else {
cancelled(req, res);
}
} catch (ServletException ex) {
logger.warn("onInterrupt() encountered exception", ex);
}
}

@Override
public Response getResponse() {
return grizzlyResponse;
}

@Override
public void setResponse(Response response) {
grizzlyResponse = response;
}

@Override
public CometContext getCometContext() {
return cometContext;
}

@Override
public void setCometContext(CometContext cometContext) {
this.cometContext = cometContext;
}
}
}
Loading

0 comments on commit 77a255c

Please sign in to comment.