Skip to content

Commit

Permalink
🐎 Improve API for #1879
Browse files Browse the repository at this point in the history
  • Loading branch information
jfarcand committed Mar 11, 2015
1 parent 52a06a0 commit 9f38497
Show file tree
Hide file tree
Showing 5 changed files with 125 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.atmosphere.cache.CacheMessage;
import org.atmosphere.cpr.BroadcastFilter.BroadcastAction;
import org.atmosphere.lifecycle.LifecycleHandler;
import org.atmosphere.pool.PoolableBroadcasterFactory;
import org.atmosphere.util.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -115,6 +116,7 @@ public class DefaultBroadcaster implements Broadcaster {
private Future<?> currentLifecycleTask;
private boolean cacheOnIOFlushException = true;
protected boolean sharedListeners = false;
protected boolean candidateForPoolable;

public DefaultBroadcaster() {
}
Expand Down Expand Up @@ -157,6 +159,8 @@ public Broadcaster initialize(String name, URI uri, AtmosphereConfig config) {
broadcasterListeners = new ConcurrentLinkedQueue<BroadcasterListener>();
}

candidateForPoolable = PoolableBroadcasterFactory.class.isAssignableFrom(config.getBroadcasterFactory().getClass());

return this;
}

Expand All @@ -176,37 +180,39 @@ protected BroadcasterConfig createBroadcasterConfig(AtmosphereConfig config) {

@Override
public synchronized void destroy() {
try {
logger.trace("Broadcaster {} will be pooled: {}", getID(), candidateForPoolable);
if (!candidateForPoolable) {
if (notifyOnPreDestroy()) return;

if (notifyOnPreDestroy()) return;

if (destroyed.getAndSet(true)) return;
if (destroyed.getAndSet(true)) return;

try {
logger.trace("Broadcaster {} is being destroyed and cannot be re-used. Policy was {}", getID(), policy);
logger.trace("Broadcaster {} is being destroyed and cannot be re-used. Resources are {}", getID(), resources);
logger.trace("Broadcaster {} is being destroyed and cannot be re-used. Policy was {}", getID(), policy);
logger.trace("Broadcaster {} is being destroyed and cannot be re-used. Resources are {}", getID(), resources);

if (config.getBroadcasterFactory() != null) {
config.getBroadcasterFactory().remove(this, this.getID());
}
started.set(false);

started.set(false);
releaseExternalResources();
killReactiveThreads();

releaseExternalResources();
killReactiveThreads();
if (bc != null) {
bc.destroy();
}
lifeCycleListeners.clear();
delayedBroadcast.clear();
if (!sharedListeners) {
broadcasterListeners.clear();
}
}

if (bc != null) {
bc.destroy();
if (config.getBroadcasterFactory() != null) {
config.getBroadcasterFactory().remove(this, this.getID());
}

resources.clear();
broadcastOnResume.clear();
messages.clear();
delayedBroadcast.clear();
if (!sharedListeners) {
broadcasterListeners.clear();
}
writeQueues.clear();
lifeCycleListeners.clear();
} catch (Throwable t) {
logger.error("Unexpected exception during Broadcaster destroy {}", getID(), t);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,12 +146,46 @@ public Broadcaster createBroadcaster() {
return createBroadcaster(clazz, "POOLED");
}

/**
* Set to true to enable tracking of {@link org.atmosphere.cpr.Broadcaster#getID()} duplication. Enabling this
* feature will significantly reduce the performance of the {@link org.atmosphere.pool.PoolableProvider}. Use the
* {@link org.atmosphere.cpr.DefaultBroadcasterFactory} if you need to track's duplication.
*
* @param trackPooledBroadcaster
* @return
*/
public PoolableBroadcasterFactory trackPooledBroadcaster(boolean trackPooledBroadcaster) {
this.trackPooledBroadcaster = trackPooledBroadcaster;
return this;
}

/**
* Return true is {@link Broadcaster} instance are tracked, e.g stored in a Collection for duplicate id.
*
* @return {@link Broadcaster} instance are tracked, e.g stored in a Collection for duplicate id.
*/
public boolean trackPooledBroadcaster() {
return trackPooledBroadcaster;
}

/**
* The current {@link org.atmosphere.pool.PoolableProvider}
*
* @return current {@link org.atmosphere.pool.PoolableProvider}
*/
public PoolableProvider poolableProvider() {
return poolableProvider;
}

/**
* Set the implementation of {@link org.atmosphere.pool.PoolableProvider}
*
* @param poolableProvider the implementation of {@link org.atmosphere.pool.PoolableProvider}
* @return this
*/
public PoolableBroadcasterFactory poolableProvider(PoolableProvider poolableProvider) {
this.poolableProvider = poolableProvider;
this.poolableProvider.configure(config);
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
*
* @author Jeanfrancois Arcand
*/
public interface PoolableProvider<T extends Broadcaster> extends AtmosphereConfigAware {
public interface PoolableProvider<T extends Broadcaster, U> extends AtmosphereConfigAware {

/**
* Return a {@link org.atmosphere.cpr.Broadcaster}
Expand All @@ -40,4 +40,21 @@ public interface PoolableProvider<T extends Broadcaster> extends AtmosphereConfi
*/
PoolableProvider returnBroadcaster(T b);

/**
* The current Pool Size
* @return current Pool size
*/
long poolSize();

/**
* Current number of active Broadcaster borrowed from the pool
*/
long activeBroadcaster();

/**
* Return the current native pool implementation. For example, the GenericObjectPool from Apache Common
* will be returned if the {@link org.atmosphere.pool.UnboundedApachePoolableProvider} is used.
* @return the current native pool implementation
*/
U implementation();
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,21 +27,21 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

/**
* An Unbounded Broadcaster Pool Provider of {@link Broadcaster}
*
* @author Jean-Francois Arcand
*/
public class UnboundedApachePoolableProvider implements PoolableProvider<Broadcaster> {
public class UnboundedApachePoolableProvider implements PoolableProvider<Broadcaster, GenericObjectPool> {
private final Logger logger = LoggerFactory.getLogger(UnboundedApachePoolableProvider.class);

protected GenericObjectPool<Broadcaster> genericObjectPool;
protected AtmosphereConfig config;
protected final GenericObjectPoolConfig poolConfig = new GenericObjectPoolConfig();
protected final AbandonedConfig abandonedConfig = new AbandonedConfig();
private final AtomicInteger count = new AtomicInteger();
private final AtomicLong count = new AtomicLong();

@Override
public void configure(AtmosphereConfig config) {
Expand All @@ -66,7 +66,7 @@ public Broadcaster borrowBroadcaster(Object id) {

@Override
public PoolableProvider returnBroadcaster(Broadcaster b) {
logger.trace("Return Object {} now at size {}", b, count.getAndDecrement());
logger.trace("Return {} now at size {}", b.getID(), genericObjectPool.getNumActive());
try {
genericObjectPool.returnObject(b);
} catch (IllegalStateException ex) {
Expand All @@ -75,11 +75,26 @@ public PoolableProvider returnBroadcaster(Broadcaster b) {
return this;
}

@Override
public long poolSize() {
return genericObjectPool.getCreatedCount();
}

@Override
public long activeBroadcaster() {
return genericObjectPool.getNumActive();
}

@Override
public GenericObjectPool implementation() {
return genericObjectPool;
}

private final class BroadcasterFactory extends BasePooledObjectFactory<Broadcaster> {

@Override
public Broadcaster create() {
logger.trace("Creating Object {}", count.getAndIncrement());
logger.trace("Creating Broadcaster {}", count.getAndIncrement());
return PoolableBroadcasterFactory.class.cast(config.getBroadcasterFactory()).createBroadcaster();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@
*/
package org.atmosphere.cpr;

import org.atmosphere.pool.BoundedApachePoolableProvider;
import org.atmosphere.pool.PoolableBroadcasterFactory;
import org.atmosphere.pool.UnboundedApachePoolableProvider;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

Expand All @@ -28,6 +30,7 @@
import java.util.concurrent.atomic.AtomicInteger;

import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;

/**
* Unit tests for the {@link PoolableBroadcasterFactory}.
Expand All @@ -44,6 +47,7 @@ public void setUp() throws Exception {
AtmosphereFramework f = new AtmosphereFramework();
config = f.getAtmosphereConfig();
factory = new PoolableBroadcasterFactory(DefaultBroadcaster.class, "NEVER", config);
factory.poolableProvider(new BoundedApachePoolableProvider());
f.setBroadcasterFactory(factory);
}

Expand Down Expand Up @@ -101,7 +105,7 @@ public void onPreDestroy(Broadcaster b) {
}
});

final ConcurrentLinkedQueue c = new ConcurrentLinkedQueue();
final ConcurrentLinkedQueue<Broadcaster> c = new ConcurrentLinkedQueue<Broadcaster>();
ExecutorService r = Executors.newCachedThreadPool();
try {
for (int i = 0; i < 100; i++) {
Expand All @@ -120,15 +124,27 @@ public void run() {
try {
assertEquals(c.size(), 100);
assertEquals(created.get(), 100);

for (Broadcaster b: c) {
b.destroy();
}

assertNotNull(factory.lookup("name" + UUID.randomUUID().toString(), true).broadcast("test"));

assertEquals(factory.poolableProvider().poolSize(), 100);

} finally {
factory.destroy();
}


}

@Test
public void concurrentAccessLookupTest() throws InterruptedException {
final CountDownLatch latch = new CountDownLatch(1000);
final AtomicInteger created = new AtomicInteger();
factory.poolableProvider(new UnboundedApachePoolableProvider());
factory.addBroadcasterListener(new BroadcasterListenerAdapter() {
@Override
public void onPostCreate(Broadcaster b) {
Expand All @@ -147,7 +163,7 @@ public void onPreDestroy(Broadcaster b) {
}
});

final ConcurrentLinkedQueue c = new ConcurrentLinkedQueue();
final ConcurrentLinkedQueue<Broadcaster> c = new ConcurrentLinkedQueue<Broadcaster>();
ExecutorService r = Executors.newCachedThreadPool();
try {
for (int i = 0; i < 1000; i++) {
Expand All @@ -171,6 +187,16 @@ public void run() {
assertEquals(latch.getCount(), 0);
assertEquals(c.size(), 1000);
assertEquals(created.get(), 1000);

for (Broadcaster b: c) {
b.destroy();
}

assertNotNull(factory.lookup("name" + UUID.randomUUID().toString(), true).broadcast("test"));

assertEquals(factory.poolableProvider().poolSize(), 1000);


} finally {
factory.destroy();
}
Expand Down

0 comments on commit 9f38497

Please sign in to comment.