Skip to content

Commit

Permalink
Adding comments and refactoring a few classes
Browse files Browse the repository at this point in the history
Resolving comments. Adding more documentation and a test
  • Loading branch information
rsrinivasanNetflix committed Jul 10, 2019
1 parent 4efa36d commit 39f09af
Show file tree
Hide file tree
Showing 14 changed files with 243 additions and 60 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ public ArchaiusConnectionPoolConfiguration(String name) {
configPublisherConfig = DynamicPropertyFactory.getInstance().getStringProperty(propertyPrefix + ".config.publisher.address", super.getConfigurationPublisherConfig());
failOnStartupIfNoHosts = DynamicPropertyFactory.getInstance().getBooleanProperty(propertyPrefix + ".config.startup.failIfNoHosts", super.getFailOnStartupIfNoHosts());
compressionThreshold = DynamicPropertyFactory.getInstance().getIntProperty(propertyPrefix + ".config.compressionThreshold", super.getValueCompressionThreshold());
lockVotingSize = DynamicPropertyFactory.getInstance().getIntProperty(propertyPrefix + ".config.lock.votinSize", super.getLockVotingSize());
lockVotingSize = DynamicPropertyFactory.getInstance().getIntProperty(propertyPrefix + ".config.lock.votingSize", super.getLockVotingSize());

loadBalanceStrategy = parseLBStrategy(propertyPrefix);
errorRateConfig = parseErrorRateMonitorConfig(propertyPrefix);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,8 +132,10 @@ public Host apply(HealthService info) {
Host host = new HostBuilder().setHostname(hostName)
.setIpAddress(hostName)
.setPort(info.getService().getPort())
.setRack(rack).setDatacenter(String.valueOf(metaData.get("datacenter")))
.setStatus(status).createHost();
.setRack(rack)
.setDatacenter(String.valueOf(metaData.get("datacenter")))
.setStatus(status)
.createHost();
return host;
}
}));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,10 @@ enum CompressionStrategy {
THRESHOLD
}

/**
 * Should connections in this pool connect to the datastore directly?
 *
 * @return {@code true} if connections in this pool should connect to the datastore directly
 */
boolean isConnectToDatastore();

// NOTE(review): presumably reports whether fallback is enabled for this pool; the exact fallback
// semantics are not visible from this declaration — confirm against the implementations.
boolean isFallbackEnabled();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -450,11 +450,6 @@ public ConnectionPoolConfigurationImpl withPoolReconnectWaitMillis(int millis) {
return this;
}

/**
 * Sets the {@code connectToDatastore} flag of this configuration.
 *
 * @param condition the value to store in {@code connectToDatastore}
 * @return this configuration instance, so that calls can be chained
 */
public ConnectionPoolConfigurationImpl withConnectToDatastore(boolean condition) {
    this.connectToDatastore = condition;
    return this;
}

