Skip to content

Commit

Permalink
Fix for #84 and #88
Browse files Browse the repository at this point in the history
  • Loading branch information
jfarcand committed Nov 29, 2011
1 parent 9f7ea27 commit b9e5c3a
Show file tree
Hide file tree
Showing 7 changed files with 44 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ public abstract class BroadcasterCacheBase implements BroadcasterCache<HttpServl

protected final List<CachedMessage> queue = new CopyOnWriteArrayList<CachedMessage>();

protected final ScheduledExecutorService reaper = Executors.newSingleThreadScheduledExecutor();
protected ScheduledExecutorService reaper = Executors.newSingleThreadScheduledExecutor();

protected int maxCachedinMs = 1000 * 5 * 60;

Expand Down Expand Up @@ -111,6 +111,13 @@ public void run() {
}, 0, 60, TimeUnit.SECONDS);
}

public void setExecutorService(ScheduledExecutorService reaper){
if (reaper != null) {
stop();
}
this.reaper = reaper;
}

/**
* {@inheritDoc}
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,4 +188,8 @@ public interface ApplicationConfig {
* The Servlet's mapping value to the FILTER_CLASS
*/
String FILTER_NAME = "org.atmosphere.filter.name";
/**
* Define when a broadcasted message is cached. Value can be 'beforeFilter' or 'afterFilter'. Default is afterFilter
*/
String BROADCASTER_CACHE_STRATEGY = BroadcasterCache.class.getName() + ".strategy";
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@

