Skip to content

Commit

Permalink
[EJBCLIENT-402] Convert some remaining testcases to use AbstractEJBCl…
Browse files Browse the repository at this point in the history
…ientTestCase
  • Loading branch information
rachmatowicz committed Feb 15, 2021
1 parent 54331f5 commit 69e9370
Show file tree
Hide file tree
Showing 8 changed files with 215 additions and 156 deletions.
107 changes: 77 additions & 30 deletions src/test/java/org/jboss/ejb/client/test/AbstractEJBClientTestCase.java
Original file line number Diff line number Diff line change
Expand Up @@ -46,14 +46,14 @@ public class AbstractEJBClientTestCase {
public DummyServer[] servers = new DummyServer[NUM_SERVERS];
public static boolean[] serversStarted = new boolean[NUM_SERVERS] ;

// module
// modules
public static final String APP_NAME = "my-foo-app";
public static final String OTHER_APP = "my-other-app";
public static final String MODULE_NAME = "my-bar-module";
public static final String DISTINCT_NAME = "";

// cluster
// note: logical node names and server names should match!
// clusters
// note: node names and server names should match!
public static final String CLUSTER_NAME = "ejb";
public static final String NODE1_NAME = "node1";
public static final String NODE2_NAME = "node2";
Expand All @@ -64,16 +64,33 @@ public class AbstractEJBClientTestCase {
public static final ClusterTopologyListener.NodeInfo NODE2 = DummyServer.getNodeInfo(NODE2_NAME, "localhost",7099,"0.0.0.0",0);
public static final ClusterTopologyListener.NodeInfo NODE3 = DummyServer.getNodeInfo(NODE3_NAME, "localhost",7199,"0.0.0.0",0);
public static final ClusterTopologyListener.NodeInfo NODE4 = DummyServer.getNodeInfo(NODE4_NAME, "localhost",7299,"0.0.0.0",0);
public static final ClusterTopologyListener.ClusterInfo CLUSTER = DummyServer.getClusterInfo(CLUSTER_NAME, NODE1, NODE2);
public static final ClusterTopologyListener.ClusterInfo CLUSTER_2_NODES = DummyServer.getClusterInfo(CLUSTER_NAME, NODE1, NODE2);
public static final ClusterTopologyListener.ClusterInfo CLUSTER_3_NODES = DummyServer.getClusterInfo(CLUSTER_NAME, NODE1, NODE2, NODE3);
public static final ClusterTopologyListener.ClusterInfo CLUSTER_4_NODES = DummyServer.getClusterInfo(CLUSTER_NAME, NODE1, NODE2, NODE3, NODE4);
// most common case
public static final ClusterTopologyListener.ClusterInfo CLUSTER = CLUSTER_2_NODES;

// convenience
// convenience identifiers
public final EJBModuleIdentifier MODULE_IDENTIFIER = new EJBModuleIdentifier(APP_NAME, MODULE_NAME, DISTINCT_NAME);
public final EJBModuleIdentifier OTHER_MODULE_IDENTIFIER = new EJBModuleIdentifier(OTHER_APP, MODULE_NAME, DISTINCT_NAME);
public final EJBIdentifier STATELESS_IDENTIFIER = new EJBIdentifier(MODULE_IDENTIFIER,StatelessEchoBean.class.getSimpleName());
public final EJBIdentifier STATEFUL_IDENTIFIER = new EJBIdentifier(MODULE_IDENTIFIER,StatefulEchoBean.class.getSimpleName());

//
// start(), stop() servers
// - these methods are used to model the following scenarios
// - starting and stopping up to 4 servers using host={localhost} and port={6999, 7199, 7299, 7399}
//

/* start a server with hostname = localhost" and Remoting Transaction service enabled*/
public void startServer(int index) throws Exception {
startServer(index, false);
}

public void startServer(int index, boolean startTxService) throws Exception {
startServer(index, 6999 + (index*100), startTxService);
}

// deprecate
public void startServer(int index, int port) throws Exception {
startServer(index, port, false);
}
Expand Down Expand Up @@ -116,14 +133,33 @@ public void crashServer(int server) {
}
}

/*
public void killServer(int server) {
if (serversStarted[server]) {
try {
this.servers[server].hardKill();
logger.info("Killed server " + serverNames[server]);
} catch (Throwable t) {
logger.info("Could not kill server", t);
} finally {
serversStarted[server] = false;
}
}
}
*/

public static boolean isServerStarted(int index) {
return serversStarted[index];
}

/*
* bean deployment helpers for generic beans StatefulEchoBean, StatelessEchoBean in module "my-foo-app"/"my-bar-module"
*/

//
// deploy(), undeploy() modules
// - these methods are used to model the following scenarios:
// - deploy and undeploy a generic stateful or stateless bean in module named APP_NAME/MODULE_NAME/DISTINCT_NAME
// - deploy and undeploy a generic stateful or stateless bean in module named OTHER_APP/MODULE_NAME/DISTINCT_NAME
//
//
public void deployStateless(int index) {
servers[index].register(APP_NAME, MODULE_NAME, DISTINCT_NAME, StatelessEchoBean.class.getSimpleName(), new StatelessEchoBean(serverNames[index]));
logger.info("Registered SLSB module " + MODULE_IDENTIFIER.toString() + " on server " + serverNames[index]);
Expand All @@ -144,27 +180,6 @@ public void undeployStateful(int index) {
logger.info("Unregistered SFSB module " + MODULE_IDENTIFIER.toString() + " on server " + serverNames[index]);
}


public void defineCluster(int index, ClusterTopologyListener.ClusterInfo cluster) {
servers[index].addCluster(cluster);
logger.info("Added node to cluster " + cluster + ": server " + servers[index]);
}

public void addClusterNodes(int index, ClusterTopologyListener.ClusterInfo cluster) {
servers[index].addClusterNodes(cluster);
logger.info("Added node(s) to cluster " + cluster + ":" + cluster.getNodeInfoList());
}

public void removeClusterNodes(int index, ClusterTopologyListener.ClusterRemovalInfo cluster) {
servers[index].removeClusterNodes(cluster);
logger.info("Removed node(s) from cluster " + cluster + ":" + cluster.getNodeNames());
}

public void removeCluster(int index, String clusterName) {
servers[index].removeCluster(clusterName);
logger.info("Removed cluster " + clusterName + " from node: server " + servers[index]);
}

public void deployOtherStateless(int index) {
servers[index].register(OTHER_APP, MODULE_NAME, DISTINCT_NAME, StatelessEchoBean.class.getSimpleName(), new StatelessEchoBean(serverNames[index]));
logger.info("Registered other SLSB module " + MODULE_IDENTIFIER.toString() + " on server " + serverNames[index]);
Expand Down Expand Up @@ -195,4 +210,36 @@ public void undeployCustomBean(int index, String app, String module, String dist
logger.info("Unregistered custom bean " + (new EJBModuleIdentifier(app, module, distinct)).toString() + " on server " + serverNames[index]);
}

//
// manage clusters
// - when we want to model nodes which are also clustered, we need to use these methods to describe the clusters a node has joined
// - the methods affect generation of topology updates and module updates sent by the DummyServer instances back to the client
// - operations are available to:
// - define a cluster and its member nodes
// - add nodes to a defined cluster
// - remove nodes from a defined cluster
// - remove a cluster and all of its defined nodes
// - it is important to realise that these methods apply on a server by server basis; in other words,
// if we want to represent the fact that there is a cluster called 'myCluster' with members 'myNode1' and 'myNode2' in the test, we need to
// set up that representation on each node separately

public void defineCluster(int index, ClusterTopologyListener.ClusterInfo cluster) {
servers[index].addCluster(cluster);
logger.info("Added node to cluster " + cluster + ": server " + servers[index]);
}

public void addClusterNodes(int index, ClusterTopologyListener.ClusterInfo cluster) {
servers[index].addClusterNodes(cluster);
logger.info("Added node(s) to cluster " + cluster + ":" + cluster.getNodeInfoList());
}

public void removeClusterNodes(int index, ClusterTopologyListener.ClusterRemovalInfo cluster) {
servers[index].removeClusterNodes(cluster);
logger.info("Removed node(s) from cluster " + cluster + ":" + cluster.getNodeNames());
}

public void removeCluster(int index, String clusterName) {
servers[index].removeCluster(clusterName);
logger.info("Removed cluster " + clusterName + " from node: server " + servers[index]);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,11 @@
import org.jboss.ejb.client.EJBClient;
import org.jboss.ejb.client.StatelessEJBLocator;
import org.jboss.ejb.client.legacy.JBossEJBProperties;
import org.jboss.ejb.client.test.common.DummyServer;
import org.jboss.ejb.client.test.common.Echo;
import org.jboss.ejb.client.test.common.Result;
import org.jboss.ejb.client.test.common.StatelessEchoBean;
import org.jboss.ejb.server.ClusterTopologyListener;
import org.jboss.logging.Logger;
import org.junit.After;
import org.junit.AfterClass;
Expand All @@ -32,16 +34,28 @@
import org.junit.BeforeClass;
import org.junit.Test;

import javax.ejb.EJBException;
import java.nio.channels.ClosedChannelException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

import static org.junit.Assert.fail;

/**
* Tests basic invocation of a bean deployed on a single server node.
* Tests fail-over of the EJB client invocation mechanism.
*
* NOTE: When shutting down a server, if this happens during discovery, we can have trouble:
* - we shut down the server A
* - discovery tries to contact all known nodes {A,B}; gets a channel closed exception
* - discovery cannot reach node A
* - topology update arrives which excludes A from available nodes
*
* @author <a href="mailto:[email protected]">Richard Achmatowicz</a>
*/
Expand All @@ -50,13 +64,19 @@ public class ClusteredInvocationFailOverTestCase extends AbstractEJBClientTestCa
public static AtomicInteger SENT = new AtomicInteger();

private static final Logger logger = Logger.getLogger(ClusteredInvocationFailOverTestCase.class);
private static final String PROPERTIES_FILE = "clustered-jboss-ejb-client.properties";
private static final String PROPERTIES_FILE = "jboss-ejb-client.properties";

private static final int THREADS = 1;

private static final int THREADS = 40;
public static final ClusterTopologyListener.ClusterRemovalInfo removal = DummyServer.getClusterRemovalInfo(CLUSTER_NAME, NODE1);
public static final ClusterTopologyListener.ClusterInfo addition = DummyServer.getClusterInfo(CLUSTER_NAME, NODE1);

private static ExecutorService executorService;
private volatile boolean runInvocations = true;

Map<String, AtomicInteger> twoNodesUp = new HashMap<String, AtomicInteger>();
Map<String, AtomicInteger> oneNodeUp = new HashMap<String, AtomicInteger>();

/**
* Do any general setup here
* @throws Exception
Expand All @@ -75,9 +95,14 @@ public static void beforeClass() throws Exception {
*/
@Before
public void beforeTest() throws Exception {

//startServer(0);
startServerAndDeploy(1);
// start a cluster of two nodes
for (int i = 0; i < 2; i++) {
startServer(i);
deployStateless(i);
defineCluster(i, CLUSTER);
}
twoNodesUp.clear();
oneNodeUp.clear();
}


Expand All @@ -89,67 +114,111 @@ public void testClusteredSLSBInvocation() throws Exception {
List<Future<?>> retList = new ArrayList<>();

for(int i = 0; i < THREADS; ++i) {
// set up THREADs number of invocation loops
retList.add(executorService.submit((Callable<Object>) () -> {
while (runInvocations) {
final StatelessEJBLocator<Echo> statelessEJBLocator = new StatelessEJBLocator<Echo>(Echo.class, APP_NAME, MODULE_NAME, StatelessEchoBean.class.getSimpleName(), DISTINCT_NAME);
final Echo proxy = EJBClient.createProxy(statelessEJBLocator);

EJBClient.setStrongAffinity(proxy, new ClusterAffinity("ejb"));
Assert.assertNotNull("Received a null proxy", proxy);
logger.info("Created proxy for Echo: " + proxy.toString());

logger.info("Invoking on proxy...");
// invoke on the proxy (use a ClusterAffinity for now)
final String message = "hello!";
SENT.incrementAndGet();
final Result<String> echoResult = proxy.echo(message);
Assert.assertEquals("Got an unexpected echo", echoResult.getValue(), message);
try {
final StatelessEJBLocator<Echo> statelessEJBLocator = new StatelessEJBLocator<Echo>(Echo.class, APP_NAME, MODULE_NAME, StatelessEchoBean.class.getSimpleName(), DISTINCT_NAME);
final Echo proxy = EJBClient.createProxy(statelessEJBLocator);

EJBClient.setStrongAffinity(proxy, new ClusterAffinity("ejb"));
Assert.assertNotNull("Received a null proxy", proxy);
logger.info("Created proxy for Echo: " + proxy.toString());

logger.info("Invoking on proxy...");
// invoke on the proxy (use a ClusterAffinity for now)
final String message = "hello!";
SENT.incrementAndGet();
final Result<String> echoResult = proxy.echo(message);
Assert.assertEquals("Got an unexpected echo", echoResult.getValue(), message);

// increment the invocation count
if (isServerStarted(0) && isServerStarted(1)) {
synchronized (twoNodesUp) {
String node = echoResult.getNode();
AtomicInteger hits = twoNodesUp.get(node);
if (hits == null) {
twoNodesUp.put(node, new AtomicInteger(0));
hits = twoNodesUp.get(node);
}
hits.getAndIncrement();
logger.info("invocation on two nodes hit node: " + node);
}
} else if (isServerStarted(1)) {
synchronized (oneNodeUp) {
String node = echoResult.getNode();
AtomicInteger hits = oneNodeUp.get(node);
if (hits == null) {
oneNodeUp.put(node, new AtomicInteger(0));
hits = oneNodeUp.get(node);
}
hits.getAndIncrement();
logger.info("invocation on one nodes hit node: " + node);
}
} else {
fail("Invocation hit unreachable target");
}

} catch(Exception e) {
if (e instanceof EJBException && e.getCause() instanceof ClosedChannelException) {
// this is expected when we shut the server down asynchronously during an invocation
} else {
Thread.dumpStack();
fail("Invocation failed with exception " + e.toString());
}
}
}
return "ok";
}));
}

// invoke
Thread.sleep(500);

// stop a server and update the topology of the remaining node
logger.info("Stopping server: " + serverNames[0]);
undeployStateless(0);
stopServer(0);
//startServer(0);
//Thread.sleep(500);
//stopServer(1);
removeClusterNodes(1, removal);
logger.info("Stopped server: " + serverNames[0]);


// invoke
Thread.sleep(500);

// start a server and update the topologuy of the new node and the remaining node
logger.info("Starting server: " + serverNames[0]);
startServer(0);
deployStateless(0);
defineCluster(0, CLUSTER);
addClusterNodes(1, addition);
logger.info("Started server: " + serverNames[0]);

// invoke
Thread.sleep(500);

runInvocations = false;
for(Future<?> i : retList) {
i.get();
}

}

private void undeployAndStopServer(int server) {
if (isServerStarted(server)) {
try {
undeployStateless(server);
removeCluster(server, CLUSTER_NAME);
stopServer(server);
} catch (Throwable t) {
logger.info("Could not stop server", t);
} finally {
serversStarted[server] = false;
}
}
}
// check results
System.out.println("map twoNodesUp = " + twoNodesUp.toString());
System.out.println("map oneNodeUp = " + oneNodeUp.toString());

private void startServerAndDeploy(int server) throws Exception {
startServer(server, 6999 + (server * 100));
deployStateless(server);
defineCluster(server, CLUSTER);
}

/**
* Do any test-specific tear down here.
*/
@After
public void afterTest() {
undeployAndStopServer(0);
undeployAndStopServer(1);
public void afterTest() throws Exception {
// shutdown the cluster of two nodes
for (int i = 0; i < 2; i++) {
stopServer(i);
undeployStateless(i);
removeCluster(i, CLUSTER.getClusterName());
}
}

/**
Expand Down
Loading

0 comments on commit 69e9370

Please sign in to comment.