public static class ErrorRateMonitorConfigImpl implements ErrorRateMonitorConfig {

int window = 20;
Expand Down
26 changes: 11 additions & 15 deletions dyno-jedis/src/main/java/com/netflix/dyno/jedis/DynoJedisUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Stream;

@Singleton
public class DynoJedisUtils {
Expand Down Expand Up @@ -117,7 +118,6 @@ private static void setHashtagConnectionPool(HostSupplier hostSupplier, Connecti
// Find the hosts from host supplier
List<Host> hosts = (List<Host>) hostSupplier.getHosts();
Collections.sort(hosts);
// Convert the arraylist to set

// Take the token map supplier (aka the token topology from
// Dynomite)
Expand All @@ -139,26 +139,22 @@ private static void setHashtagConnectionPool(HostSupplier hostSupplier, Connecti
}

String hashtag = hostTokens.get(0).getHost().getHashtag();
short numHosts = 0;
// Update inner state with the host tokens.
for (HostToken hToken : hostTokens) {
Stream<String> htStream = hostTokens.stream().map(hostToken -> hostToken.getHost().getHashtag());

if (hashtag == null) {
htStream.filter(ht -> ht != null).findAny().ifPresent(ignore -> {
logger.error("Hashtag mismatch across hosts");
throw new RuntimeException("Hashtags are different across hosts");
});
} else {
/**
* Checking hashtag consistency from all Dynomite hosts. If
* hashtags are not consistent, we need to throw an exception.
*/
String hashtagNew = hToken.getHost().getHashtag();

if (hashtag != null && !hashtag.equals(hashtagNew)) {
htStream.filter(ht -> !hashtag.equals(ht)).findAny().ifPresent(ignore -> {
logger.error("Hashtag mismatch across hosts");
throw new RuntimeException("Hashtags are different across hosts");
} // addressing case hashtag = null, hashtag = {} ...
else if (numHosts > 0 && hashtag == null && hashtagNew != null) {
logger.error("Hashtag mismatch across hosts");
throw new RuntimeException("Hashtags are different across hosts");

}
hashtag = hashtagNew;
numHosts++;
});
}

if (hashtag != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,10 @@ public String getName() {
return op.name();
}

/**
* Returns only the first key of the multi-key operation.
* @return the first key, or null if the key list is null
*/
@Override
public String getStringKey() {
return (this.keys != null) ? this.keys.get(0) : null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

import com.netflix.discovery.EurekaClient;
import com.netflix.dyno.connectionpool.ConnectionPool;
import com.netflix.dyno.connectionpool.ConnectionPoolConfiguration;
import com.netflix.dyno.connectionpool.ConnectionPoolMonitor;
import com.netflix.dyno.connectionpool.HostSupplier;
import com.netflix.dyno.connectionpool.TokenMapSupplier;
Expand All @@ -12,6 +11,9 @@
import com.netflix.dyno.contrib.DynoOPMonitor;
import com.netflix.dyno.jedis.DynoJedisClient;
import com.netflix.dyno.jedis.DynoJedisUtils;
import com.netflix.dyno.recipes.lock.command.CheckAndRunHost;
import com.netflix.dyno.recipes.lock.command.ExtendHost;
import com.netflix.dyno.recipes.lock.command.LockHost;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.Jedis;
Expand All @@ -32,39 +34,53 @@
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

/**
* Client for acquiring locks similar to the redlock implementation https://redis.io/topics/distlock
*
* This locking mechanism does not guarantee safety (strict mutual exclusion) but can be used for efficiency.
*
* We assume some amount of clock skew between the client and server instances. Any major deviations in this
* will result in reduced accuracy of the lock.
*
* In the common locking case we rely on TTLs being set on a majority of redis servers in order to achieve the right
* locking characteristic. If we are unable to reach a majority of hosts when we try to acquire or extend a lock,
* the attempt is treated as unsuccessful.
*
* We try to release locks on all the hosts when we either shut down or are unable to lock on a majority of hosts successfully.
*
* These are the main edge cases where locking might not be mutually exclusive:
*
* 1. An instance we acquired the lock on goes down and gets replaced by a new instance before the lock TTL expires.
* As suggested in the blog above, we need to ensure that new servers take longer than the TTL to come up so that
* any existing locks would've expired by then (the client-side code cannot control how quickly your servers come up). This can
* become a real issue if you're bringing up new servers in containers, which can come up in a few seconds, and you are holding locks for longer
* than the amount of time it takes for a new server to come up.
* 2. The JVM goes into a GC pause between when you acquired the lock and when you modify the resource guarded by the lock.
* The client needs to ensure that GC pauses are not long enough to affect the assumption of the lock being held (or have an alert on long GCs).
*
*/
public class DynoLockClient {

// Bound to DynoLockClient (not DynoJedisClient) so that log lines emitted here are attributed to this class.
private static final Logger logger = LoggerFactory.getLogger(DynoLockClient.class);

private final String appName;
private final String clusterName;
private final TokenMapSupplier tokenMapSupplier;
private final HostSupplier hostSupplier;
private final ConnectionPoolConfiguration cpConfig;
private final ConnectionPool pool;
private final VotingHostsSelector votingHostsSelector;
private final ExecutorService service;
private final int quorum;
// We assume a small amount of clock drift.
private final double CLOCK_DRIFT = 0.01;
private TimeUnit timeoutUnit;
private long timeout;
private final ConcurrentHashMap<String, String> resourceKeyMap = new ConcurrentHashMap<>();
private final ConcurrentHashMap<String, String> resourceExecutorMap = new ConcurrentHashMap<>();

public DynoLockClient(String appName, String clusterName, ConnectionPool pool, HostSupplier hostSupplier, TokenMapSupplier tokenMapSupplier,
ConnectionPoolConfiguration cpConfig, VotingHostsSelector votingHostsSelector, long timeout, TimeUnit unit) {
this.appName = appName;
this.clusterName = clusterName;
this.tokenMapSupplier = tokenMapSupplier;
this.hostSupplier = hostSupplier;
this.cpConfig = cpConfig;

public DynoLockClient(ConnectionPool pool, VotingHostsSelector votingHostsSelector, long timeout, TimeUnit unit) {
this.pool = pool;
this.votingHostsSelector = votingHostsSelector;
// Threads for locking and unlocking
this.service = Executors.newCachedThreadPool();
this.quorum = votingHostsSelector.getVotingSize() / 2 + 1;
this.timeout = timeout;
this.timeoutUnit = unit;
// We want to release all locks in case of a graceful shutdown
Runtime.getRuntime().addShutdownHook(new Thread(() -> cleanup()));
}

Expand All @@ -80,10 +96,18 @@ private static String getRandomString() {
return UUID.randomUUID().toString();
}

/**
 * Gets the list of resources which are locked by the client.
 *
 * @return a new list holding the resource names, i.e. the keys of {@code resourceKeyMap}
 */
public List<String> getLockedResources() {
    // The map is keyed by resource name; its values are the random strings registered per resource
    // in acquireLock, so the resources are the key set, not the values.
    return new ArrayList<>(resourceKeyMap.keySet());
}

/**
* Release the lock (if held) on the resource
* @param resource
*/
public void releaseLock(String resource) {
if (!checkResourceExists(resource)) {
logger.info("No lock held on {}", resource);
Expand All @@ -109,6 +133,14 @@ public void releaseLock(String resource) {
resourceKeyMap.remove(resource);
}

/**
* Return a timer task which will recursively schedule itself when extension is successful.
* @param runJob
* @param resource
* @param ttlMS
* @param extensionFailed - This function gets called with the resource name when extension was unsuccessful.
* @return
*/
private TimerTask getExtensionTask(Timer runJob, String resource, long ttlMS, Consumer<String> extensionFailed) {
return new TimerTask() {
@Override
Expand All @@ -125,13 +157,29 @@ public void run() {
};
}

/**
 * Try to acquire lock on resource for ttlMS and keep extending it by ttlMS when it's about to expire.
 * When extension fails, the lock is released and the failure function is called with the resource.
 *
 * @param resource The resource on which you want to acquire a lock
 * @param ttlMS The amount of time for which we need to acquire the lock. We try to extend the lock every ttlMS / 2
 * @param failure This function will be called with the resource which could not be locked. This function is called
 *                even if the client released the lock.
 * @return true if we were able to successfully acquire the lock.
 */
public boolean acquireLock(String resource, long ttlMS, Consumer<String> failure) {
    // On extension failure: release first, then notify the caller.
    final Consumer<String> onExtensionFailed = lostResource -> {
        releaseLock(lostResource);
        failure.accept(lostResource);
    };
    return acquireLockWithExtension(resource, ttlMS, onExtensionFailed);
}

/**
* Try to acquire the lock and schedule extension jobs recursively until extension fails.
* @param resource
* @param ttlMS
* @param extensionFailedCallback - gets called with the resource name when extension fails.
* @return
*/
private boolean acquireLockWithExtension(String resource, long ttlMS, Consumer<String> extensionFailedCallback) {
long acquireResult = acquireLock(resource, ttlMS);
if (acquireResult > 0) {
Expand All @@ -142,6 +190,13 @@ private boolean acquireLockWithExtension(String resource, long ttlMS, Consumer<S
return false;
}

/**
* This function is used to acquire / extend the lock on at least quorum number of hosts
* @param resource
* @param ttlMS
* @param extend
* @return
*/
private long runLockHost(String resource, long ttlMS, boolean extend) {
long startTime = Instant.now().toEpochMilli();
long drift = Math.round(ttlMS * CLOCK_DRIFT) + 2;
Expand All @@ -167,11 +222,22 @@ private long runLockHost(String resource, long ttlMS, boolean extend) {
return validity;
}

/**
 * Tries to acquire lock on resource for ttlMS milliseconds.
 *
 * @param resource the resource to lock
 * @param ttlMS the requested lock duration in milliseconds
 * @return the amount of time for which the lock was acquired, as returned by {@code runLockHost}
 */
public long acquireLock(String resource, long ttlMS) {
    // Register a random value for this resource unless one is already registered.
    final String candidateValue = getRandomString();
    resourceKeyMap.putIfAbsent(resource, candidateValue);

    // This is a fresh acquisition, not an extension of an existing lock.
    final boolean extend = false;
    return runLockHost(resource, ttlMS, extend);
}

/**
* Check if we still have the lock on a resource
* @param resource
* @return
*/
boolean checkResourceExists(String resource) {
if (!resourceKeyMap.containsKey(resource)) {
logger.info("No lock held on {}", resource);
Expand All @@ -190,18 +256,25 @@ private boolean awaitLatch(CountDownLatch latch, String resource) {
}
}

/**
* Check and get the ttls for the lock if it exists. This returns the minimum of ttls returned across the hosts
* @param resource
* @return
*/
public long checkLock(String resource) {
if (!checkResourceExists(resource)) {
return 0;
} else {
long startTime = Instant.now().toEpochMilli();
CopyOnWriteArrayList<Long> resultTtls = new CopyOnWriteArrayList<>();
CountDownLatch latch = new CountDownLatch(votingHostsSelector.getVotingSize());
CountDownLatch latch = new CountDownLatch(quorum);
votingHostsSelector.getVotingHosts().getEntireList().stream()
.map(host -> new CheckAndRunHost(host, pool, "pttl", resource, resourceKeyMap.get(resource)))
.forEach(checkAndRunHost -> CompletableFuture.supplyAsync(checkAndRunHost, service)
.thenAccept(r -> {
String result = r.getResult().toString();
// The lua script returns 0 if we have lost the lock or we get -2 if the ttl expired on
// the key when we checked for the pttl.
if (result.equals("0") || result.equals("-2")) {
logger.info("Lock not present on host");
} else {
Expand All @@ -222,6 +295,12 @@ public long checkLock(String resource) {
}
}

/**
* Try to extend lock by ttlMS
* @param resource
* @param ttlMS
* @return
*/
public long extendLock(String resource, long ttlMS) {
if (!checkResourceExists(resource)) {
logger.info("Could not extend lock since its already released");
Expand All @@ -231,10 +310,16 @@ public long extendLock(String resource, long ttlMS) {
}
}

/**
 * Releases the lock on every resource currently tracked by this client.
 */
public void cleanup() {
    for (String resource : resourceKeyMap.keySet()) {
        releaseLock(resource);
    }
}

/**
* Log all the lock resources held right now.
*/
public void logLocks() {
resourceKeyMap.entrySet().stream()
.forEach(e -> logger.info("Resource: {}, Key: {}", e.getKey(), e.getValue()));
Expand Down Expand Up @@ -319,10 +404,10 @@ private DynoLockClient buildDynoLockClient() {
tokenMapSupplier = cpConfig.getTokenSupplier();
final ConnectionPool<Jedis> pool = DynoJedisUtils.createConnectionPool(appName, opMonitor, cpMonitor,
cpConfig, null);
VotingHostsFromTokenRange votingHostSelector = new VotingHostsFromTokenRange(hostSupplier, tokenMapSupplier,
cpConfig.getLockVotingSize());

return new DynoLockClient(appName, clusterName, pool, hostSupplier, tokenMapSupplier, cpConfig,
new VotingHostsFromTokenRange(hostSupplier, tokenMapSupplier, cpConfig.getLockVotingSize()),
timeout, timeoutUnit);
return new DynoLockClient(pool, votingHostSelector, timeout, timeoutUnit);
}
}
}
Loading

0 comments on commit 39f09af

Please sign in to comment.