Skip to content

Commit

Permalink
Fix for #696
Browse files Browse the repository at this point in the history
  • Loading branch information
jfarcand committed Oct 18, 2012
1 parent 4e3cda6 commit 9d834f0
Show file tree
Hide file tree
Showing 22 changed files with 134 additions and 115 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.atmosphere.container.version.GrizzlyWebSocket;
import org.atmosphere.cpr.AtmosphereConfig;
import org.atmosphere.cpr.AtmosphereRequest;
import org.atmosphere.cpr.AtmosphereResponse;
import org.atmosphere.cpr.WebSocketProcessorFactory;
import org.atmosphere.websocket.WebSocketProcessor;
import org.slf4j.Logger;
Expand All @@ -43,8 +44,7 @@ public GlassFishWebSocketHandler(AtmosphereConfig config) {
this.config = config;
contextPath = config.getServletContext().getContextPath();

webSocketProcessor = WebSocketProcessorFactory.getDefault()
.getWebSocketProcessor(config.framework());
webSocketProcessor = WebSocketProcessorFactory.getDefault().getWebSocketProcessor(config.framework());
}

public void onConnect(WebSocket w) {
Expand All @@ -61,6 +61,8 @@ public void onConnect(WebSocket w) {
try {

AtmosphereRequest r = AtmosphereRequest.wrap(dws.getRequest());
AtmosphereResponse response = AtmosphereResponse.newInstance(config, r, webSocket);
config.framework().configureRequestResponse(r, response);
try {
// GlassFish http://java.net/jira/browse/GLASSFISH-18681
if (r.getPathInfo().startsWith(r.getContextPath())) {
Expand All @@ -71,7 +73,7 @@ public void onConnect(WebSocket w) {
// Whatever exception occurs skip it
logger.trace("", e);
}
webSocketProcessor.open(webSocket, r);
webSocketProcessor.open(webSocket, r, response);
} catch (Exception e) {
logger.warn("failed to connect to web socket", e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ public void onConnect(WebSocket socket) {
AtmosphereRequest r = AtmosphereRequest.wrap(g2WebSocket.getRequest());
org.atmosphere.websocket.WebSocket webSocket = new Grizzly2WebSocket(g2WebSocket, config);
g2WebSocket.getRequest().setAttribute("grizzly.webSocket", webSocket);
webSocketProcessor.open(webSocket, r);
webSocketProcessor.open(webSocket, r, AtmosphereResponse.newInstance(config, r, webSocket));
} catch (Exception e) {
LOGGER.warn("failed to connect to web socket", e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import org.atmosphere.container.version.Jetty9WebSocket;
import org.atmosphere.cpr.AtmosphereFramework;
import org.atmosphere.cpr.AtmosphereRequest;
import org.atmosphere.cpr.AtmosphereResponse;
import org.atmosphere.cpr.WebSocketProcessorFactory;
import org.atmosphere.websocket.WebSocket;
import org.atmosphere.websocket.WebSocketProcessor;
Expand Down Expand Up @@ -57,7 +58,7 @@ public void onWebSocketConnect(WebSocketConnection webSocketConnection) {
logger.trace("WebSocket.onOpen.");
webSocket = new Jetty9WebSocket(webSocketConnection, framework.getAtmosphereConfig());
try {
webSocketProcessor.open(webSocket, request);
webSocketProcessor.open(webSocket, request, AtmosphereResponse.newInstance(framework.getAtmosphereConfig(), request, webSocket));
} catch (Exception e) {
logger.warn("Failed to connect to WebSocket", e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import org.atmosphere.container.version.Jetty8WebSocket;
import org.atmosphere.cpr.AtmosphereFramework;
import org.atmosphere.cpr.AtmosphereRequest;
import org.atmosphere.cpr.AtmosphereResponse;
import org.atmosphere.cpr.WebSocketProcessorFactory;
import org.atmosphere.websocket.WebSocket;
import org.atmosphere.websocket.WebSocketEventListener;
Expand Down Expand Up @@ -102,7 +103,7 @@ public void onMessage(String data) {
public void onOpen(org.eclipse.jetty.websocket.WebSocket.Connection connection) {
logger.trace("WebSocket.onOpen.");
try {
webSocketProcessor.open(webSocket, request);
webSocketProcessor.open(webSocket, request, AtmosphereResponse.newInstance(framework.getAtmosphereConfig(), request, webSocket));
} catch (Exception e) {
logger.warn("Failed to connect to WebSocket", e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.atmosphere.container.version.TomcatWebSocket;
import org.atmosphere.cpr.AtmosphereFramework;
import org.atmosphere.cpr.AtmosphereRequest;
import org.atmosphere.cpr.AtmosphereResponse;
import org.atmosphere.cpr.WebSocketProcessorFactory;
import org.atmosphere.websocket.WebSocket;
import org.atmosphere.websocket.WebSocketEventListener;
Expand All @@ -33,6 +34,7 @@
import java.nio.ByteBuffer;
import java.nio.CharBuffer;

import static org.atmosphere.cpr.ApplicationConfig.RECYCLE_ATMOSPHERE_REQUEST_RESPONSE;
import static org.atmosphere.websocket.WebSocketEventListener.WebSocketEvent.TYPE.CLOSE;
import static org.atmosphere.websocket.WebSocketEventListener.WebSocketEvent.TYPE.CONNECT;
import static org.atmosphere.websocket.WebSocketEventListener.WebSocketEvent.TYPE.MESSAGE;
Expand All @@ -57,7 +59,7 @@ protected void onOpen(WsOutbound outbound) {
logger.trace("WebSocket.onOpen.");
webSocket = new TomcatWebSocket(outbound, framework.getAtmosphereConfig());
try {
webSocketProcessor.open(webSocket, request);
webSocketProcessor.open(webSocket, request, AtmosphereResponse.newInstance(framework.getAtmosphereConfig(), request, webSocket));
} catch (Exception e) {
logger.warn("failed to connect to web socket", e);
}
Expand Down
129 changes: 69 additions & 60 deletions modules/cpr/src/main/java/org/atmosphere/cpr/AtmosphereFramework.java
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import java.io.FilenameFilter;
import java.io.IOException;
import java.io.InputStream;
import java.io.UnsupportedEncodingException;
import java.lang.reflect.InvocationTargetException;
import java.net.MalformedURLException;
import java.net.URISyntaxException;
Expand Down Expand Up @@ -1260,6 +1261,73 @@ private void getFiles(File f) {
}
}

/**
* Configure some Attribute on the {@link AtmosphereRequest}
* @param req {@link AtmosphereRequest}
*/
public AtmosphereFramework configureRequestResponse(AtmosphereRequest req, AtmosphereResponse res) throws UnsupportedEncodingException {
req.setAttribute(BROADCASTER_FACTORY, BroadcasterFactory.getDefault());
req.setAttribute(PROPERTY_USE_STREAM, useStreamForFlushingComments);
req.setAttribute(BROADCASTER_CLASS, broadcasterClassName);
req.setAttribute(ATMOSPHERE_CONFIG, config);

boolean skip = true;
String s = config.getInitParameter(ALLOW_QUERYSTRING_AS_REQUEST);
if (s != null) {
skip = Boolean.valueOf(s);
}
if (!skip || req.getAttribute(WEBSOCKET_SUSPEND) == null) {
Map<String, String> headers = configureQueryStringAsRequest(req);
String body = headers.remove(ATMOSPHERE_POST_BODY);
if (body != null && body.isEmpty()) {
body = null;
}

// We need to strip Atmosphere's own query string from the request in case an
// interceptor re-inject the request because the wrong body will be passed.
StringBuilder queryStrings = new StringBuilder("");
Enumeration<String> e = req.getParameterNames();
String name;
while (e.hasMoreElements()) {
name = e.nextElement().toLowerCase().trim();
if (!name.startsWith("x-atmosphere") && !name.equalsIgnoreCase("x-cache-date") ) {
queryStrings.append(name).append("=").append(req.getParameter(name));
}
}

// Reconfigure the request. Clear the Atmosphere queryString
req.headers(headers)
.queryString(queryStrings.toString())
.method(body != null && req.getMethod().equalsIgnoreCase("GET") ? "POST" : req.getMethod());

if (body != null) {
req.body( URLDecoder.decode(body, req.getCharacterEncoding() == null ? "UTF-8" : req.getCharacterEncoding() ) );
}
}

s = req.getHeader(X_ATMOSPHERE_TRACKING_ID);
if (s == null || s.equals("0")) {
s = UUID.randomUUID().toString();
res.setHeader(X_ATMOSPHERE_TRACKING_ID, s);
} else {
// This may breaks 1.0.0 application because the WebSocket's associated AtmosphereResource will
// all have the same UUID, and retrieving the original one for WebSocket, so we don't set it at all.
// Null means it is not an HTTP request.
if (req.resource() == null) {
res.setHeader(X_ATMOSPHERE_TRACKING_ID, s);
} else if (req.getAttribute(WebSocket.WEBSOCKET_INITIATED) == null){
// WebSocket reconnect, in case an application manually set the header
// (impossible to retrieve the headers normally with WebSocket or SSE)
res.setHeader(X_ATMOSPHERE_TRACKING_ID, s);
}
}

if (req.getAttribute(SUSPENDED_ATMOSPHERE_RESOURCE_UUID) == null) {
req.setAttribute(SUSPENDED_ATMOSPHERE_RESOURCE_UUID, s);
}
return this;
}

/**
* Invoke the proprietary {@link AsyncSupport}
*
Expand All @@ -1270,68 +1338,9 @@ private void getFiles(File f) {
* @throws ServletException
*/
public Action doCometSupport(AtmosphereRequest req, AtmosphereResponse res) throws IOException, ServletException {
req.setAttribute(BROADCASTER_FACTORY, BroadcasterFactory.getDefault());
req.setAttribute(PROPERTY_USE_STREAM, useStreamForFlushingComments);
req.setAttribute(BROADCASTER_CLASS, broadcasterClassName);
req.setAttribute(ATMOSPHERE_CONFIG, config);

Action a = null;
try {
boolean skip = true;
String s = config.getInitParameter(ALLOW_QUERYSTRING_AS_REQUEST);
if (s != null) {
skip = Boolean.valueOf(s);
}
if (!skip || req.getAttribute(WEBSOCKET_SUSPEND) == null) {
Map<String, String> headers = configureQueryStringAsRequest(req);
String body = headers.remove(ATMOSPHERE_POST_BODY);
if (body != null && body.isEmpty()) {
body = null;
}

// We need to strip Atmosphere's own query string from the request in case an
// interceptor re-inject the request because the wrong body will be passed.
StringBuilder queryStrings = new StringBuilder("");
Enumeration<String> e = req.getParameterNames();
String name;
while (e.hasMoreElements()) {
name = e.nextElement().toLowerCase().trim();
if (!name.startsWith("x-atmosphere") && !name.equalsIgnoreCase("x-cache-date") ) {
queryStrings.append(name).append("=").append(req.getParameter(name));
}
}

// Reconfigure the request. Clear the Atmosphere queryString
req.headers(headers)
.queryString(queryStrings.toString())
.method(body != null && req.getMethod().equalsIgnoreCase("GET") ? "POST" : req.getMethod());

if (body != null) {
req.body( URLDecoder.decode(body, req.getCharacterEncoding() == null ? "UTF-8" : req.getCharacterEncoding() ) );
}
}

s = req.getHeader(X_ATMOSPHERE_TRACKING_ID);
if (s == null || s.equals("0")) {
s = UUID.randomUUID().toString();
res.setHeader(X_ATMOSPHERE_TRACKING_ID, s);
} else {
// This may breaks 1.0.0 application because the WebSocket's associated AtmosphereResource will
// all have the same UUID, and retrieving the original one for WebSocket, so we don't set it at all.
// Null means it is not an HTTP request.
if (req.resource() == null) {
res.setHeader(X_ATMOSPHERE_TRACKING_ID, s);
} else if (req.getAttribute(WebSocket.WEBSOCKET_INITIATED) == null){
// WebSocket reconnect, in case an application manually set the header
// (impossible to retrieve the headers normally with WebSocket or SSE)
res.setHeader(X_ATMOSPHERE_TRACKING_ID, s);
}
}

if (req.getAttribute(SUSPENDED_ATMOSPHERE_RESOURCE_UUID) == null) {
req.setAttribute(SUSPENDED_ATMOSPHERE_RESOURCE_UUID, s);
}

configureRequestResponse(req, res);
a = asyncSupport.service(req, res);
} catch (IllegalStateException ex) {
if (ex.getMessage() != null && (ex.getMessage().startsWith("Tomcat failed") || ex.getMessage().startsWith("JBoss failed"))) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package org.atmosphere.cpr;

import org.atmosphere.websocket.WebSocket;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -37,6 +38,7 @@
import java.util.concurrent.atomic.AtomicBoolean;

import static org.atmosphere.cpr.ApplicationConfig.PROPERTY_USE_STREAM;
import static org.atmosphere.cpr.ApplicationConfig.RECYCLE_ATMOSPHERE_REQUEST_RESPONSE;

/**
* An Atmosphere's response representation. An AtmosphereResponse can be used to construct bi-directional asynchronous
Expand Down Expand Up @@ -1177,10 +1179,26 @@ public void close() throws IOException {
*
* @return
*/
public final static AtmosphereResponse create() {
public final static AtmosphereResponse newInstance() {
return new Builder().build();
}

/**
* Create a new instance to use with WebSocket.
*
* @return
*/
public final static AtmosphereResponse newInstance(AtmosphereConfig config, AtmosphereRequest request, WebSocket webSocket) {
boolean destroyable;
String s = config.getInitParameter(RECYCLE_ATMOSPHERE_REQUEST_RESPONSE);
if (s != null && Boolean.valueOf(s)) {
destroyable = true;
} else {
destroyable = false;
}
return new AtmosphereResponse(webSocket, request, destroyable);
}

/**
* Wrap an {@link HttpServletResponse}
*
Expand Down
4 changes: 2 additions & 2 deletions modules/cpr/src/main/java/org/atmosphere/util/Utils.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
*/
package org.atmosphere.util;

import org.atmosphere.cpr.AtmosphereResponse;

import javax.servlet.http.HttpServletRequest;
import java.util.Enumeration;

Expand Down Expand Up @@ -45,6 +47,4 @@ public static boolean webSocketEnabled(HttpServletRequest request) {
}
return webSocketEnabled;
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -110,16 +110,15 @@ public WebSocketProcessor registerWebSocketHandler(String path, WebSocketHandler
* {@inheritDoc}
*/
@Override
public final void open(final WebSocket webSocket, final AtmosphereRequest request) throws IOException {
public final void open(final WebSocket webSocket, final AtmosphereRequest request, final AtmosphereResponse response) throws IOException {
if (!loggedMsg.getAndSet(true)) {
logger.debug("Atmosphere detected WebSocket: {}", webSocket.getClass().getName());
}

AtmosphereResponse wsr = new AtmosphereResponse(webSocket, request, destroyable);
request.headers(configureHeader(request)).setAttribute(WebSocket.WEBSOCKET_SUSPEND, true);

AtmosphereResource r = AtmosphereResourceFactory.getDefault().create(framework.getAtmosphereConfig(),
wsr,
response,
framework.getAsyncSupport());

request.setAttribute(INJECTED_ATMOSPHERE_RESOURCE, r);
Expand All @@ -130,7 +129,7 @@ public final void open(final WebSocket webSocket, final AtmosphereRequest reques

// No WebSocketHandler defined.
if (handlers.size() == 0) {
dispatch(webSocket, request, wsr);
dispatch(webSocket, request, response);
} else {
WebSocketHandler handler = mapper.map(request, handlers);
if (handler == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public interface WebSocketProcessor {
* @param request
* @throws IOException
*/
void open(WebSocket webSocket, final AtmosphereRequest request) throws IOException;
void open(WebSocket webSocket, AtmosphereRequest request, AtmosphereResponse response) throws IOException;

/**
* Invoked when a WebSocket message gets received from the underlying container
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ public class AsyncIOInterceptorTest {
@Test
public void singleTest() throws ServletException, IOException {
final AtomicReference<String> s = new AtomicReference<String>();
AtmosphereResponse res = AtmosphereResponse.create().request(AtmosphereRequest.create());
AtmosphereResponse res = AtmosphereResponse.newInstance().request(AtmosphereRequest.create());
res.request().setAttribute(PROPERTY_USE_STREAM, false);
res.asyncIOWriter(new AtmosphereInterceptorWriter().interceptor(new AsyncIOInterceptor() {

Expand Down Expand Up @@ -62,7 +62,7 @@ public void redirect(AtmosphereResponse response, String location) {
@Test
public void chaining() throws ServletException, IOException {
final AtomicReference<StringBuffer> s = new AtomicReference<StringBuffer>(new StringBuffer());
AtmosphereResponse res = AtmosphereResponse.create().request(AtmosphereRequest.create());
AtmosphereResponse res = AtmosphereResponse.newInstance().request(AtmosphereRequest.create());
res.request().setAttribute(PROPERTY_USE_STREAM, false);
res.asyncIOWriter(new AtmosphereInterceptorWriter().interceptor(new AsyncIOInterceptor() {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@
*/
package org.atmosphere.cpr;

import org.atmosphere.container.BlockingIOCometSupport;
import org.atmosphere.handler.AbstractReflectorAtmosphereHandler;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

Expand Down Expand Up @@ -100,7 +98,7 @@ public void destroy() {
});

AtmosphereRequest request = new AtmosphereRequest.Builder().pathInfo("/a").build();
framework.doCometSupport(request, AtmosphereResponse.create());
framework.doCometSupport(request, AtmosphereResponse.newInstance());
r.get().resume();

assertTrue(e.get().isResuming());
Expand Down
Loading

0 comments on commit 9d834f0

Please sign in to comment.