From b9e5c3a84d6443f9c3bfce7f725163bb87061a79 Mon Sep 17 00:00:00 2001 From: jfarcand Date: Tue, 29 Nov 2011 18:32:44 -0500 Subject: [PATCH] Fix for https://github.com/Atmosphere/atmosphere/issues/84 and https://github.com/Atmosphere/atmosphere/issues/88 --- .../cache/BroadcasterCacheBase.java | 9 ++++++++- .../org/atmosphere/cpr/ApplicationConfig.java | 4 ++++ .../org/atmosphere/cpr/AtmosphereServlet.java | 2 ++ .../org/atmosphere/cpr/BroadcasterCache.java | 3 ++- .../org/atmosphere/cpr/BroadcasterConfig.java | 6 +++++- .../atmosphere/cpr/DefaultBroadcaster.java | 19 +++++++++++++++++-- .../jersey/util/JerseyBroadcasterUtil.java | 11 ++++++----- 7 files changed, 44 insertions(+), 10 deletions(-) diff --git a/modules/cpr/src/main/java/org/atmosphere/cache/BroadcasterCacheBase.java b/modules/cpr/src/main/java/org/atmosphere/cache/BroadcasterCacheBase.java index 49e871a3d36..11bd837e88f 100644 --- a/modules/cpr/src/main/java/org/atmosphere/cache/BroadcasterCacheBase.java +++ b/modules/cpr/src/main/java/org/atmosphere/cache/BroadcasterCacheBase.java @@ -80,7 +80,7 @@ public abstract class BroadcasterCacheBase implements BroadcasterCache queue = new CopyOnWriteArrayList(); - protected final ScheduledExecutorService reaper = Executors.newSingleThreadScheduledExecutor(); + protected ScheduledExecutorService reaper = Executors.newSingleThreadScheduledExecutor(); protected int maxCachedinMs = 1000 * 5 * 60; @@ -111,6 +111,13 @@ public void run() { }, 0, 60, TimeUnit.SECONDS); } + public void setExecutorService(ScheduledExecutorService reaper){ + if (reaper != null) { + stop(); + } + this.reaper = reaper; + } + /** * {@inheritDoc} */ diff --git a/modules/cpr/src/main/java/org/atmosphere/cpr/ApplicationConfig.java b/modules/cpr/src/main/java/org/atmosphere/cpr/ApplicationConfig.java index 32feeed2f18..0b5fd4d46cf 100644 --- a/modules/cpr/src/main/java/org/atmosphere/cpr/ApplicationConfig.java +++ b/modules/cpr/src/main/java/org/atmosphere/cpr/ApplicationConfig.java @@ -188,4 +188,8 @@ public interface ApplicationConfig { * The Servlet's mapping value to the FILTER_CLASS */ String FILTER_NAME = "org.atmosphere.filter.name"; + /** + * Define when a broadcasted message is cached. Value can be 'beforeFilter' or 'afterFilter'. Default is afterFilter + */ + String BROADCASTER_CACHE_STRATEGY = BroadcasterCache.class.getName() + ".strategy"; } diff --git a/modules/cpr/src/main/java/org/atmosphere/cpr/AtmosphereServlet.java b/modules/cpr/src/main/java/org/atmosphere/cpr/AtmosphereServlet.java index 805fda436fe..ee127d18dc1 100644 --- a/modules/cpr/src/main/java/org/atmosphere/cpr/AtmosphereServlet.java +++ b/modules/cpr/src/main/java/org/atmosphere/cpr/AtmosphereServlet.java @@ -41,6 +41,7 @@ import org.apache.catalina.CometEvent; import org.apache.catalina.CometProcessor; +import org.atmosphere.cache.BroadcasterCacheBase; import org.atmosphere.container.BlockingIOCometSupport; import org.atmosphere.container.JBossWebCometSupport; import org.atmosphere.container.JettyWebSocketHandler; @@ -93,6 +94,7 @@ import static org.atmosphere.cpr.ApplicationConfig.ATMOSPHERE_HANDLER; import static org.atmosphere.cpr.ApplicationConfig.ATMOSPHERE_HANDLER_MAPPING; import static org.atmosphere.cpr.ApplicationConfig.BROADCASTER_CACHE; +import static org.atmosphere.cpr.ApplicationConfig.BROADCASTER_CACHE_STRATEGY; import static org.atmosphere.cpr.ApplicationConfig.BROADCASTER_CLASS; import static org.atmosphere.cpr.ApplicationConfig.BROADCASTER_FACTORY; import static org.atmosphere.cpr.ApplicationConfig.BROADCASTER_LIFECYCLE_POLICY; diff --git a/modules/cpr/src/main/java/org/atmosphere/cpr/BroadcasterCache.java b/modules/cpr/src/main/java/org/atmosphere/cpr/BroadcasterCache.java index a808b23bdc4..21dcb2ce096 100644 --- a/modules/cpr/src/main/java/org/atmosphere/cpr/BroadcasterCache.java +++ b/modules/cpr/src/main/java/org/atmosphere/cpr/BroadcasterCache.java @@ -50,9 +50,10 @@ */ public interface BroadcasterCache { - public final static String BROADCASTER_CACHE_TRACKER = BroadcasterCache.class.getName(); + public enum STRATEGY { BEFORE_FILTER, AFTER_FILTER } + /** * Start */ diff --git a/modules/cpr/src/main/java/org/atmosphere/cpr/BroadcasterConfig.java b/modules/cpr/src/main/java/org/atmosphere/cpr/BroadcasterConfig.java index f4045c9bab6..52ea72644e6 100644 --- a/modules/cpr/src/main/java/org/atmosphere/cpr/BroadcasterConfig.java +++ b/modules/cpr/src/main/java/org/atmosphere/cpr/BroadcasterConfig.java @@ -37,6 +37,7 @@ package org.atmosphere.cpr; +import org.atmosphere.cache.BroadcasterCacheBase; import org.atmosphere.cpr.BroadcastFilter.BroadcastAction; import org.atmosphere.di.InjectorProvider; import org.slf4j.Logger; @@ -363,7 +364,7 @@ protected void destroy(boolean force) { /** * Force shutdown of all {@link ExecutorService} */ - public void forceDestroy(){ + public void forceDestroy() { destroy(true); } @@ -510,6 +511,9 @@ public ScheduledExecutorService getScheduledExecutorService() { */ public BroadcasterConfig setBroadcasterCache(BroadcasterCache broadcasterCache) { this.broadcasterCache = broadcasterCache; + if (BroadcasterCacheBase.class.isAssignableFrom(broadcasterCache.getClass())) { + BroadcasterCacheBase.class.cast(broadcasterCache).setExecutorService(getScheduledExecutorService()); + } return this; } diff --git a/modules/cpr/src/main/java/org/atmosphere/cpr/DefaultBroadcaster.java b/modules/cpr/src/main/java/org/atmosphere/cpr/DefaultBroadcaster.java index 75d21fdebe0..8592fcbe5b2 100644 --- a/modules/cpr/src/main/java/org/atmosphere/cpr/DefaultBroadcaster.java +++ b/modules/cpr/src/main/java/org/atmosphere/cpr/DefaultBroadcaster.java @@ -109,6 +109,7 @@ public class DefaultBroadcaster implements Broadcaster { private Future currentLifecycleTask; protected URI uri; protected AtmosphereServlet.AtmosphereConfig config; + protected BroadcasterCache.STRATEGY cacheStrategy = BroadcasterCache.STRATEGY.AFTER_FILTER; public DefaultBroadcaster(String name, URI uri, AtmosphereServlet.AtmosphereConfig config) { this.name = name; @@ -117,6 +118,14 @@ public DefaultBroadcaster(String name, URI uri, AtmosphereServlet.AtmosphereConf broadcasterCache = new DefaultBroadcasterCache(); bc = new BroadcasterConfig(AtmosphereServlet.broadcasterFilters, config); + String s = config.getInitParameter(ApplicationConfig.BROADCASTER_CACHE_STRATEGY); + if (s != null) { + if (s.equalsIgnoreCase("afterFilter")) { + cacheStrategy = BroadcasterCache.STRATEGY.AFTER_FILTER; + } else if (s.equalsIgnoreCase("beforeFilter")) { + cacheStrategy = BroadcasterCache.STRATEGY.BEFORE_FILTER; + } + } } public DefaultBroadcaster(String name, AtmosphereServlet.AtmosphereConfig config) { @@ -202,6 +211,7 @@ public void setScope(SCOPE scope) { BroadcasterCache cache = bc.getBroadcasterCache().getClass().newInstance(); InjectorProvider.getInjector().inject(cache); DefaultBroadcaster.class.cast(b).broadcasterCache = cache; + DefaultBroadcaster.class.cast(b).getBroadcasterConfig().setBroadcasterCache(cache); } resource.setBroadcaster(b); b.setScope(SCOPE.REQUEST); @@ -485,7 +495,9 @@ protected void push(Entry entry) { if (resources.isEmpty()) { logger.debug("Broadcaster {} doesn't have any associated resource", getID()); - trackBroadcastMessage(null, entry.message); + + trackBroadcastMessage(null, cacheStrategy == BroadcasterCache.STRATEGY.AFTER_FILTER ? entry.message : entry.originalMessage); + if (entry.future != null) { entry.future.done(); } @@ -559,7 +571,10 @@ protected Object perRequestFilter(AtmosphereResource r, Entry msg) { finalMsg = a.message(); } } - trackBroadcastMessage(r, finalMsg); + + if (cacheStrategy == BroadcasterCache.STRATEGY.AFTER_FILTER) { + trackBroadcastMessage(r, finalMsg); + } } else { // The resource is no longer valid. removeAtmosphereResource(r); diff --git a/modules/jersey/src/main/java/org/atmosphere/jersey/util/JerseyBroadcasterUtil.java b/modules/jersey/src/main/java/org/atmosphere/jersey/util/JerseyBroadcasterUtil.java index 177fac19bb2..96c1d847df4 100644 --- a/modules/jersey/src/main/java/org/atmosphere/jersey/util/JerseyBroadcasterUtil.java +++ b/modules/jersey/src/main/java/org/atmosphere/jersey/util/JerseyBroadcasterUtil.java @@ -3,12 +3,10 @@ import com.sun.jersey.spi.container.ContainerResponse; import org.atmosphere.cpr.ApplicationConfig; import org.atmosphere.cpr.AsynchronousProcessor; -import org.atmosphere.cpr.AtmosphereEventLifecycle; import org.atmosphere.cpr.AtmosphereResource; import org.atmosphere.cpr.AtmosphereResourceEvent; import org.atmosphere.cpr.AtmosphereResourceEventImpl; import org.atmosphere.cpr.AtmosphereResourceImpl; -import org.atmosphere.cpr.BroadcasterFactory; import org.atmosphere.cpr.FrameworkConfig; import org.atmosphere.jersey.AtmosphereFilter; import org.slf4j.Logger; @@ -16,9 +14,7 @@ import javax.servlet.http.HttpServletRequest; import javax.ws.rs.core.HttpHeaders; -import javax.ws.rs.core.MediaType; import javax.ws.rs.core.Response; -import java.io.IOException; import java.util.List; import java.util.concurrent.ConcurrentHashMap; @@ -98,7 +94,12 @@ public final static void broadcast(final AtmosphereResource r, final Atmos } if (lostCandidate && r != null && r.getBroadcaster() != null && r.getBroadcaster().getBroadcasterConfig().getBroadcasterCache() != null) { - r.getBroadcaster().getBroadcasterConfig().getBroadcasterCache().addToCache(r, e.getMessage()); + String s = (String)request.getAttribute(ApplicationConfig.BROADCASTER_CACHE_STRATEGY); + + // Prevent caching + if (s == null || !s.equalsIgnoreCase("beforeFilter")) { + r.getBroadcaster().getBroadcasterConfig().getBroadcasterCache().addToCache(r,e.getMessage()); + } } } }