Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce DISCOVERY_CONNECTION_TIMEOUT #499

Draft
wants to merge 6 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,6 @@ public final class DiscoveryEJBClientInterceptor implements EJBClientInterceptor
private static final boolean WILDFLY_TESTSUITE_HACK = Boolean.getBoolean("org.jboss.ejb.client.wildfly-testsuite-hack");
// This provides a way timeout a discovery, avoiding blocking on some edge cases. See EJBCLIENT-311.
private static final long DISCOVERY_TIMEOUT = Long.parseLong(WildFlySecurityManager.getPropertyPrivileged("org.jboss.ejb.client.discovery.timeout", "0"));
//how long to wait if at least one node has already been discovered. This one is in ms rather than s
private static final long DISCOVERY_ADDITIONAL_TIMEOUT = Long.parseLong(WildFlySecurityManager.getPropertyPrivileged("org.jboss.ejb.client.discovery.additional-node-timeout", "0"));

/**
* This interceptor's priority.
Expand Down Expand Up @@ -202,15 +200,6 @@ public SessionID handleSessionCreation(final EJBSessionCreationInvocationContext
return sessionID;
}

/**
* Gets the value (in milliseconds) of discovery additional timeout,
* configured with system property {@code org.jboss.ejb.client.discovery.additional-node-timeout}.
*
* @return the value (in milliseconds) of discovery additional timeout
*/
public static long getDiscoveryAdditionalTimeout() {
return DISCOVERY_ADDITIONAL_TIMEOUT;
}

/**
* Intended to be called by interceptors which assign a new destination
Expand Down Expand Up @@ -501,10 +490,9 @@ private List<Throwable> doAnyDiscovery(AbstractInvocationContext context, final
final Map<URI, List<String>> clusterAssociations = new HashMap<>();

int nodeless = 0;
long timeout = DISCOVERY_TIMEOUT * 1000;
try (final ServicesQueue queue = discover(filterSpec)) {
ServiceURL serviceURL;
while ((serviceURL = queue.takeService(timeout, TimeUnit.MILLISECONDS)) != null) {
while ((serviceURL = queue.takeService(DISCOVERY_TIMEOUT, TimeUnit.SECONDS)) != null) {
final URI location = serviceURL.getLocationURI();
if (!blacklist.contains(location)) {
// Got a match! See if there's a node affinity to set for the invocation.
Expand Down Expand Up @@ -543,10 +531,6 @@ private List<Throwable> doAnyDiscovery(AbstractInvocationContext context, final
}
}
}
//one has already been discovered, we may want a shorter timeout for additional nodes
if (DISCOVERY_ADDITIONAL_TIMEOUT != 0) {
timeout = DISCOVERY_ADDITIONAL_TIMEOUT; //this one is actually in ms, you generally want it very short
}
}
}
problems = queue.getProblems();
Expand Down Expand Up @@ -634,7 +618,7 @@ private List<Throwable> doClusterDiscovery(AbstractInvocationContext context, fi
final Set<URI> blacklist = getBlacklist();
try (final ServicesQueue queue = discover(filterSpec)) {
ServiceURL serviceURL;
while ((serviceURL = queue.takeService()) != null) {
while ((serviceURL = queue.takeService(DISCOVERY_TIMEOUT, TimeUnit.SECONDS)) != null) {
final URI location = serviceURL.getLocationURI();
if (!blacklist.contains(location)) {
final EJBReceiver transportProvider = clientContext.getTransportProvider(location.getScheme());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@
import javax.net.ssl.SSLContext;

import org.jboss.ejb._private.Logs;
import org.jboss.ejb.client.DiscoveryEJBClientInterceptor;
import org.jboss.ejb.client.EJBClientConnection;
import org.jboss.ejb.client.EJBClientContext;
import org.jboss.ejb.client.EJBModuleIdentifier;
Expand Down Expand Up @@ -82,6 +81,12 @@
*/
final class RemotingEJBDiscoveryProvider implements DiscoveryProvider, DiscoveredNodeRegistry {

// limit on time for which nodes marked as failed are not used for connection attempts
private static final long DESTINATION_RECHECK_INTERVAL_DEFAULT_MS = 5000L;

// limit on how long a connection attempt can take before it is cancelled and node marked as failed
private static final long DISCOVERY_CONNECTION_TIMEOUT_DEFAULT_MS = 5000L;

static final AuthenticationContextConfigurationClient AUTH_CONFIGURATION_CLIENT = doPrivileged(AuthenticationContextConfigurationClient.ACTION);

private final ConcurrentHashMap<String, NodeInformation> nodes = new ConcurrentHashMap<>();
Expand All @@ -98,7 +103,18 @@ final class RemotingEJBDiscoveryProvider implements DiscoveryProvider, Discovere
try {
return TimeUnit.MILLISECONDS.toNanos(Long.valueOf(val));
} catch (NumberFormatException e) {
return TimeUnit.MILLISECONDS.toNanos(5000L);
return TimeUnit.MILLISECONDS.toNanos(DESTINATION_RECHECK_INTERVAL_DEFAULT_MS);
}
});

// timeout (in ms) for discovery connection attempts
private static final long DISCOVERY_CONNECTION_TIMEOUT =
AccessController.doPrivileged((PrivilegedAction<Long>) () -> {
String val = System.getProperty("org.jboss.ejb.client.discovery-connection-timeout");
try {
return Long.valueOf(val);
} catch (NumberFormatException e) {
return DISCOVERY_CONNECTION_TIMEOUT_DEFAULT_MS;
}
});

