Skip to content

Commit

Permalink
Fixes #1451, Fixes #1543
Browse files Browse the repository at this point in the history
  • Loading branch information
jfarcand committed May 20, 2014
1 parent 4b40de3 commit d25e102
Show file tree
Hide file tree
Showing 7 changed files with 77 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,10 @@ public AtmosphereResourceFactory resourcesFactory(){
return framework.atmosphereFactory();
}

public MetaBroadcaster metaBroadcaster(){
return framework.metaBroadcaster();
}

/**
* A shutdown hook that will be called when the {@link AtmosphereFramework#destroy} method gets invoked. An
* Application can register one of more hooks.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,7 @@ public class AtmosphereFramework {
protected final LinkedList<BroadcasterCacheListener> broadcasterCacheListeners = new LinkedList<BroadcasterCacheListener>();
protected final List<BroadcasterConfig.FilterManipulator> filterManipulators = new ArrayList<BroadcasterConfig.FilterManipulator>();
protected AtmosphereResourceFactory arFactory;
protected MetaBroadcaster metaBroadcaster;
protected final Class<? extends AtmosphereInterceptor>[] defaultInterceptors = new Class[]{
// Add CORS support
CorsInterceptor.class,
Expand Down Expand Up @@ -860,6 +861,7 @@ public Enumeration<String> getInitParameterNames() {

// Reconfigure in case an annotation changed the default.
configureBroadcasterFactory();
configureMetaBroadcaster();
patchContainer();
configureBroadcaster();
loadConfiguration(scFacade);
Expand Down Expand Up @@ -1578,6 +1580,7 @@ public AtmosphereFramework destroy() {
BroadcasterFactory.config = null;
}

metaBroadcaster.destroy();
arFactory.destroy();
WebSocketProcessorFactory.getDefault().destroy();

Expand Down Expand Up @@ -1607,6 +1610,7 @@ public AtmosphereFramework resetStates() {

broadcasterFactory = null;
arFactory = null;
metaBroadcaster = null;
annotationFound = false;
return this;
}
Expand Down Expand Up @@ -1934,7 +1938,6 @@ private void getFiles(File f) {
* @param req {@link AtmosphereRequest}
*/
public AtmosphereFramework configureRequestResponse(AtmosphereRequest req, AtmosphereResponse res) throws UnsupportedEncodingException {
req.setAttribute(BROADCASTER_FACTORY, getBroadcasterFactory());
req.setAttribute(PROPERTY_USE_STREAM, useStreamForFlushingComments);
req.setAttribute(BROADCASTER_CLASS, broadcasterClassName);
req.setAttribute(ATMOSPHERE_CONFIG, config);
Expand Down Expand Up @@ -2114,6 +2117,9 @@ public AtmosphereFramework setUseStreamForFlushingComments(boolean useStreamForF
* @return {@link BroadcasterFactory}
*/
public BroadcasterFactory getBroadcasterFactory() {
if (broadcasterFactory == null) {
configureBroadcasterFactory();
}
return broadcasterFactory;
}

Expand Down Expand Up @@ -2896,7 +2902,6 @@ public Class<? extends AtmosphereInterceptor>[] defaultInterceptors() {
return defaultInterceptors;
}


public AtmosphereResourceFactory atmosphereFactory() {
if (arFactory == null) {
configureAtmosphereResourceFactory();
Expand All @@ -2909,4 +2914,18 @@ private AtmosphereFramework configureAtmosphereResourceFactory() {
return this;
}

public MetaBroadcaster metaBroadcaster() {
if (metaBroadcaster == null) {
configureMetaBroadcaster();
}
return metaBroadcaster;
}

private AtmosphereFramework configureMetaBroadcaster() {
if (metaBroadcaster == null) {
metaBroadcaster = new MetaBroadcaster(config);
}
return this;
}

}
34 changes: 29 additions & 5 deletions modules/cpr/src/main/java/org/atmosphere/cpr/MetaBroadcaster.java
Original file line number Diff line number Diff line change
Expand Up @@ -64,14 +64,26 @@ public class MetaBroadcaster {
public static final String MAPPING_REGEX = "[/a-zA-Z0-9-&.*=@_;\\?]+";

private final static Logger logger = LoggerFactory.getLogger(MetaBroadcaster.class);
private final static MetaBroadcaster metaBroadcaster = new MetaBroadcaster();
private static MetaBroadcaster metaBroadcaster;
private final static ConcurrentLinkedQueue<BroadcasterListener> broadcasterListeners = new ConcurrentLinkedQueue<BroadcasterListener>();
private final static MetaBroadcasterFuture E = new MetaBroadcasterFuture(Collections.<Broadcaster>emptyList());
private MetaBroadcasterCache cache = new NoCache();
private AtmosphereConfig config;

public MetaBroadcaster() {
// Ugly
metaBroadcaster = this;
}

public MetaBroadcaster(AtmosphereConfig config) {
this.config = config;
// Ugly
metaBroadcaster = this;
}

protected MetaBroadcasterFuture broadcast(final String path, Object message, int time, TimeUnit unit, boolean delay, boolean cacheMessage) {
if (BroadcasterFactory.getDefault() != null) {
Collection<Broadcaster> c = BroadcasterFactory.getDefault().lookupAll();
if (config != null || BroadcasterFactory.getDefault() != null) {
Collection<Broadcaster> c = config != null ? config.getBroadcasterFactory().lookupAll() : BroadcasterFactory.getDefault().lookupAll();

final Map<String, String> m = new HashMap<String, String>();
List<Broadcaster> l = new ArrayList<Broadcaster>();
Expand Down Expand Up @@ -155,7 +167,7 @@ public Future<List<Broadcaster>> broadcastTo(String broadcasterID, Object messag
* Flush the cached messages.
* @return this
*/
private MetaBroadcaster flushCache() {
protected MetaBroadcaster flushCache() {
if (cache != null) cache.flushCache();
return this;
}
Expand Down Expand Up @@ -188,7 +200,14 @@ public Future<List<Broadcaster>> delayTo(String broadcasterID, Object message, i
return map(broadcasterID, message, time, unit, true, true);
}

public final static MetaBroadcaster getDefault() {
/**
*
* @deprecated Use {@link AtmosphereConfig#metaBroadcaster()}
*/
public synchronized final static MetaBroadcaster getDefault() {
if (metaBroadcaster == null) {
metaBroadcaster = new MetaBroadcaster();
}
return metaBroadcaster;
}

Expand Down Expand Up @@ -313,6 +332,11 @@ public MetaBroadcaster cache(MetaBroadcasterCache cache) {
return this;
}

protected void destroy(){
broadcasterListeners.clear();
flushCache();
}

/**
* Cache message if no {@link Broadcaster} maps the {@link #broadcastTo(String, Object)}
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,15 @@

public class MetaBroadcasterTest {
private AtmosphereConfig config;
private DefaultBroadcasterFactory factory;
private BroadcasterFactory factory;
private MetaBroadcaster metaBroadcaster;

@BeforeMethod
public void setUp() throws Exception {
config = new AtmosphereFramework().getAtmosphereConfig();
factory = new DefaultBroadcasterFactory(DefaultBroadcaster.class, "NEVER", config);
config = new AtmosphereFramework().init().getAtmosphereConfig();
factory = config.getBroadcasterFactory();
factory.remove(Broadcaster.ROOT_MASTER);
metaBroadcaster = config.metaBroadcaster();
}

@AfterMethod
Expand All @@ -44,13 +47,13 @@ public void wildcardBroadcastTest() throws ExecutionException, InterruptedExcept
factory.get("/b");
factory.get("/c");

assertEquals(MetaBroadcaster.getDefault().broadcastTo("/*", "yo").get().size(), 3);
assertEquals(MetaBroadcaster.getDefault().broadcastTo("/a/b", "yo").get().size(), 0);
assertEquals(MetaBroadcaster.getDefault().broadcastTo("/a", "yo").get().size(), 1);
assertEquals(MetaBroadcaster.getDefault().broadcastTo("/", "yo").get().size(), 3);
assertEquals(metaBroadcaster.broadcastTo("/*", "yo").get().size(), 3);
assertEquals(metaBroadcaster.broadcastTo("/a/b", "yo").get().size(), 0);
assertEquals(metaBroadcaster.broadcastTo("/a", "yo").get().size(), 1);
assertEquals(metaBroadcaster.broadcastTo("/", "yo").get().size(), 3);

factory.get("/*");
assertEquals(MetaBroadcaster.getDefault().broadcastTo("/", "yo").get().size(), 4);
assertEquals(metaBroadcaster.broadcastTo("/", "yo").get().size(), 4);
}

@Test
Expand All @@ -60,7 +63,7 @@ public void exactBroadcastTest() throws ExecutionException, InterruptedException
factory.get("/a/b");
factory.get("/c");

assertEquals(MetaBroadcaster.getDefault().broadcastTo("/a", "yo").get().get(0).getID(), "/a");
assertEquals(metaBroadcaster.broadcastTo("/a", "yo").get().get(0).getID(), "/a");
}

@Test
Expand All @@ -69,7 +72,7 @@ public void traillingBroadcastTest() throws ExecutionException, InterruptedExcep
factory.get("/a/b");
factory.get("/b");
factory.get("/c");
assertEquals(MetaBroadcaster.getDefault().broadcastTo("/a/b", "yo").get().size(), 1);
assertEquals(metaBroadcaster.broadcastTo("/a/b", "yo").get().size(), 1);

}

Expand All @@ -79,10 +82,10 @@ public void complexBroadcastTest() throws ExecutionException, InterruptedExcepti
factory.get("/b");
factory.get("/c");

assertEquals(MetaBroadcaster.getDefault().broadcastTo("/*", "yo").get().size(), 3);
assertEquals(MetaBroadcaster.getDefault().broadcastTo("/a/b/c/d", "yo").get().size(), 1);
assertEquals(MetaBroadcaster.getDefault().broadcastTo("/a", "yo").get().size(), 0);
assertEquals(MetaBroadcaster.getDefault().broadcastTo("/b", "yo").get().size(), 1);
assertEquals(metaBroadcaster.broadcastTo("/*", "yo").get().size(), 3);
assertEquals(metaBroadcaster.broadcastTo("/a/b/c/d", "yo").get().size(), 1);
assertEquals(metaBroadcaster.broadcastTo("/a", "yo").get().size(), 0);
assertEquals(metaBroadcaster.broadcastTo("/b", "yo").get().size(), 1);

}

Expand All @@ -92,7 +95,7 @@ public void chatTest() throws ExecutionException, InterruptedException {
factory.get("/a/chat2");
factory.get("/a/chat3");

assertEquals(MetaBroadcaster.getDefault().broadcastTo("/a/*", "yo").get().size(), 3);
assertEquals(metaBroadcaster.broadcastTo("/a/*", "yo").get().size(), 3);

}

Expand All @@ -101,7 +104,7 @@ public void underscoreMatching() throws ExecutionException, InterruptedException
factory.get("/a/_b");
factory.get("/b");
factory.get("/c");
assertEquals(MetaBroadcaster.getDefault().broadcastTo("/a/_b", "yo").get().size(), 1);
assertEquals(metaBroadcaster.broadcastTo("/a/_b", "yo").get().size(), 1);

}

Expand All @@ -110,7 +113,7 @@ public void issue836Test() throws ExecutionException, InterruptedException {
factory.get("/a/@b");
factory.get("/b");
factory.get("/c");
assertEquals(MetaBroadcaster.getDefault().broadcastTo("/a/@b", "yo").get().size(), 1);
assertEquals(metaBroadcaster.broadcastTo("/a/@b", "yo").get().size(), 1);

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,6 @@
import java.util.concurrent.TimeUnit;

import static org.atmosphere.cpr.ApplicationConfig.BROADCASTER_CLASS;
import static org.atmosphere.cpr.ApplicationConfig.BROADCASTER_FACTORY;
import static org.atmosphere.cpr.ApplicationConfig.DEFAULT_CONTENT_TYPE;
import static org.atmosphere.cpr.ApplicationConfig.JERSEY_CONTAINER_RESPONSE_WRITER_CLASS;
import static org.atmosphere.cpr.ApplicationConfig.RESUME_ON_BROADCAST;
Expand Down Expand Up @@ -657,8 +656,7 @@ void suspend(boolean resumeOnBroadcast,
response.setStatus(200);
}

BroadcasterFactory broadcasterFactory = (BroadcasterFactory) servletReq
.getAttribute(BROADCASTER_FACTORY);
BroadcasterFactory broadcasterFactory = r.getAtmosphereConfig().getBroadcasterFactory();

boolean sessionSupported = (Boolean) servletReq.getAttribute(FrameworkConfig.SUPPORT_SESSION);
URI location = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import com.sun.jersey.spi.StringReaderProvider;
import org.atmosphere.cpr.ApplicationConfig;
import org.atmosphere.cpr.AtmosphereResource;
import org.atmosphere.cpr.AtmosphereResourceImpl;
import org.atmosphere.cpr.Broadcaster;
import org.atmosphere.cpr.BroadcasterFactory;
import org.atmosphere.cpr.FrameworkConfig;
Expand All @@ -32,9 +31,6 @@
import java.lang.annotation.Annotation;
import java.lang.reflect.Type;

import static org.atmosphere.cpr.FrameworkConfig.ATMOSPHERE_RESOURCE;
import static org.atmosphere.cpr.HeaderConfig.X_ATMOSPHERE_TRACKING_ID;

/**
* Placeholder for injection of Atmosphere object based on
* any parameter value (header, cookie, query, matrix or path)
Expand Down Expand Up @@ -70,8 +66,7 @@ public Object fromString(String topic) {
AtmosphereResource r =
(AtmosphereResource)
req.getAttribute(FrameworkConfig.ATMOSPHERE_RESOURCE);
BroadcasterFactory bp = (BroadcasterFactory)
req.getAttribute(ApplicationConfig.BROADCASTER_FACTORY);
BroadcasterFactory bp = r.getAtmosphereConfig().getBroadcasterFactory();

Class<? extends Broadcaster> c;
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import com.sun.jersey.core.spi.component.ComponentContext;
import com.sun.jersey.core.spi.component.ComponentScope;
import com.sun.jersey.spi.inject.Injectable;
import org.atmosphere.cpr.ApplicationConfig;
import org.atmosphere.cpr.AtmosphereResource;
import org.atmosphere.cpr.Broadcaster;
import org.atmosphere.cpr.BroadcasterFactory;
Expand Down Expand Up @@ -84,7 +83,7 @@ public BroadcasterFactory getValue() {

class BroadcasterFactoryProxy extends BroadcasterFactory {
BroadcasterFactory _get() {
return (BroadcasterFactory) req.getAttribute(ApplicationConfig.BROADCASTER_FACTORY);
return getAtmosphereResource(AtmosphereResource.class, true).getAtmosphereConfig().getBroadcasterFactory();
}

@Override
Expand Down

0 comments on commit d25e102

Please sign in to comment.