Skip to content

Commit

Permalink
Changes:
Browse files Browse the repository at this point in the history
Limit the delay to 3s per ASG during boot-up.
The long-term fix is to change this to CompletableFuture.
  • Loading branch information
Prudhviraj Karumanchi committed Dec 9, 2024
1 parent 8f0f42d commit 100e7c3
Show file tree
Hide file tree
Showing 3 changed files with 73 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ public class EVCacheImpl implements EVCache, EVCacheImplMBean {
this._pool = poolManager.getEVCacheClientPool(_appName);
});

_pool.pingServers();
_pool.pingServers(true);

setupMonitoring();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,8 @@ public String getStatusCode(StatusCode sc) {
public static final String INTERNAL_POOL_SG_CONFIG = "internal.evc.client.pool.asg.config";
public static final String INTERNAL_POOL_CONFIG = "internal.evc.client.pool.config";
public static final String INTERNAL_POOL_REFRESH = "internal.evc.client.pool.refresh";
public static final String INTERNAL_PING_SERVER = "internal.evc.client.ping.server";
public static final String INTERNAL_PING_SERVER_FAILURES = "internal.evc.client.ping.server.failures";

public static final String INTERNAL_BOOTSTRAP_EUREKA = "internal.evc.client.pool.bootstrap.eureka";

Expand All @@ -309,7 +311,6 @@ public String getStatusCode(StatusCode sc) {
public static final String POOL_REFRESH_ASYNC = "refreshAsync";
public static final String POOL_OPERATIONS = "operations";


/**
* Metric Tags Names
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,7 @@
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;
Expand Down Expand Up @@ -979,7 +976,7 @@ private void updateMemcachedReadInstancesByZone() {
}

private void cleanupMemcachedInstances(boolean force) {
pingServers();
pingServers(false);
for (Iterator<Entry<ServerGroup, List<EVCacheClient>>> it = memcachedInstancesByServerGroup.entrySet().iterator(); it.hasNext();) {
final Entry<ServerGroup, List<EVCacheClient>> serverGroupEntry = it.next();
final List<EVCacheClient> instancesInAServerGroup = serverGroupEntry.getValue();
Expand Down Expand Up @@ -1109,7 +1106,7 @@ private synchronized void refresh(boolean force) throws IOException {
}
updateMemcachedReadInstancesByZone();
updateQueueStats();
if (_pingServers.get()) pingServers();
if (_pingServers.get()) pingServers(false);
} catch (Throwable t) {
log.error("Exception while refreshing the Server list", t);
} finally {
Expand Down Expand Up @@ -1167,52 +1164,91 @@ private void updateQueueStats() {
}
}

public void pingServers() {
public void pingServers(Boolean bootTimeCheck) {
final long start = System.currentTimeMillis();

try {
final Map<ServerGroup, List<EVCacheClient>> allServers = getAllInstancesByZone();

for (Entry<ServerGroup, List<EVCacheClient>> entry : allServers.entrySet()) {
final List<EVCacheClient> listOfClients = entry.getValue();
for (EVCacheClient client : listOfClients) {

int maxRetries = 10;
long retryDelayMs = 1000;
for (int i = 0; i < maxRetries; i++) {
final Map<SocketAddress, String> versions = client.getVersions();
boolean allNodesOk = true;

for (Entry<SocketAddress, String> vEntry : versions.entrySet()) {
String version = vEntry.getValue();
// Only accept version in format like "1.6.15"
if (!version.matches("\\d+\\.\\d+\\.\\d+")) {
allNodesOk = false;
log.warn("Node not ready or invalid version: {}, response: {}, attempt {}",
vEntry.getKey(), version, i + 1);
break;
}
}

if (allNodesOk) {
if (!bootTimeCheck) {
// Just log versions and continue if not a boot time check
try {
final Map<SocketAddress, String> versions = client.getVersions();
if (log.isDebugEnabled()) {
for (Entry<SocketAddress, String> vEntry : versions.entrySet()) {
log.debug("Host : {} Version : {}", vEntry.getKey(), vEntry.getValue());
}
}
break;
continue;
} catch (Exception e) {
log.warn("Error getting versions for client: {}", client, e);
continue;
}
}

if (i < maxRetries - 1) {
Thread.sleep(retryDelayMs);
} else {
log.error("Some nodes not ready after max retries for client: {}", client);
long startTime = System.currentTimeMillis();
long timeoutMs = 3000; // 3 seconds
boolean success = false;

while (System.currentTimeMillis() - startTime < timeoutMs && !success) {
try {
final Map<SocketAddress, String> versions = client.getVersions();
boolean allNodesOk = true;

for (Entry<SocketAddress, String> vEntry : versions.entrySet()) {
String version = vEntry.getValue();
if (!version.matches("\\d+\\.\\d+\\.\\d+")) {
allNodesOk = false;
log.warn("Node not ready or invalid version: {}, response: {}",
vEntry.getKey(), version);
break;
}
}

if (allNodesOk) {
if (log.isDebugEnabled()) {
for (Entry<SocketAddress, String> vEntry : versions.entrySet()) {
log.debug("Host : {} Version : {}", vEntry.getKey(), vEntry.getValue());
}
}
success = true;
break;
}

Thread.sleep(100); // 100ms delay between retries
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.warn("Interrupted while pinging servers for client: {}", client);
break;
} catch (Exception e) {
log.warn("Error while pinging server for client: {}", client, e);
try {
Thread.sleep(100);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
break;
}
}
}

if (!success && bootTimeCheck) {
log.warn("Failed to get valid version from client {} within timeout", client);
EVCacheMetricsFactory.getInstance().getCounter(EVCacheMetricsFactory.INTERNAL_PING_SERVER_FAILURES,tagList).increment();
}
}
}

if (duetClientPool != null)
duetClientPool.pingServers();
if (duetClientPool != null) {
duetClientPool.pingServers(false);
}
} catch (Throwable t) {
log.error("Error while pinging the servers", t);
log.warn("Error while pinging the servers", t);
EVCacheMetricsFactory.getInstance().getCounter(EVCacheMetricsFactory.INTERNAL_PING_SERVER_FAILURES,tagList).increment();
} finally {
EVCacheMetricsFactory.getInstance().getPercentileTimer(EVCacheMetricsFactory.INTERNAL_PING_SERVER, tagList, Duration.ofMillis(100)).record(System.currentTimeMillis() - start, TimeUnit.MILLISECONDS);
}
}

Expand Down

0 comments on commit 100e7c3

Please sign in to comment.