Expand Down Expand Up @@ -372,10 +388,6 @@ final class DiscoveryAttempt implements DiscoveryRequest, DiscoveryResult {
// keep a record of URIs we try to connect to for each cluster
private final ConcurrentHashMap<String, Set<URI>> urisByCluster = new ConcurrentHashMap<>();
private final Set<URI> connectFailedURIs = new HashSet<>();
/**
* nodes that have already been provided to the discovery provider eagerly
*/
private final Set<String> eagerNodes;

DiscoveryAttempt(final ServiceType serviceType, final FilterSpec filterSpec, final DiscoveryResult discoveryResult, final RemoteEJBReceiver ejbReceiver, final AuthenticationContext authenticationContext) {
this.serviceType = serviceType;
Expand Down Expand Up @@ -422,8 +434,6 @@ public void handleDone(final EJBClientChannel clientChannel, final URI destinati
countDown();
}
};

eagerNodes = DiscoveryEJBClientInterceptor.getDiscoveryAdditionalTimeout() == 0 ? null : Collections.synchronizedSet(new HashSet<>());
}

void connectAndDiscover(URI uri, String clusterEffective) {
Expand All @@ -447,6 +457,11 @@ void connectAndDiscover(URI uri, String clusterEffective) {
}
onCancel(future::cancel);
future.addNotifier(outerNotifier, uri);

// create a new thread to check for timeout on connection attempts (XnioWorker provides an ExecutorService for Remoting-related Runnables)
if (DISCOVERY_CONNECTION_TIMEOUT > 0) {
endpoint.getXnioWorker().submit(new ConnectionAttemptTimeoutHandler(uri, future, DISCOVERY_CONNECTION_TIMEOUT, TimeUnit.MILLISECONDS));
}
}

void countDown() {
Expand All @@ -473,36 +488,28 @@ void countDown() {
final EJBModuleIdentifier module = filterSpec.accept(MI_EXTRACTOR);
if (phase2) {
if (node != null) {
if (eagerNodes == null || !eagerNodes.contains(node)) {
final NodeInformation information = nodes.get(node);
if (information != null) information.discover(serviceType, filterSpec, result);
}
final NodeInformation information = nodes.get(node);
if (information != null) information.discover(serviceType, filterSpec, result);
} else for (NodeInformation information : nodes.values()) {
if (eagerNodes == null || !eagerNodes.contains(information.getNodeName())) {
information.discover(serviceType, filterSpec, result);
}
information.discover(serviceType, filterSpec, result);
}
result.complete();
} else {
boolean ok = false;
// optimize for simple module identifier and node name queries
if (node != null) {
if (eagerNodes == null || !eagerNodes.contains(node)) {
final NodeInformation information = nodes.get(node);
if (information != null) {
if (information.discover(serviceType, filterSpec, result)) {
ok = true;
}
}
}
} else for (NodeInformation information : nodes.values()) {
if (eagerNodes == null || !eagerNodes.contains(information.getNodeName())) {
final NodeInformation information = nodes.get(node);
if (information != null) {
if (information.discover(serviceType, filterSpec, result)) {
ok = true;
}
}
} else for (NodeInformation information : nodes.values()) {
if (information.discover(serviceType, filterSpec, result)) {
ok = true;
}
}
if (ok || (eagerNodes != null && !eagerNodes.isEmpty())) {
if (ok) {
result.complete();
} else {
// everything failed. We have to reconnect everything.
Expand Down Expand Up @@ -559,25 +566,6 @@ void countDown() {
countDown();
}
}
} else if (eagerNodes != null) {
final DiscoveryResult result = this.discoveryResult;
final String node = filterSpec.accept(NODE_EXTRACTOR);
if (node != null) {
if (!eagerNodes.contains(node)) {
final NodeInformation information = nodes.get(node);
if (information != null) {
if (information.discover(serviceType, filterSpec, result)) {
eagerNodes.add(node);
}
}
}
} else for (NodeInformation information : nodes.values()) {
if (!eagerNodes.contains(information.getNodeName())) {
if (information.discover(serviceType, filterSpec, result)) {
eagerNodes.add(information.getNodeName());
}
}
}
}
}

Expand Down Expand Up @@ -613,4 +601,37 @@ void onCancel(final Runnable action) {
}
}
}

/**
* Runnable which cancels a future representing a connection attempt if it execeeds a discovery timeout
* This allows us to place an upper bound on the time taken for connections during discovery.
*/
private class ConnectionAttemptTimeoutHandler implements Runnable {
URI uri ;
IoFuture<ConnectionPeerIdentity> future ;
long timeout = 0;
TimeUnit timeUnit;

ConnectionAttemptTimeoutHandler(URI uri, IoFuture<ConnectionPeerIdentity> future, long timeout, TimeUnit timeUnit) {
Assert.checkNotNullParam("future", future);
this.uri = uri;
this.future = future ;
this.timeout = timeout;
this.timeUnit = timeUnit;
}

public void run() {
long start = System.currentTimeMillis();
Logs.INVOCATION.tracef("DiscoveryAttempt: starting discovery connection attempt to %s ", uri);
if (future.await(timeout, timeUnit) == IoFuture.Status.WAITING) {
// cancel this connection attempt and put this in the sin bin
Logs.INVOCATION.warnf("DiscoveryAttempt: connection attempt to node %s has timed out after %s %s; cancelling the connection attempt", uri, timeout, timeUnit);
future.cancel();
failedDestinations.put(uri, System.nanoTime());
}
long stop = System.currentTimeMillis();
Logs.INVOCATION.tracef("DiscoveryAttempt: finished discovery connection attempt to %s (%s ms)", uri, (stop-start));
}
}

}
Loading