Skip to content

Commit

Permalink
Fix for #766
Browse files Browse the repository at this point in the history
  • Loading branch information
jfarcand committed Jan 14, 2013
1 parent 6adc147 commit 4a5ef54
Show file tree
Hide file tree
Showing 2 changed files with 156 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@
import java.util.Enumeration;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

import static org.atmosphere.cpr.BroadcasterLifeCyclePolicy.ATMOSPHERE_RESOURCE_POLICY.EMPTY;
import static org.atmosphere.cpr.BroadcasterLifeCyclePolicy.ATMOSPHERE_RESOURCE_POLICY.EMPTY_DESTROY;
Expand All @@ -83,9 +85,7 @@ public class DefaultBroadcasterFactory extends BroadcasterFactory {
private static final Logger logger = LoggerFactory.getLogger(DefaultBroadcasterFactory.class);

private final ConcurrentHashMap<Object, Broadcaster> store = new ConcurrentHashMap<Object, Broadcaster>();

private final Class<? extends Broadcaster> clazz;

private BroadcasterLifeCyclePolicy policy =
new BroadcasterLifeCyclePolicy.Builder().policy(NEVER).build();

Expand Down Expand Up @@ -149,11 +149,7 @@ public final Broadcaster get(Class<? extends Broadcaster> c, Object id) {
throw new NullPointerException("Class is null");
}

if (store.containsKey(id)) {
throw new IllegalStateException("Broadcaster already existing " + id + ". Use BroadcasterFactory.lookup instead");
}

return lookup(c, id, true);
return lookup(c, id, true, true);
}

private Broadcaster createBroadcaster(Class<? extends Broadcaster> c, Object id) throws BroadcasterCreationException {
Expand Down Expand Up @@ -223,29 +219,43 @@ public final Broadcaster lookup(Object id, boolean createIfNull) {
* {@inheritDoc}
*/
@Override
public synchronized Broadcaster lookup(Class<? extends Broadcaster> c, Object id, boolean createIfNull) {
Broadcaster b = store.get(id);
if (b != null && !c.isAssignableFrom(b.getClass())) {
String msg = "Invalid lookup class " + c.getName() + ". Cached class is: " + b.getClass().getName();
logger.debug(msg);
throw new IllegalStateException(msg);
}
public Broadcaster lookup(Class<? extends Broadcaster> c, Object id, boolean createIfNull) {
return lookup(c, id, createIfNull, false);
}

if ((b == null && createIfNull) || (b != null && b.isDestroyed())) {
if (b != null) {
logger.debug("Removing destroyed Broadcaster {}", b.getID());
store.remove(b.getID(), b);
public Broadcaster lookup(Class<? extends Broadcaster> c, Object id, boolean createIfNull, boolean unique) {
synchronized(id) {
if (unique && store.get(id) != null) {
throw new IllegalStateException("Broadcaster already existing " + id + ". Use BroadcasterFactory.lookup instead");
}

b = store.get(id);
if (b == null) {
logger.debug("Added Broadcaster {} . Factory size: {}", id, store.size());
b = createBroadcaster(c, id);
store.put(id, b);
Broadcaster b = store.get(id);
if (b != null && !c.isAssignableFrom(b.getClass())) {
String msg = "Invalid lookup class " + c.getName() + ". Cached class is: " + b.getClass().getName();
logger.debug(msg);
throw new IllegalStateException(msg);
}
}

return b;
if ((b == null && createIfNull) || (b != null && b.isDestroyed())) {
if (b != null) {
logger.debug("Removing destroyed Broadcaster {}", b.getID());
store.remove(b.getID(), b);
}

Broadcaster nb = store.get(id);
if (nb == null) {
nb = createBroadcaster(c, id);
store.put(id, nb);
}

if (nb == null) {
logger.debug("Added Broadcaster {} . Factory size: {}", id, store.size());
}

b = nb;
}
return b;
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,14 @@
import org.testng.annotations.Test;

import java.util.HashSet;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import static org.testng.Assert.assertEquals;

/**
* Unit tests for the {@link org.atmosphere.cpr.DefaultBroadcasterFactory}.
Expand Down Expand Up @@ -131,4 +139,117 @@ public void testLookup_Class_Object_boolean() {
assert b instanceof DefaultBroadcaster;
assert id.equals(b.getID());
}

@Test
public void concurrentLookupTest() throws InterruptedException {
String id = "id";
final DefaultBroadcasterFactory f = new DefaultBroadcasterFactory(DefaultBroadcaster.class, "NEVER", config);
final CountDownLatch latch = new CountDownLatch(100);
final AtomicInteger created = new AtomicInteger();

f.addBroadcasterListener(new BroadcasterListener() {
@Override
public void onPostCreate(Broadcaster b) {
created.incrementAndGet();
latch.countDown();
}

@Override
public void onComplete(Broadcaster b) {

}

@Override
public void onPreDestroy(Broadcaster b) {

}
});

ExecutorService r = Executors.newCachedThreadPool();
try {
for (int i = 0; i < 100; i++) {
r.submit(new Runnable() {
@Override
public void run() {
f.lookup("name" + UUID.randomUUID().toString(), true);
}
});
}
} finally {
r.shutdown();
}
latch.await();

try {
assertEquals(f.lookupAll().size(), 100);
assertEquals(created.get(), 100);
} finally {
f.destroy();
}
}

@Test
public void concurrentAccessLookupTest() throws InterruptedException {
final DefaultBroadcasterFactory f = new DefaultBroadcasterFactory(DefaultBroadcaster.class, "NEVER", config);
final CountDownLatch latch = new CountDownLatch(1000);
final AtomicInteger created = new AtomicInteger();
f.addBroadcasterListener(new BroadcasterListener() {
@Override
public void onPostCreate(Broadcaster b) {
created.incrementAndGet();
latch.countDown();
}

@Override
public void onComplete(Broadcaster b) {

}

@Override
public void onPreDestroy(Broadcaster b) {

}
});

ExecutorService r = Executors.newCachedThreadPool();
try {
for (int i = 0; i < 1000; i++) {
r.submit(new Runnable() {
@Override
public void run() {
try {
f.get(TestBroadcaster.class, "me");
} catch (IllegalStateException ex) {
latch.countDown();
}
}
});

}
} finally {
r.shutdown();
}
latch.await(10, TimeUnit.SECONDS);
try {
assertEquals(latch.getCount(), 0);
assertEquals(f.lookupAll().size(), 1);
assertEquals(created.get(), 1);
assertEquals(TestBroadcaster.instance.get(), 1);
} finally {
f.destroy();
}

assertEquals(TestBroadcaster.instance.get(), 1);

}

public final static class TestBroadcaster extends DefaultBroadcaster {

public static AtomicInteger instance = new AtomicInteger();

public TestBroadcaster(String name, AtmosphereConfig config) {
super(name, config);
instance.incrementAndGet();
}
}
}

0 comments on commit 4a5ef54

Please sign in to comment.