import org.apache.catalina.CometEvent;
import org.apache.catalina.CometProcessor;
import org.atmosphere.cache.BroadcasterCacheBase;
import org.atmosphere.container.BlockingIOCometSupport;
import org.atmosphere.container.JBossWebCometSupport;
import org.atmosphere.container.JettyWebSocketHandler;
Expand Down Expand Up @@ -93,6 +94,7 @@
import static org.atmosphere.cpr.ApplicationConfig.ATMOSPHERE_HANDLER;
import static org.atmosphere.cpr.ApplicationConfig.ATMOSPHERE_HANDLER_MAPPING;
import static org.atmosphere.cpr.ApplicationConfig.BROADCASTER_CACHE;
import static org.atmosphere.cpr.ApplicationConfig.BROADCASTER_CACHE_STRATEGY;
import static org.atmosphere.cpr.ApplicationConfig.BROADCASTER_CLASS;
import static org.atmosphere.cpr.ApplicationConfig.BROADCASTER_FACTORY;
import static org.atmosphere.cpr.ApplicationConfig.BROADCASTER_LIFECYCLE_POLICY;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,10 @@
*/
public interface BroadcasterCache<V, W> {


public final static String BROADCASTER_CACHE_TRACKER = BroadcasterCache.class.getName();

public enum STRATEGY { BEFORE_FILTER, AFTER_FILTER }

/**
* Start
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@

package org.atmosphere.cpr;

import org.atmosphere.cache.BroadcasterCacheBase;
import org.atmosphere.cpr.BroadcastFilter.BroadcastAction;
import org.atmosphere.di.InjectorProvider;
import org.slf4j.Logger;
Expand Down Expand Up @@ -363,7 +364,7 @@ protected void destroy(boolean force) {
/**
* Force shutdown of all {@link ExecutorService}
*/
public void forceDestroy(){
public void forceDestroy() {
destroy(true);
}

Expand Down Expand Up @@ -510,6 +511,9 @@ public ScheduledExecutorService getScheduledExecutorService() {
*/
public BroadcasterConfig setBroadcasterCache(BroadcasterCache broadcasterCache) {
this.broadcasterCache = broadcasterCache;
if (BroadcasterCacheBase.class.isAssignableFrom(broadcasterCache.getClass())) {
BroadcasterCacheBase.class.cast(broadcasterCache).setExecutorService(getScheduledExecutorService());
}
return this;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ public class DefaultBroadcaster implements Broadcaster {
private Future<?> currentLifecycleTask;
protected URI uri;
protected AtmosphereServlet.AtmosphereConfig config;
protected BroadcasterCache.STRATEGY cacheStrategy = BroadcasterCache.STRATEGY.AFTER_FILTER;

public DefaultBroadcaster(String name, URI uri, AtmosphereServlet.AtmosphereConfig config) {
this.name = name;
Expand All @@ -117,6 +118,14 @@ public DefaultBroadcaster(String name, URI uri, AtmosphereServlet.AtmosphereConf

broadcasterCache = new DefaultBroadcasterCache();
bc = new BroadcasterConfig(AtmosphereServlet.broadcasterFilters, config);
String s = config.getInitParameter(ApplicationConfig.BROADCASTER_CACHE_STRATEGY);
if (s != null) {
if (s.equalsIgnoreCase("afterFilter")) {
cacheStrategy = BroadcasterCache.STRATEGY.AFTER_FILTER;
} else if (s.equalsIgnoreCase("beforeFilter")) {
cacheStrategy = BroadcasterCache.STRATEGY.BEFORE_FILTER;
}
}
}

public DefaultBroadcaster(String name, AtmosphereServlet.AtmosphereConfig config) {
Expand Down Expand Up @@ -202,6 +211,7 @@ public void setScope(SCOPE scope) {
BroadcasterCache cache = bc.getBroadcasterCache().getClass().newInstance();
InjectorProvider.getInjector().inject(cache);
DefaultBroadcaster.class.cast(b).broadcasterCache = cache;
DefaultBroadcaster.class.cast(b).getBroadcasterConfig().setBroadcasterCache(cache);
}
resource.setBroadcaster(b);
b.setScope(SCOPE.REQUEST);
Expand Down Expand Up @@ -485,7 +495,9 @@ protected void push(Entry entry) {

if (resources.isEmpty()) {
logger.debug("Broadcaster {} doesn't have any associated resource", getID());
trackBroadcastMessage(null, entry.message);

trackBroadcastMessage(null, cacheStrategy == BroadcasterCache.STRATEGY.AFTER_FILTER ? entry.message : entry.originalMessage);

if (entry.future != null) {
entry.future.done();
}
Expand Down Expand Up @@ -559,7 +571,10 @@ protected Object perRequestFilter(AtmosphereResource<?, ?> r, Entry msg) {
finalMsg = a.message();
}
}
trackBroadcastMessage(r, finalMsg);

if (cacheStrategy == BroadcasterCache.STRATEGY.AFTER_FILTER) {
trackBroadcastMessage(r, finalMsg);
}
} else {
// The resource is no longer valid.
removeAtmosphereResource(r);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,22 +3,18 @@
import com.sun.jersey.spi.container.ContainerResponse;
import org.atmosphere.cpr.ApplicationConfig;
import org.atmosphere.cpr.AsynchronousProcessor;
import org.atmosphere.cpr.AtmosphereEventLifecycle;
import org.atmosphere.cpr.AtmosphereResource;
import org.atmosphere.cpr.AtmosphereResourceEvent;
import org.atmosphere.cpr.AtmosphereResourceEventImpl;
import org.atmosphere.cpr.AtmosphereResourceImpl;
import org.atmosphere.cpr.BroadcasterFactory;
import org.atmosphere.cpr.FrameworkConfig;
import org.atmosphere.jersey.AtmosphereFilter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.servlet.http.HttpServletRequest;
import javax.ws.rs.core.HttpHeaders;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;

Expand Down Expand Up @@ -98,7 +94,12 @@ public final static void broadcast(final AtmosphereResource<?, ?> r, final Atmos
}

if (lostCandidate && r != null && r.getBroadcaster() != null && r.getBroadcaster().getBroadcasterConfig().getBroadcasterCache() != null) {
r.getBroadcaster().getBroadcasterConfig().getBroadcasterCache().addToCache(r, e.getMessage());

This comment has been minimized.

Copy link
@haed

haed Nov 30, 2011

Contributor

I didn't tried it out (just looked at the code), but what happens to lost-candidates in the beforeFilter-strategy? Would these are swallowed?

This comment has been minimized.

Copy link
@jfarcand

jfarcand Nov 30, 2011

Author Member

Agree. I will re-open the issue with a better fix

String s = (String)request.getAttribute(ApplicationConfig.BROADCASTER_CACHE_STRATEGY);

// Prevent caching
if (s == null || !s.equalsIgnoreCase("beforeFilter")) {
r.getBroadcaster().getBroadcasterConfig().getBroadcasterCache().addToCache(r,e.getMessage());
}
}
}
}
Expand Down

0 comments on commit b9e5c3a

Please sign in to comment.