Skip to content

Commit

Permalink
Fix for #766
Browse files Browse the repository at this point in the history
  • Loading branch information
jfarcand committed Jan 14, 2013
1 parent c9c637e commit a39e3e7
Show file tree
Hide file tree
Showing 2 changed files with 157 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@
import java.util.Enumeration;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

import static org.atmosphere.cpr.BroadcasterLifeCyclePolicy.ATMOSPHERE_RESOURCE_POLICY.EMPTY;
import static org.atmosphere.cpr.BroadcasterLifeCyclePolicy.ATMOSPHERE_RESOURCE_POLICY.EMPTY_DESTROY;
Expand Down Expand Up @@ -149,11 +151,7 @@ public final Broadcaster get(Class<? extends Broadcaster> c, Object id) {
throw new NullPointerException("Class is null");
}

if (store.containsKey(id)) {
throw new IllegalStateException("Broadcaster already existing " + id + ". Use BroadcasterFactory.lookup instead");
}

return lookup(c, id, true);
return lookup(c, id, true, true);
}

private Broadcaster createBroadcaster(Class<? extends Broadcaster> c, Object id) throws BroadcasterCreationException {
Expand Down Expand Up @@ -223,34 +221,43 @@ public final Broadcaster lookup(Object id, boolean createIfNull) {
* {@inheritDoc}
*/
@Override
public synchronized Broadcaster lookup(Class<? extends Broadcaster> c, Object id, boolean createIfNull) {
Broadcaster b = store.get(id);
if (b != null && !c.isAssignableFrom(b.getClass())) {
String msg = "Invalid lookup class " + c.getName() + ". Cached class is: " + b.getClass().getName();
logger.debug(msg);
throw new IllegalStateException(msg);
}
public Broadcaster lookup(Class<? extends Broadcaster> c, Object id, boolean createIfNull) {
return lookup(c, id, createIfNull, false);
}

if ((b == null && createIfNull) || (b != null && b.isDestroyed())) {
if (b != null) {
logger.debug("Removing destroyed Broadcaster {}", b.getID());
store.remove(b.getID(), b);
public Broadcaster lookup(Class<? extends Broadcaster> c, Object id, boolean createIfNull, boolean unique) {
synchronized(id) {
if (unique && store.get(id) != null) {
throw new IllegalStateException("Broadcaster already existing " + id + ". Use BroadcasterFactory.lookup instead");
}

Broadcaster nb = store.get(id);
if (nb == null) {
nb = createBroadcaster(c, id);
store.put(id, nb);
Broadcaster b = store.get(id);
if (b != null && !c.isAssignableFrom(b.getClass())) {
String msg = "Invalid lookup class " + c.getName() + ". Cached class is: " + b.getClass().getName();
logger.debug(msg);
throw new IllegalStateException(msg);
}

if (nb == null) {
logger.debug("Added Broadcaster {} . Factory size: {}", id, store.size());
}
if ((b == null && createIfNull) || (b != null && b.isDestroyed())) {
if (b != null) {
logger.debug("Removing destroyed Broadcaster {}", b.getID());
store.remove(b.getID(), b);
}

b = nb;
}
Broadcaster nb = store.get(id);
if (nb == null) {
nb = createBroadcaster(c, id);
store.put(id, nb);
}

if (nb == null) {
logger.debug("Added Broadcaster {} . Factory size: {}", id, store.size());
}

return b;
b = nb;
}
return b;
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,19 @@
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

import java.net.URI;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import static org.testng.Assert.assertEquals;

/**
* Unit tests for the {@link org.atmosphere.cpr.DefaultBroadcasterFactory}.
*
*
* @author Jason Burgess
*/
public class DefaultBroadcasterFactoryTest {
Expand Down Expand Up @@ -123,4 +133,117 @@ public void testLookup_Class_Object_boolean() {
assert b instanceof DefaultBroadcaster;
assert id.equals(b.getID());
}

@Test
public void concurrentLookupTest() throws InterruptedException {
String id = "id";
final DefaultBroadcasterFactory f = new DefaultBroadcasterFactory(DefaultBroadcaster.class, "NEVER", config);
final CountDownLatch latch = new CountDownLatch(100);
final AtomicInteger created = new AtomicInteger();

f.addBroadcasterListener(new BroadcasterListener() {
@Override
public void onPostCreate(Broadcaster b) {
created.incrementAndGet();
latch.countDown();
}

@Override
public void onComplete(Broadcaster b) {

}

@Override
public void onPreDestroy(Broadcaster b) {

}
});

ExecutorService r = Executors.newCachedThreadPool();
try {
for (int i = 0; i < 100; i++) {
r.submit(new Runnable() {
@Override
public void run() {
f.lookup("name" + UUID.randomUUID().toString(), true);
}
});
}
} finally {
r.shutdown();
}
latch.await();

try {
assertEquals(f.lookupAll().size(), 100);
assertEquals(created.get(), 100);
} finally {
f.destroy();
}
}

@Test
public void concurrentAccessLookupTest() throws InterruptedException {
final DefaultBroadcasterFactory f = new DefaultBroadcasterFactory(DefaultBroadcaster.class, "NEVER", config);
final CountDownLatch latch = new CountDownLatch(1000);
final AtomicInteger created = new AtomicInteger();
f.addBroadcasterListener(new BroadcasterListener() {
@Override
public void onPostCreate(Broadcaster b) {
created.incrementAndGet();
latch.countDown();
}

@Override
public void onComplete(Broadcaster b) {

}

@Override
public void onPreDestroy(Broadcaster b) {

}
});

ExecutorService r = Executors.newCachedThreadPool();
try {
for (int i = 0; i < 1000; i++) {
r.submit(new Runnable() {
@Override
public void run() {
try {
f.get(TestBroadcaster.class, "me");
} catch (IllegalStateException ex) {
latch.countDown();
}
}
});

}
} finally {
r.shutdown();
}
latch.await(10, TimeUnit.SECONDS);
try {
assertEquals(latch.getCount(), 0);
assertEquals(f.lookupAll().size(), 1);
assertEquals(created.get(), 1);
assertEquals(TestBroadcaster.instance.get(), 1);
} finally {
f.destroy();
}

assertEquals(TestBroadcaster.instance.get(), 1);

}

public final static class TestBroadcaster extends DefaultBroadcaster {

public static AtomicInteger instance = new AtomicInteger();

public TestBroadcaster(String name, AtmosphereConfig config) {
super(name, config);
instance.incrementAndGet();
}
}
}

0 comments on commit a39e3e7

Please sign in to comment.