Skip to content

Commit

Permalink
Fix for #508 [runtime] WebSocketProcessor must be pluggable to suppor…
Browse files Browse the repository at this point in the history
…t JSR 356
  • Loading branch information
jfarcand committed Jul 13, 2012
1 parent be96a91 commit 07ee029
Show file tree
Hide file tree
Showing 12 changed files with 477 additions and 300 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import org.atmosphere.cpr.AtmosphereFramework;
import org.atmosphere.cpr.AtmosphereRequest;
import org.atmosphere.cpr.WebSocketProcessorFactory;
import org.atmosphere.websocket.WebSocketEventListener;
import org.atmosphere.websocket.WebSocketProcessor;
import org.atmosphere.websocket.WebSocketProtocol;
Expand Down Expand Up @@ -52,8 +53,9 @@ public void onConnect(org.eclipse.jetty.websocket.WebSocket.Outbound outbound) {

logger.debug("WebSocket.onConnect (outbound)");
try {
webSocketProcessor = new WebSocketProcessor(framework, new JettyWebSocket(outbound, framework.getAtmosphereConfig()), webSocketProtocol);
webSocketProcessor.dispatch(request);
webSocketProcessor = WebSocketProcessorFactory.getDefault()
.newWebSocketProcessor(new JettyWebSocket(outbound, framework.getAtmosphereConfig()));
webSocketProcessor.open(request);
} catch (Exception e) {
logger.warn("failed to connect to web socket", e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import java.lang.annotation.Target;

/**
* An annotation for telling Atmosphere which {@link org.atmosphere.websocket.WebSocketProcessor} class to use by default.
* An annotation for telling Atmosphere which {@link org.atmosphere.websocket.WebSocketProtocol} class to use by default.
*
* @author Jeanfrancois Arcand
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
import org.atmosphere.cpr.AtmosphereConfig;
import org.atmosphere.cpr.AtmosphereRequest;
import org.atmosphere.cpr.AtmosphereResponse;
import org.atmosphere.cpr.WebSocketProcessorFactory;
import org.atmosphere.util.Utils;
import org.atmosphere.websocket.WebSocketProcessor;
import org.slf4j.Logger;
Expand Down Expand Up @@ -152,10 +153,10 @@ public void onConnect(com.sun.grizzly.websockets.WebSocket w) {
logger.trace("", e);
}

WebSocketProcessor webSocketProcessor = new WebSocketProcessor(config.framework(),
new GrizzlyWebSocket(webSocket, config), config.framework().getWebSocketProtocol());
WebSocketProcessor webSocketProcessor = WebSocketProcessorFactory.getDefault()
.newWebSocketProcessor(new GrizzlyWebSocket(webSocket, config));
webSocket.getRequest().setAttribute("grizzly.webSocketProcessor", webSocketProcessor);
webSocketProcessor.dispatch(r);
webSocketProcessor.open(r);
} catch (Exception e) {
logger.warn("failed to connect to web socket", e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import org.atmosphere.container.version.Jetty8WebSocket;
import org.atmosphere.cpr.AtmosphereFramework;
import org.atmosphere.cpr.AtmosphereRequest;
import org.atmosphere.cpr.WebSocketProcessorFactory;
import org.atmosphere.websocket.WebSocketEventListener;
import org.atmosphere.websocket.WebSocketProcessor;
import org.atmosphere.websocket.WebSocketProtocol;
Expand Down Expand Up @@ -96,7 +97,8 @@ public boolean onFrame(byte flags, byte opcode, byte[] data, int offset, int len
public void onHandshake(org.eclipse.jetty.websocket.WebSocket.FrameConnection connection) {
logger.trace("WebSocket.onHandshake");
try {
webSocketProcessor = new WebSocketProcessor(framework, new Jetty8WebSocket(connection, framework.getAtmosphereConfig()), webSocketProtocol);
webSocketProcessor = WebSocketProcessorFactory.getDefault()
.newWebSocketProcessor(new Jetty8WebSocket(connection, framework.getAtmosphereConfig()));
} catch (Exception e) {
logger.warn("failed to connect to web socket", e);
}
Expand All @@ -115,8 +117,9 @@ public void onMessage(String data) {
public void onOpen(org.eclipse.jetty.websocket.WebSocket.Connection connection) {
logger.trace("WebSocket.onOpen.");
try {
webSocketProcessor = new WebSocketProcessor(framework, new Jetty8WebSocket(connection, framework.getAtmosphereConfig()), webSocketProtocol);
webSocketProcessor.dispatch(request);
webSocketProcessor = WebSocketProcessorFactory.getDefault()
.newWebSocketProcessor(new Jetty8WebSocket(connection, framework.getAtmosphereConfig()));
webSocketProcessor.open(request);
webSocketProcessor.notifyListener(new WebSocketEventListener.WebSocketEvent("", CONNECT, webSocketProcessor.webSocket()));
} catch (Exception e) {
logger.warn("failed to connect to web socket", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.atmosphere.container.version.TomcatWebSocket;
import org.atmosphere.cpr.AtmosphereFramework;
import org.atmosphere.cpr.AtmosphereRequest;
import org.atmosphere.cpr.WebSocketProcessorFactory;
import org.atmosphere.websocket.WebSocketEventListener;
import org.atmosphere.websocket.WebSocketProcessor;
import org.atmosphere.websocket.WebSocketProtocol;
Expand Down Expand Up @@ -54,8 +55,9 @@ public TomcatWebSocketHandler(AtmosphereRequest request, AtmosphereFramework fra
protected void onOpen(WsOutbound outbound) {
logger.trace("WebSocket.onOpen.");
try {
webSocketProcessor = new WebSocketProcessor(framework, new TomcatWebSocket(outbound, framework.getAtmosphereConfig()), webSocketProtocol);
webSocketProcessor.dispatch(request);
webSocketProcessor = WebSocketProcessorFactory.getDefault()
.newWebSocketProcessor(new TomcatWebSocket(outbound, framework.getAtmosphereConfig()));
webSocketProcessor.open(request);
webSocketProcessor.notifyListener(new WebSocketEventListener.WebSocketEvent("", CONNECT, webSocketProcessor.webSocket()));
} catch (Exception e) {
logger.warn("failed to connect to web socket", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import org.atmosphere.client.MessageLengthInterceptor;
import org.atmosphere.interceptor.AtmosphereResourceLifecycleInterceptor;
import org.atmosphere.websocket.WebSocketProcessor;
import org.atmosphere.websocket.WebSocketProtocol;

/**
Expand Down Expand Up @@ -93,6 +94,10 @@ public interface ApplicationConfig {
* The {@link org.atmosphere.cpr.BroadcasterLifeCyclePolicy} policy to use
*/
String BROADCASTER_LIFECYCLE_POLICY = ApplicationConfig.class.getPackage().getName() + ".broadcasterLifeCyclePolicy";
/**
* Tell Atmosphere the {@link org.atmosphere.websocket.WebSocketProcessor} to use.
*/
String WEBSOCKET_PROCESSOR = WebSocketProcessor.class.getName();
/**
* Tell Atmosphere the {@link org.atmosphere.websocket.WebSocketProcessor} to use.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.atmosphere.util.AtmosphereConfigReader;
import org.atmosphere.util.IntrospectionUtils;
import org.atmosphere.util.Version;
import org.atmosphere.websocket.DefaultWebSocketProcessor;
import org.atmosphere.websocket.WebSocket;
import org.atmosphere.websocket.WebSocketProtocol;
import org.atmosphere.websocket.protocol.SimpleHttpProtocol;
Expand Down Expand Up @@ -84,6 +85,7 @@
import static org.atmosphere.cpr.ApplicationConfig.PROPERTY_SESSION_SUPPORT;
import static org.atmosphere.cpr.ApplicationConfig.PROPERTY_USE_STREAM;
import static org.atmosphere.cpr.ApplicationConfig.RESUME_AND_KEEPALIVE;
import static org.atmosphere.cpr.ApplicationConfig.WEBSOCKET_PROCESSOR;
import static org.atmosphere.cpr.ApplicationConfig.WEBSOCKET_PROTOCOL;
import static org.atmosphere.cpr.ApplicationConfig.WEBSOCKET_SUPPORT;
import static org.atmosphere.cpr.FrameworkConfig.ATMOSPHERE_CONFIG;
Expand Down Expand Up @@ -151,6 +153,7 @@ public class AtmosphereFramework implements ServletContextProvider {
protected boolean scanDone = false;
protected String annotationProcessorClassName = "org.atmosphere.cpr.DefaultAnnotationProcessor";
protected final List<BroadcasterListener> broadcasterListeners = new ArrayList<BroadcasterListener>();
protected String webSocketProcessorClassName = DefaultWebSocketProcessor.class.getName();

@Override
public ServletContext getServletContext() {
Expand Down Expand Up @@ -490,7 +493,7 @@ public Enumeration<String> getInitParameterNames() {

autoDetectContainer();
configureWebDotXmlAtmosphereHandler(sc);
initWebSocketProtocol();
initWebSocket();
asyncSupport.init(scFacade);
initAtmosphereHandler(scFacade);
configureAtmosphereInterceptor(sc);
Expand All @@ -509,6 +512,7 @@ public Enumeration<String> getInitParameterNames() {

logger.info("HttpSession supported: {}", config.isSupportSession());
logger.info("Using BroadcasterFactory: {}", BroadcasterFactory.getDefault().getClass().getName());
logger.info("Using WebSocketProcessor: {}", webSocketProcessorClassName);
logger.info("Using Broadcaster: {}", broadcasterClassName);
logger.info("Atmosphere Framework {} started.", Version.getRawVersion());
} catch (Throwable t) {
Expand Down Expand Up @@ -651,6 +655,11 @@ protected void doInitParamsForWebSocket(ServletConfig sc) {
if (s != null) {
webSocketProtocolClassName = s;
}

s = sc.getInitParameter(WEBSOCKET_PROCESSOR);
if (s != null) {
webSocketProcessorClassName = s;
}
}

/**
Expand Down Expand Up @@ -857,7 +866,7 @@ public void destroy() {
}
}

protected void initWebSocketProtocol() {
protected void initWebSocket() {
if (webSocketProtocol == null) {
try {
webSocketProtocol = (WebSocketProtocol) AtmosphereFramework.class.getClassLoader()
Expand All @@ -869,6 +878,8 @@ protected void initWebSocketProtocol() {
}
}
webSocketProtocol.configure(config);

new WebSocketProcessorFactory(config);
}

public AtmosphereFramework destroy() {
Expand Down Expand Up @@ -1456,6 +1467,15 @@ public AtmosphereFramework setHandlersPath(String handlersPath) {
return this;
}

public String getWebSocketProcessorClassName() {
return webSocketProcessorClassName;
}

public AtmosphereFramework setWebsocketProcessorClassName(String webSocketProcessorClassName){
this.webSocketProcessorClassName = webSocketProcessorClassName;
return this;
}

/**
* Add an {@link AtmosphereInterceptor} implementation. The adding order or {@link AtmosphereInterceptor} will be used, e.g
* the first added {@link AtmosphereInterceptor} will always be called first.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.atmosphere.config.service.BroadcasterService;
import org.atmosphere.config.service.MeteorService;
import org.atmosphere.config.service.WebSocketHandlerService;
import org.atmosphere.config.service.WebSocketProcessorService;
import org.atmosphere.config.service.WebSocketProtocolService;
import org.atmosphere.handler.ReflectorServletProcessor;
import org.atmosphere.util.IntrospectionUtils;
Expand Down Expand Up @@ -78,7 +79,8 @@ public Class<? extends Annotation>[] annotations() {
AtmosphereInterceptorService.class,
BroadcasterListenerService.class,
AsyncSupportService.class,
AsyncSupportListenerService.class
AsyncSupportListenerService.class,
WebSocketProcessorService.class
};
}

Expand Down Expand Up @@ -200,6 +202,12 @@ public void reportTypeAnnotation(Class<? extends Annotation> annotation, String
} catch (Throwable e) {
logger.warn("", e);
}
} else if (WebSocketProcessorService.class.equals(annotation)) {
try {
framework.setWebsocketProcessorClassName(className);
} catch (Throwable e) {
logger.warn("", e);
}
}
}
};
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package org.atmosphere.cpr;

import org.atmosphere.websocket.DefaultWebSocketProcessor;
import org.atmosphere.websocket.WebSocket;
import org.atmosphere.websocket.WebSocketProcessor;

public class WebSocketProcessorFactory {

private static WebSocketProcessorFactory factory;
private final AtmosphereConfig config;
private final String webSocketProcessorName;

protected WebSocketProcessorFactory(AtmosphereConfig config) {
this.config = config;
factory = this;
webSocketProcessorName = config.framework().getWebSocketProcessorClassName();
}

public final static WebSocketProcessorFactory getDefault() {
return factory;
}

public WebSocketProcessor newWebSocketProcessor(WebSocket webSocket) {
WebSocketProcessor wp = null;
if (webSocketProcessorName.equalsIgnoreCase(WebSocketProcessor.class.getName())) {
try {
wp = (WebSocketProcessor) Thread.currentThread().getContextClassLoader()
.loadClass(webSocketProcessorName).newInstance();
} catch (Exception ex) {
try {
wp = (WebSocketProcessor) getClass().getClassLoader()
.loadClass(webSocketProcessorName).newInstance();
} catch (Exception ex2) {
}
}
}

if (wp == null) {
wp = new DefaultWebSocketProcessor(config.framework(), webSocket, config.framework().getWebSocketProtocol());
}

return wp;
}

}
Loading

0 comments on commit 07ee029

Please sign in to comment.