-
Notifications
You must be signed in to change notification settings - Fork 133
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #505 from rachmatowicz/EJBCLIENT-402
[EJBCLIENT-402] Convert some remaining test cases to use AbstractEJBClientTestCase
- Loading branch information
Showing
9 changed files
with
227 additions
and
187 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -21,9 +21,11 @@ | |
import org.jboss.ejb.client.EJBClient; | ||
import org.jboss.ejb.client.StatelessEJBLocator; | ||
import org.jboss.ejb.client.legacy.JBossEJBProperties; | ||
import org.jboss.ejb.client.test.common.DummyServer; | ||
import org.jboss.ejb.client.test.common.Echo; | ||
import org.jboss.ejb.client.test.common.Result; | ||
import org.jboss.ejb.client.test.common.StatelessEchoBean; | ||
import org.jboss.ejb.server.ClusterTopologyListener; | ||
import org.jboss.logging.Logger; | ||
import org.junit.After; | ||
import org.junit.AfterClass; | ||
|
@@ -32,16 +34,28 @@ | |
import org.junit.BeforeClass; | ||
import org.junit.Test; | ||
|
||
import javax.ejb.EJBException; | ||
import java.nio.channels.ClosedChannelException; | ||
import java.util.ArrayList; | ||
import java.util.HashMap; | ||
import java.util.List; | ||
import java.util.Map; | ||
import java.util.concurrent.Callable; | ||
import java.util.concurrent.ExecutorService; | ||
import java.util.concurrent.Executors; | ||
import java.util.concurrent.Future; | ||
import java.util.concurrent.atomic.AtomicInteger; | ||
|
||
import static org.junit.Assert.fail; | ||
|
||
/** | ||
* Tests basic invocation of a bean deployed on a single server node. | ||
* Tests fail-over of the EJB client invocation mechanism. | ||
* | ||
* NOTE: When shutting down a server, if this happens during discovery, we can have trouble: | ||
* - we shut down the server A | ||
* - discovery tries to contact all known nodes {A,B}; gets a channel closed exception | ||
* - discovery cannot reach node A | ||
* - topology update arrives which excludes A from available nodes | ||
* | ||
* @author <a href="mailto:[email protected]">Richard Achmatowicz</a> | ||
*/ | ||
|
@@ -50,13 +64,19 @@ public class ClusteredInvocationFailOverTestCase extends AbstractEJBClientTestCa | |
public static AtomicInteger SENT = new AtomicInteger(); | ||
|
||
private static final Logger logger = Logger.getLogger(ClusteredInvocationFailOverTestCase.class); | ||
private static final String PROPERTIES_FILE = "clustered-jboss-ejb-client.properties"; | ||
private static final String PROPERTIES_FILE = "jboss-ejb-client.properties"; | ||
|
||
private static final int THREADS = 1; | ||
|
||
private static final int THREADS = 40; | ||
public static final ClusterTopologyListener.ClusterRemovalInfo removal = DummyServer.getClusterRemovalInfo(CLUSTER_NAME, NODE1); | ||
public static final ClusterTopologyListener.ClusterInfo addition = DummyServer.getClusterInfo(CLUSTER_NAME, NODE1); | ||
|
||
private static ExecutorService executorService; | ||
private volatile boolean runInvocations = true; | ||
|
||
Map<String, AtomicInteger> twoNodesUp = new HashMap<String, AtomicInteger>(); | ||
Map<String, AtomicInteger> oneNodeUp = new HashMap<String, AtomicInteger>(); | ||
|
||
/** | ||
* Do any general setup here | ||
* @throws Exception | ||
|
@@ -75,9 +95,14 @@ public static void beforeClass() throws Exception { | |
*/ | ||
@Before | ||
public void beforeTest() throws Exception { | ||
|
||
//startServer(0); | ||
startServerAndDeploy(1); | ||
// start a cluster of two nodes | ||
for (int i = 0; i < 2; i++) { | ||
startServer(i); | ||
deployStateless(i); | ||
defineCluster(i, CLUSTER); | ||
} | ||
twoNodesUp.clear(); | ||
oneNodeUp.clear(); | ||
} | ||
|
||
|
||
|
@@ -89,67 +114,111 @@ public void testClusteredSLSBInvocation() throws Exception { | |
List<Future<?>> retList = new ArrayList<>(); | ||
|
||
for(int i = 0; i < THREADS; ++i) { | ||
// set up THREADs number of invocation loops | ||
retList.add(executorService.submit((Callable<Object>) () -> { | ||
while (runInvocations) { | ||
final StatelessEJBLocator<Echo> statelessEJBLocator = new StatelessEJBLocator<Echo>(Echo.class, APP_NAME, MODULE_NAME, StatelessEchoBean.class.getSimpleName(), DISTINCT_NAME); | ||
final Echo proxy = EJBClient.createProxy(statelessEJBLocator); | ||
|
||
EJBClient.setStrongAffinity(proxy, new ClusterAffinity("ejb")); | ||
Assert.assertNotNull("Received a null proxy", proxy); | ||
logger.info("Created proxy for Echo: " + proxy.toString()); | ||
|
||
logger.info("Invoking on proxy..."); | ||
// invoke on the proxy (use a ClusterAffinity for now) | ||
final String message = "hello!"; | ||
SENT.incrementAndGet(); | ||
final Result<String> echoResult = proxy.echo(message); | ||
Assert.assertEquals("Got an unexpected echo", echoResult.getValue(), message); | ||
try { | ||
final StatelessEJBLocator<Echo> statelessEJBLocator = new StatelessEJBLocator<Echo>(Echo.class, APP_NAME, MODULE_NAME, StatelessEchoBean.class.getSimpleName(), DISTINCT_NAME); | ||
final Echo proxy = EJBClient.createProxy(statelessEJBLocator); | ||
|
||
EJBClient.setStrongAffinity(proxy, new ClusterAffinity("ejb")); | ||
Assert.assertNotNull("Received a null proxy", proxy); | ||
logger.info("Created proxy for Echo: " + proxy.toString()); | ||
|
||
logger.info("Invoking on proxy..."); | ||
// invoke on the proxy (use a ClusterAffinity for now) | ||
final String message = "hello!"; | ||
SENT.incrementAndGet(); | ||
final Result<String> echoResult = proxy.echo(message); | ||
Assert.assertEquals("Got an unexpected echo", echoResult.getValue(), message); | ||
|
||
// increment the invocation count | ||
if (isServerStarted(0) && isServerStarted(1)) { | ||
synchronized (twoNodesUp) { | ||
String node = echoResult.getNode(); | ||
AtomicInteger hits = twoNodesUp.get(node); | ||
if (hits == null) { | ||
twoNodesUp.put(node, new AtomicInteger(0)); | ||
hits = twoNodesUp.get(node); | ||
} | ||
hits.getAndIncrement(); | ||
logger.info("invocation on two nodes hit node: " + node); | ||
} | ||
} else if (isServerStarted(1)) { | ||
synchronized (oneNodeUp) { | ||
String node = echoResult.getNode(); | ||
AtomicInteger hits = oneNodeUp.get(node); | ||
if (hits == null) { | ||
oneNodeUp.put(node, new AtomicInteger(0)); | ||
hits = oneNodeUp.get(node); | ||
} | ||
hits.getAndIncrement(); | ||
logger.info("invocation on one nodes hit node: " + node); | ||
} | ||
} else { | ||
fail("Invocation hit unreachable target"); | ||
} | ||
|
||
} catch(Exception e) { | ||
if (e instanceof EJBException && e.getCause() instanceof ClosedChannelException) { | ||
// this is expected when we shut the server down asynchronously during an invocation | ||
} else { | ||
Thread.dumpStack(); | ||
fail("Invocation failed with exception " + e.toString()); | ||
} | ||
} | ||
} | ||
return "ok"; | ||
})); | ||
} | ||
|
||
// invoke | ||
Thread.sleep(500); | ||
|
||
// stop a server and update the topology of the remaining node | ||
logger.info("Stopping server: " + serverNames[0]); | ||
undeployStateless(0); | ||
stopServer(0); | ||
//startServer(0); | ||
//Thread.sleep(500); | ||
//stopServer(1); | ||
removeClusterNodes(1, removal); | ||
logger.info("Stopped server: " + serverNames[0]); | ||
|
||
|
||
// invoke | ||
Thread.sleep(500); | ||
|
||
// start a server and update the topologuy of the new node and the remaining node | ||
logger.info("Starting server: " + serverNames[0]); | ||
startServer(0); | ||
deployStateless(0); | ||
defineCluster(0, CLUSTER); | ||
addClusterNodes(1, addition); | ||
logger.info("Started server: " + serverNames[0]); | ||
|
||
// invoke | ||
Thread.sleep(500); | ||
|
||
runInvocations = false; | ||
for(Future<?> i : retList) { | ||
i.get(); | ||
} | ||
|
||
} | ||
|
||
private void undeployAndStopServer(int server) { | ||
if (isServerStarted(server)) { | ||
try { | ||
undeployStateless(server); | ||
removeCluster(server, CLUSTER_NAME); | ||
stopServer(server); | ||
} catch (Throwable t) { | ||
logger.info("Could not stop server", t); | ||
} finally { | ||
serversStarted[server] = false; | ||
} | ||
} | ||
} | ||
// check results | ||
System.out.println("map twoNodesUp = " + twoNodesUp.toString()); | ||
System.out.println("map oneNodeUp = " + oneNodeUp.toString()); | ||
|
||
private void startServerAndDeploy(int server) throws Exception { | ||
startServer(server, 6999 + (server * 100)); | ||
deployStateless(server); | ||
defineCluster(server, CLUSTER); | ||
} | ||
|
||
/** | ||
* Do any test-specific tear down here. | ||
*/ | ||
@After | ||
public void afterTest() { | ||
undeployAndStopServer(0); | ||
undeployAndStopServer(1); | ||
public void afterTest() throws Exception { | ||
// shutdown the cluster of two nodes | ||
for (int i = 0; i < 2; i++) { | ||
stopServer(i); | ||
undeployStateless(i); | ||
removeCluster(i, CLUSTER.getClusterName()); | ||
} | ||
} | ||
|
||
/** | ||
|
Oops, something went wrong.