diff --git a/dyno-contrib/src/main/java/com/netflix/dyno/contrib/ArchaiusConnectionPoolConfiguration.java b/dyno-contrib/src/main/java/com/netflix/dyno/contrib/ArchaiusConnectionPoolConfiguration.java index 4eb65645..b5930a2a 100644 --- a/dyno-contrib/src/main/java/com/netflix/dyno/contrib/ArchaiusConnectionPoolConfiguration.java +++ b/dyno-contrib/src/main/java/com/netflix/dyno/contrib/ArchaiusConnectionPoolConfiguration.java @@ -73,7 +73,7 @@ public ArchaiusConnectionPoolConfiguration(String name) { configPublisherConfig = DynamicPropertyFactory.getInstance().getStringProperty(propertyPrefix + ".config.publisher.address", super.getConfigurationPublisherConfig()); failOnStartupIfNoHosts = DynamicPropertyFactory.getInstance().getBooleanProperty(propertyPrefix + ".config.startup.failIfNoHosts", super.getFailOnStartupIfNoHosts()); compressionThreshold = DynamicPropertyFactory.getInstance().getIntProperty(propertyPrefix + ".config.compressionThreshold", super.getValueCompressionThreshold()); - lockVotingSize = DynamicPropertyFactory.getInstance().getIntProperty(propertyPrefix + ".config.lock.votinSize", super.getLockVotingSize()); + lockVotingSize = DynamicPropertyFactory.getInstance().getIntProperty(propertyPrefix + ".config.lock.votingSize", super.getLockVotingSize()); loadBalanceStrategy = parseLBStrategy(propertyPrefix); errorRateConfig = parseErrorRateMonitorConfig(propertyPrefix); diff --git a/dyno-contrib/src/main/java/com/netflix/dyno/contrib/consul/ConsulHostsSupplier.java b/dyno-contrib/src/main/java/com/netflix/dyno/contrib/consul/ConsulHostsSupplier.java index 39034cc3..2f25e3e0 100644 --- a/dyno-contrib/src/main/java/com/netflix/dyno/contrib/consul/ConsulHostsSupplier.java +++ b/dyno-contrib/src/main/java/com/netflix/dyno/contrib/consul/ConsulHostsSupplier.java @@ -132,8 +132,10 @@ public Host apply(HealthService info) { Host host = new HostBuilder().setHostname(hostName) .setIpAddress(hostName) .setPort(info.getService().getPort()) - .setRack(rack).setDatacenter(String.valueOf(metaData.get("datacenter"))) - .setStatus(status).createHost(); + .setRack(rack) + .setDatacenter(String.valueOf(metaData.get("datacenter"))) + .setStatus(status) + .createHost(); return host; } })); diff --git a/dyno-core/src/main/java/com/netflix/dyno/connectionpool/ConnectionPoolConfiguration.java b/dyno-core/src/main/java/com/netflix/dyno/connectionpool/ConnectionPoolConfiguration.java index 12326941..6395a17d 100644 --- a/dyno-core/src/main/java/com/netflix/dyno/connectionpool/ConnectionPoolConfiguration.java +++ b/dyno-core/src/main/java/com/netflix/dyno/connectionpool/ConnectionPoolConfiguration.java @@ -39,6 +39,10 @@ enum CompressionStrategy { THRESHOLD } + /** + * Should connections in this pool connect to the datastore directly? 
+ * @return + */ boolean isConnectToDatastore(); boolean isFallbackEnabled(); diff --git a/dyno-core/src/main/java/com/netflix/dyno/connectionpool/impl/ConnectionPoolConfigurationImpl.java b/dyno-core/src/main/java/com/netflix/dyno/connectionpool/impl/ConnectionPoolConfigurationImpl.java index ae12400c..474bdc1c 100644 --- a/dyno-core/src/main/java/com/netflix/dyno/connectionpool/impl/ConnectionPoolConfigurationImpl.java +++ b/dyno-core/src/main/java/com/netflix/dyno/connectionpool/impl/ConnectionPoolConfigurationImpl.java @@ -450,11 +450,6 @@ public ConnectionPoolConfigurationImpl withPoolReconnectWaitMillis(int millis) { return this; } - public ConnectionPoolConfigurationImpl withConnectToDatastore(boolean condition) { - connectToDatastore = condition; - return this; - } - public static class ErrorRateMonitorConfigImpl implements ErrorRateMonitorConfig { int window = 20; diff --git a/dyno-jedis/src/main/java/com/netflix/dyno/jedis/DynoJedisUtils.java b/dyno-jedis/src/main/java/com/netflix/dyno/jedis/DynoJedisUtils.java index a204c8a9..da04f21a 100644 --- a/dyno-jedis/src/main/java/com/netflix/dyno/jedis/DynoJedisUtils.java +++ b/dyno-jedis/src/main/java/com/netflix/dyno/jedis/DynoJedisUtils.java @@ -26,6 +26,7 @@ import java.util.HashSet; import java.util.List; import java.util.Set; +import java.util.stream.Stream; @Singleton public class DynoJedisUtils { @@ -117,7 +118,6 @@ private static void setHashtagConnectionPool(HostSupplier hostSupplier, Connecti // Find the hosts from host supplier List hosts = (List) hostSupplier.getHosts(); Collections.sort(hosts); - // Convert the arraylist to set // Take the token map supplier (aka the token topology from // Dynomite) @@ -139,26 +139,22 @@ private static void setHashtagConnectionPool(HostSupplier hostSupplier, Connecti } String hashtag = hostTokens.get(0).getHost().getHashtag(); - short numHosts = 0; - // Update inner state with the host tokens. - for (HostToken hToken : hostTokens) { + Stream htStream = hostTokens.stream().map(hostToken -> hostToken.getHost().getHashtag()); + + if (hashtag == null) { + htStream.filter(ht -> ht != null).findAny().ifPresent(ignore -> { + logger.error("Hashtag mismatch across hosts"); + throw new RuntimeException("Hashtags are different across hosts"); + }); + } else { /** * Checking hashtag consistency from all Dynomite hosts. If * hashtags are not consistent, we need to throw an exception. */ - String hashtagNew = hToken.getHost().getHashtag(); - - if (hashtag != null && !hashtag.equals(hashtagNew)) { + htStream.filter(ht -> !hashtag.equals(ht)).findAny().ifPresent(ignore -> { logger.error("Hashtag mismatch across hosts"); throw new RuntimeException("Hashtags are different across hosts"); - } // addressing case hashtag = null, hashtag = {} ... - else if (numHosts > 0 && hashtag == null && hashtagNew != null) { - logger.error("Hashtag mismatch across hosts"); - throw new RuntimeException("Hashtags are different across hosts"); - - } - hashtag = hashtagNew; - numHosts++; + }); } if (hashtag != null) { diff --git a/dyno-jedis/src/main/java/com/netflix/dyno/jedis/operation/MultiKeyOperation.java b/dyno-jedis/src/main/java/com/netflix/dyno/jedis/operation/MultiKeyOperation.java index b10e38c3..9a62fc57 100644 --- a/dyno-jedis/src/main/java/com/netflix/dyno/jedis/operation/MultiKeyOperation.java +++ b/dyno-jedis/src/main/java/com/netflix/dyno/jedis/operation/MultiKeyOperation.java @@ -45,6 +45,10 @@ public String getName() { return op.name(); } + /** + * Sends back only the first key of the multi key operation. 
+ * @return the first key, or null if no keys are set + */ @Override public String getStringKey() { return (this.keys != null) ? this.keys.get(0) : null; diff --git a/dyno-recipes/src/main/java/com/netflix/dyno/recipes/lock/DynoLockClient.java b/dyno-recipes/src/main/java/com/netflix/dyno/recipes/lock/DynoLockClient.java index 0ea51a98..6927a775 100644 --- a/dyno-recipes/src/main/java/com/netflix/dyno/recipes/lock/DynoLockClient.java +++ b/dyno-recipes/src/main/java/com/netflix/dyno/recipes/lock/DynoLockClient.java @@ -2,7 +2,6 @@ import com.netflix.discovery.EurekaClient; import com.netflix.dyno.connectionpool.ConnectionPool; -import com.netflix.dyno.connectionpool.ConnectionPoolConfiguration; import com.netflix.dyno.connectionpool.ConnectionPoolMonitor; import com.netflix.dyno.connectionpool.HostSupplier; import com.netflix.dyno.connectionpool.TokenMapSupplier; @@ -12,6 +11,9 @@ import com.netflix.dyno.contrib.DynoOPMonitor; import com.netflix.dyno.jedis.DynoJedisClient; import com.netflix.dyno.jedis.DynoJedisUtils; +import com.netflix.dyno.recipes.lock.command.CheckAndRunHost; +import com.netflix.dyno.recipes.lock.command.ExtendHost; +import com.netflix.dyno.recipes.lock.command.LockHost; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import redis.clients.jedis.Jedis; @@ -32,32 +34,45 @@ import java.util.concurrent.TimeUnit; import java.util.function.Consumer; +/** + * Client for acquiring locks, similar to the Redlock implementation described at https://redis.io/topics/distlock + * + * This locking mechanism does not provide safety guarantees but can be used for efficiency. + * + * We assume some amount of clock skew between the client and server instances. Any major deviation here + * will reduce the accuracy of the lock. + * + * In the common locking case we rely on TTLs being set on a majority of redis servers in order to achieve the right + * locking characteristics. If we are unable to reach a majority of hosts when we try to acquire or extend a lock, the operation fails. + * + * We try to release locks on all the hosts when we either shut down or are unable to lock on a majority of hosts successfully. + * + * These are the main edge cases where locking might not be mutually exclusive: + * + * 1. An instance we acquired the lock on goes down and gets replaced by a new instance before the lock TTL expires. + * As suggested in the blog above, we need to ensure that new servers take longer than the TTL to come up so that + * any existing locks will have expired by then (the client-side code cannot control how quickly your servers come up). This can + * become a real issue if you're bringing up new servers in containers, which can come up in a few seconds, while you are holding locks for longer + * than the time it takes for a new server to come up. + * 2. The JVM goes into GC between acquiring the lock and modifying the resource protected by the lock. + * The client needs to ensure that GC pauses are not long enough to affect the assumption that the lock is still held (or have an alert on long GCs).
+ * + */ public class DynoLockClient { private static final Logger logger = LoggerFactory.getLogger(DynoJedisClient.class); - private final String appName; - private final String clusterName; - private final TokenMapSupplier tokenMapSupplier; - private final HostSupplier hostSupplier; - private final ConnectionPoolConfiguration cpConfig; private final ConnectionPool pool; private final VotingHostsSelector votingHostsSelector; private final ExecutorService service; private final int quorum; + // We assume a small amount of clock drift. private final double CLOCK_DRIFT = 0.01; private TimeUnit timeoutUnit; private long timeout; private final ConcurrentHashMap resourceKeyMap = new ConcurrentHashMap<>(); - private final ConcurrentHashMap resourceExecutorMap = new ConcurrentHashMap<>(); - - public DynoLockClient(String appName, String clusterName, ConnectionPool pool, HostSupplier hostSupplier, TokenMapSupplier tokenMapSupplier, - ConnectionPoolConfiguration cpConfig, VotingHostsSelector votingHostsSelector, long timeout, TimeUnit unit) { - this.appName = appName; - this.clusterName = clusterName; - this.tokenMapSupplier = tokenMapSupplier; - this.hostSupplier = hostSupplier; - this.cpConfig = cpConfig; + + public DynoLockClient(ConnectionPool pool, VotingHostsSelector votingHostsSelector, long timeout, TimeUnit unit) { this.pool = pool; this.votingHostsSelector = votingHostsSelector; // Threads for locking and unlocking @@ -65,6 +80,7 @@ public DynoLockClient(String appName, String clusterName, ConnectionPool pool, H this.quorum = votingHostsSelector.getVotingSize() / 2 + 1; this.timeout = timeout; this.timeoutUnit = unit; + // We want to release all locks in case of a graceful shutdown Runtime.getRuntime().addShutdownHook(new Thread(() -> cleanup())); } @@ -80,10 +96,18 @@ private static String getRandomString() { return UUID.randomUUID().toString(); } + /** + * Gets list of resources which are locked by the client + * @return + */ public List getLockedResources() { - return new ArrayList<>(resourceKeyMap.values()); + return new ArrayList<>(resourceKeyMap.keySet()); } + /** + * Release the lock (if held) on the resource + * @param resource + */ public void releaseLock(String resource) { if (!checkResourceExists(resource)) { logger.info("No lock held on {}", resource); @@ -109,6 +133,14 @@ public void releaseLock(String resource) { resourceKeyMap.remove(resource); } + /** + * Return a timer task which will recursively schedule itself when extension is successful. + * @param runJob + * @param resource + * @param ttlMS + * @param extensionFailed - This function gets called with the resource name when extension was unsuccessful. + * @return + */ private TimerTask getExtensionTask(Timer runJob, String resource, long ttlMS, Consumer extensionFailed) { return new TimerTask() { @Override @@ -125,6 +157,15 @@ public void run() { }; } + /** + * Try to acquire lock on resource for ttlMS and keep extending it by ttlMS when its about to expire. + * Calls the failure function with the resource when extension fails. + * @param resource The resource on which you want to acquire a lock + * @param ttlMS The amount of time for which we need to acquire the lock. We try to extend the lock every ttlMS / 2 + * @param failure This function will be called with the resource which could not be locked. This function is called + * even if the client released the lock. + * @return returns true if we were able to successfully acqurie the lock. 
+ */ public boolean acquireLock(String resource, long ttlMS, Consumer failure) { return acquireLockWithExtension(resource, ttlMS, (r) -> { releaseLock(r); @@ -132,6 +173,13 @@ public boolean acquireLock(String resource, long ttlMS, Consumer failure }); } + /** + * Try to acquire the lock and schedule extension jobs recursively until extension fails. + * @param resource + * @param ttlMS + * @param extensionFailedCallback - gets called with the resource name when extension fails. + * @return + */ private boolean acquireLockWithExtension(String resource, long ttlMS, Consumer extensionFailedCallback) { long acquireResult = acquireLock(resource, ttlMS); if (acquireResult > 0) { @@ -142,6 +190,13 @@ private boolean acquireLockWithExtension(String resource, long ttlMS, Consumer resultTtls = new CopyOnWriteArrayList<>(); - CountDownLatch latch = new CountDownLatch(votingHostsSelector.getVotingSize()); + CountDownLatch latch = new CountDownLatch(quorum); votingHostsSelector.getVotingHosts().getEntireList().stream() .map(host -> new CheckAndRunHost(host, pool, "pttl", resource, resourceKeyMap.get(resource))) .forEach(checkAndRunHost -> CompletableFuture.supplyAsync(checkAndRunHost, service) .thenAccept(r -> { String result = r.getResult().toString(); + // The lua script returns 0 if we have lost the lock or we get -2 if the ttl expired on + // the key when we checked for the pttl. if (result.equals("0") || result.equals("-2")) { logger.info("Lock not present on host"); } else { @@ -222,6 +295,12 @@ public long checkLock(String resource) { } } + /** + * Try to extend lock by ttlMS + * @param resource + * @param ttlMS + * @return + */ public long extendLock(String resource, long ttlMS) { if (!checkResourceExists(resource)) { logger.info("Could not extend lock since its already released"); @@ -231,10 +310,16 @@ public long extendLock(String resource, long ttlMS) { } } + /** + * Release all locks to clean. + */ public void cleanup() { resourceKeyMap.keySet().stream().forEach(this::releaseLock); } + /** + * Log all the lock resources held right now. + */ public void logLocks() { resourceKeyMap.entrySet().stream() .forEach(e -> logger.info("Resource: {}, Key: {}", e.getKey(), e.getValue())); @@ -319,10 +404,10 @@ private DynoLockClient buildDynoLockClient() { tokenMapSupplier = cpConfig.getTokenSupplier(); final ConnectionPool pool = DynoJedisUtils.createConnectionPool(appName, opMonitor, cpMonitor, cpConfig, null); + VotingHostsFromTokenRange votingHostSelector = new VotingHostsFromTokenRange(hostSupplier, tokenMapSupplier, + cpConfig.getLockVotingSize()); - return new DynoLockClient(appName, clusterName, pool, hostSupplier, tokenMapSupplier, cpConfig, - new VotingHostsFromTokenRange(hostSupplier, tokenMapSupplier, cpConfig.getLockVotingSize()), - timeout, timeoutUnit); + return new DynoLockClient(pool, votingHostSelector, timeout, timeoutUnit); } } } diff --git a/dyno-recipes/src/main/java/com/netflix/dyno/recipes/lock/VotingHostsFromTokenRange.java b/dyno-recipes/src/main/java/com/netflix/dyno/recipes/lock/VotingHostsFromTokenRange.java index dcf81e32..9887ca17 100644 --- a/dyno-recipes/src/main/java/com/netflix/dyno/recipes/lock/VotingHostsFromTokenRange.java +++ b/dyno-recipes/src/main/java/com/netflix/dyno/recipes/lock/VotingHostsFromTokenRange.java @@ -16,12 +16,15 @@ import java.util.function.Function; import java.util.stream.Collectors; +/** + * This class deterministically returns a list of hosts which will be used for voting. 
We use the TokenRange to get the + * same set of hosts from all clients. + */ public class VotingHostsFromTokenRange implements VotingHostsSelector { private final TokenMapSupplier tokenMapSupplier; private final HostSupplier hostSupplier; private final CircularList votingHosts = new CircularList<>(new ArrayList<>()); - // TODO: raghu Might be easier as a FP? private final int MIN_VOTING_SIZE = 1; private final int MAX_VOTING_SIZE = 5; private final int effectiveVotingSize; @@ -37,19 +40,27 @@ public VotingHostsFromTokenRange(HostSupplier hostSupplier, TokenMapSupplier tok @Override public CircularList getVotingHosts() { if (votingHosts.getSize() == 0) { + if(effectiveVotingSize % 2 == 0) { + throw new IllegalStateException("Cannot do voting with even number of nodes for voting"); + } List allHostTokens = tokenMapSupplier.getTokens(ImmutableSet.copyOf(hostSupplier.getHosts())); if (allHostTokens.size() < MIN_VOTING_SIZE) { throw new IllegalStateException(String.format("Cannot perform voting with less than %d nodes", MIN_VOTING_SIZE)); } + // Total number of hosts present per rack Map numHostsPerRack = allHostTokens.stream().map(ht -> ht.getHost().getRack()).collect(Collectors.groupingBy(Function.identity(), Collectors.counting())); AtomicInteger numHostsRequired = new AtomicInteger(effectiveVotingSize); + // Map to keep track of the number of hosts to take for voting from each rack Map numHosts = new HashMap<>(); + // Sort racks so every client sees the same order List racks = numHostsPerRack.keySet().stream().sorted(Comparator.comparing(String::toString)).collect(Collectors.toList()); for(String rack: racks) { - if(numHostsRequired.get() <= 0) { - numHosts.put(rack, 0); - continue; - } +// // Already have required number of hosts for voting. Do not take any more. +// if(numHostsRequired.get() <= 0) { +// numHosts.put(rack, 0); +// continue; +// } + // Still need more hosts; take as many as possible from this rack.
int v = (int) Math.min(numHostsRequired.get(), numHostsPerRack.get(rack)); numHostsRequired.addAndGet(-v); numHosts.put(rack, v); @@ -57,7 +68,7 @@ public CircularList getVotingHosts() { } Map> rackToHostToken = allHostTokens.stream() .collect(Collectors.groupingBy(ht -> ht.getHost().getRack())); - allHostTokens.sort(HostToken::compareTo); + // Get the final list of voting hosts List finalVotingHosts = rackToHostToken.entrySet().stream() .sorted(Comparator.comparing(Map.Entry::getKey)) .flatMap(e -> { diff --git a/dyno-recipes/src/main/java/com/netflix/dyno/recipes/lock/VotingHostsSelector.java b/dyno-recipes/src/main/java/com/netflix/dyno/recipes/lock/VotingHostsSelector.java index e2254a12..a2e8b8c4 100644 --- a/dyno-recipes/src/main/java/com/netflix/dyno/recipes/lock/VotingHostsSelector.java +++ b/dyno-recipes/src/main/java/com/netflix/dyno/recipes/lock/VotingHostsSelector.java @@ -4,7 +4,15 @@ import com.netflix.dyno.connectionpool.impl.lb.CircularList; public interface VotingHostsSelector { + /** + * Get the list of hosts eligible for voting + * @return + */ CircularList getVotingHosts(); + /** + * Returns the number of voting hosts + * @return + */ int getVotingSize(); } diff --git a/dyno-recipes/src/main/java/com/netflix/dyno/recipes/lock/CheckAndRunHost.java b/dyno-recipes/src/main/java/com/netflix/dyno/recipes/lock/command/CheckAndRunHost.java similarity index 83% rename from dyno-recipes/src/main/java/com/netflix/dyno/recipes/lock/CheckAndRunHost.java rename to dyno-recipes/src/main/java/com/netflix/dyno/recipes/lock/command/CheckAndRunHost.java index ecbf39dd..5404b655 100644 --- a/dyno-recipes/src/main/java/com/netflix/dyno/recipes/lock/CheckAndRunHost.java +++ b/dyno-recipes/src/main/java/com/netflix/dyno/recipes/lock/command/CheckAndRunHost.java @@ -1,4 +1,4 @@ -package com.netflix.dyno.recipes.lock; +package com.netflix.dyno.recipes.lock.command; import com.netflix.dyno.connectionpool.Connection; import com.netflix.dyno.connectionpool.ConnectionContext; @@ -9,6 +9,9 @@ import com.netflix.dyno.jedis.operation.BaseKeyOperation; import redis.clients.jedis.Jedis; +/** + * Runs a command against the host and is used to remove the lock and checking the ttl on the resource + */ public class CheckAndRunHost extends CommandHost { private static final String cmdScript = " if redis.call(\"get\",KEYS[1]) == ARGV[1] then\n" + @@ -37,6 +40,9 @@ public OperationResult get() { OperationResult result = connection.execute(new BaseKeyOperation(randomKey, OpName.EVAL) { @Override public Object execute(Jedis client, ConnectionContext state) { + if (randomKey == null) { + throw new IllegalStateException("Cannot extend lock with null value for key"); + } Object result = client.eval(command, 1, resource, randomKey); return result; } diff --git a/dyno-recipes/src/main/java/com/netflix/dyno/recipes/lock/CommandHost.java b/dyno-recipes/src/main/java/com/netflix/dyno/recipes/lock/command/CommandHost.java similarity index 82% rename from dyno-recipes/src/main/java/com/netflix/dyno/recipes/lock/CommandHost.java rename to dyno-recipes/src/main/java/com/netflix/dyno/recipes/lock/command/CommandHost.java index 8f8df349..548427e8 100644 --- a/dyno-recipes/src/main/java/com/netflix/dyno/recipes/lock/CommandHost.java +++ b/dyno-recipes/src/main/java/com/netflix/dyno/recipes/lock/command/CommandHost.java @@ -1,4 +1,4 @@ -package com.netflix.dyno.recipes.lock; +package com.netflix.dyno.recipes.lock.command; import com.netflix.dyno.connectionpool.Connection; import com.netflix.dyno.connectionpool.ConnectionPool; 
@@ -9,10 +9,14 @@ import java.util.concurrent.TimeUnit; import java.util.function.Supplier; +/** + * This class is used to handle the host connection startup and cleanup. + * All non abstract subclasses should implement the supplier operation. + * @param + */ public abstract class CommandHost implements Supplier> { private final Host host; private final ConnectionPool pool; - private Connection connection; public CommandHost(Host host, ConnectionPool pool) { this.host = host; diff --git a/dyno-recipes/src/main/java/com/netflix/dyno/recipes/lock/ExtendHost.java b/dyno-recipes/src/main/java/com/netflix/dyno/recipes/lock/command/ExtendHost.java similarity index 88% rename from dyno-recipes/src/main/java/com/netflix/dyno/recipes/lock/ExtendHost.java rename to dyno-recipes/src/main/java/com/netflix/dyno/recipes/lock/command/ExtendHost.java index a34b4389..4a238bb7 100644 --- a/dyno-recipes/src/main/java/com/netflix/dyno/recipes/lock/ExtendHost.java +++ b/dyno-recipes/src/main/java/com/netflix/dyno/recipes/lock/command/ExtendHost.java @@ -1,4 +1,4 @@ -package com.netflix.dyno.recipes.lock; +package com.netflix.dyno.recipes.lock.command; import com.netflix.dyno.connectionpool.Connection; import com.netflix.dyno.connectionpool.ConnectionContext; @@ -7,16 +7,17 @@ import com.netflix.dyno.connectionpool.OperationResult; import com.netflix.dyno.jedis.OpName; import com.netflix.dyno.jedis.operation.BaseKeyOperation; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import com.netflix.dyno.recipes.lock.LockResource; import redis.clients.jedis.Jedis; import redis.clients.jedis.params.SetParams; import java.util.concurrent.CountDownLatch; +/** + * Instances of this class should be used to perform extend operations on an acquired lock. + */ public class ExtendHost extends CommandHost { - private static final Logger logger = LoggerFactory.getLogger(ExtendHost.class); private static final String cmdScript = " if redis.call(\"get\",KEYS[1]) == ARGV[1] then\n" + " return redis.call(\"set\",KEYS[1], ARGV[1], \"px\", ARGV[2])" + " else\n" + @@ -43,6 +44,7 @@ public OperationResult get() { OperationResult result = connection.execute(new BaseKeyOperation(randomKey, OpName.EVAL) { @Override public LockResource execute(Jedis client, ConnectionContext state) { + // We need to recheck randomKey in case it got removed before we get here. 
if (randomKey == null) { throw new IllegalStateException("Cannot extend lock with null value for key"); } diff --git a/dyno-recipes/src/main/java/com/netflix/dyno/recipes/lock/LockHost.java b/dyno-recipes/src/main/java/com/netflix/dyno/recipes/lock/command/LockHost.java similarity index 91% rename from dyno-recipes/src/main/java/com/netflix/dyno/recipes/lock/LockHost.java rename to dyno-recipes/src/main/java/com/netflix/dyno/recipes/lock/command/LockHost.java index 12ae7aa8..f0ebec12 100644 --- a/dyno-recipes/src/main/java/com/netflix/dyno/recipes/lock/LockHost.java +++ b/dyno-recipes/src/main/java/com/netflix/dyno/recipes/lock/command/LockHost.java @@ -1,4 +1,4 @@ -package com.netflix.dyno.recipes.lock; +package com.netflix.dyno.recipes.lock.command; import com.netflix.dyno.connectionpool.Connection; import com.netflix.dyno.connectionpool.ConnectionContext; @@ -7,11 +7,15 @@ import com.netflix.dyno.connectionpool.OperationResult; import com.netflix.dyno.jedis.OpName; import com.netflix.dyno.jedis.operation.BaseKeyOperation; +import com.netflix.dyno.recipes.lock.LockResource; import redis.clients.jedis.Jedis; import redis.clients.jedis.params.SetParams; import java.util.concurrent.CountDownLatch; +/** + * This class is used to acquire the lock on a host with a resource. + */ public class LockHost extends CommandHost { private final String value; diff --git a/dyno-recipes/src/test/java/com/netflix/dyno/recipes/lock/VotingHostsFromTokenRangeTest.java b/dyno-recipes/src/test/java/com/netflix/dyno/recipes/lock/VotingHostsFromTokenRangeTest.java index a4a8afa4..8def4db1 100644 --- a/dyno-recipes/src/test/java/com/netflix/dyno/recipes/lock/VotingHostsFromTokenRangeTest.java +++ b/dyno-recipes/src/test/java/com/netflix/dyno/recipes/lock/VotingHostsFromTokenRangeTest.java @@ -1,16 +1,78 @@ package com.netflix.dyno.recipes.lock; +import com.netflix.dyno.connectionpool.Host; +import com.netflix.dyno.connectionpool.HostBuilder; +import com.netflix.dyno.connectionpool.HostSupplier; +import com.netflix.dyno.connectionpool.TokenMapSupplier; +import com.netflix.dyno.connectionpool.impl.lb.CircularList; +import com.netflix.dyno.connectionpool.impl.lb.HostToken; +import org.junit.Assert; +import org.junit.Before; import org.junit.Test; -import static org.junit.Assert.*; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.IntStream; public class VotingHostsFromTokenRangeTest { - @Test - public void getVotingHosts() { + private String r1 = "rack1"; + private String r2 = "rack2"; + private String r3 = "rack3"; + private TokenMapSupplier tokenMapSupplier; + private HostSupplier hostSupplier; + private VotingHostsSelector votingHostsSelector; + private List hosts; + + @Before + public void setUp() { + Host h1 = new HostBuilder().setHostname("h1").setRack(r1).setStatus(Host.Status.Up).createHost(); + Host h2 = new HostBuilder().setHostname("h2").setRack(r1).setStatus(Host.Status.Up).createHost(); + Host h3 = new HostBuilder().setHostname("h3").setRack(r2).setStatus(Host.Status.Up).createHost(); + Host h4 = new HostBuilder().setHostname("h4").setRack(r2).setStatus(Host.Status.Up).createHost(); + Host h5 = new HostBuilder().setHostname("h5").setRack(r2).setStatus(Host.Status.Up).createHost(); + Host h6 = new HostBuilder().setHostname("h6").setRack(r3).setStatus(Host.Status.Up).createHost(); + + Host[] arr = {h1, h2, h3, h4, h5, h6}; + hosts = 
Arrays.asList(arr); + final Map tokenMap = new HashMap<>(); + + tokenMap.put(h1, new HostToken(1111L, h1)); + tokenMap.put(h2, new HostToken(2222L, h2)); + tokenMap.put(h3, new HostToken(1111L, h3)); + tokenMap.put(h4, new HostToken(2222L, h4)); + tokenMap.put(h5, new HostToken(3333L, h5)); + tokenMap.put(h6, new HostToken(1111L, h6)); + hostSupplier = () -> hosts; + tokenMapSupplier = new TokenMapSupplier() { + @Override + public List getTokens(Set activeHosts) { + return new ArrayList<>(tokenMap.values()); + } + + @Override + public HostToken getTokenForHost(Host host, Set activeHosts) { + return tokenMap.get(host); + } + }; + } + + private void testVotingSize(int votingSize) { + votingHostsSelector = new VotingHostsFromTokenRange(hostSupplier, tokenMapSupplier, votingSize); + CircularList hosts = votingHostsSelector.getVotingHosts(); + Set resultHosts = hosts.getEntireList().stream().map(h -> h.getHostName()).collect(Collectors.toSet()); + Assert.assertEquals(votingSize, resultHosts.size()); + Assert.assertEquals(votingSize, + hosts.getEntireList().subList(0, votingSize).stream().filter(h1 -> resultHosts.contains(h1.getHostName())).count()); } @Test public void getVotingSize() { + IntStream.range(1, 6).filter(i -> i%2 != 0).forEach(i -> testVotingSize(i)); } } \ No newline at end of file
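
For reviewers, the hashtag-consistency rule that the reworked setHashtagConnectionPool in DynoJedisUtils now enforces can be summarised as a standalone check. This is only an illustrative sketch of the logic in this diff, not code from the repository; the validateHashtags helper and its parameter are invented for the example.

import java.util.List;
import java.util.Objects;

public class HashtagConsistencySketch {
    // Mirrors the check in DynoJedisUtils: if the first host token carries no hashtag, no other
    // host may define one; if it carries one, every other host must define the same value.
    static void validateHashtags(List<String> hashtags) {
        if (hashtags.isEmpty()) {
            return;
        }
        String first = hashtags.get(0);
        boolean mismatch = (first == null)
                ? hashtags.stream().anyMatch(Objects::nonNull)
                : hashtags.stream().anyMatch(ht -> !first.equals(ht));
        if (mismatch) {
            throw new RuntimeException("Hashtags are different across hosts");
        }
    }
}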
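A short usage sketch of the lock recipe, assuming a DynoLockClient has already been constructed (for example through the builder shown in DynoLockClient.java, which now supplies the connection pool and the VotingHostsFromTokenRange selector). The method names come from this diff; the resource name, TTL, and helper method are illustrative only.

import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

import com.netflix.dyno.recipes.lock.DynoLockClient;

public class DynoLockClientUsageSketch {

    public static void lockAndWork(DynoLockClient lockClient) {
        // Called with the resource name if the client later fails to extend the lock on a quorum of hosts.
        Consumer<String> onLockLost = resource -> System.err.println("Lost lock on " + resource);

        // Try to hold "my-resource" for 10 seconds; the client extends the lock roughly every ttl/2.
        boolean locked = lockClient.acquireLock("my-resource", TimeUnit.SECONDS.toMillis(10), onLockLost);
        if (!locked) {
            return; // a quorum of voting hosts did not grant the lock
        }
        try {
            lockClient.logLocks();                 // logs "Resource: ..., Key: ..." for every held lock
            doWorkWhileHoldingLock();              // application-specific critical section (placeholder)
        } finally {
            lockClient.releaseLock("my-resource"); // best-effort release on the voting hosts
        }
    }

    private static void doWorkWhileHoldingLock() {
        // placeholder for the work guarded by the lock
    }
}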
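The diff also resizes the CountDownLatch in checkLock to the quorum (votingSize / 2 + 1) instead of the full voting set, so the caller stops waiting once a majority has answered. The sketch below shows that fan-out-and-wait pattern in isolation; waitForQuorum and its parameters are made up for the example and are not part of this change.

import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

public class QuorumCheckSketch {

    // Fan one check out per voting host and return once a quorum of responses has arrived
    // (or the timeout expires), reporting whether a majority of them succeeded.
    static boolean waitForQuorum(List<Supplier<Boolean>> hostChecks, int votingSize,
                                 long timeout, TimeUnit unit, ExecutorService service)
            throws InterruptedException {
        int quorum = votingSize / 2 + 1;            // same majority rule DynoLockClient uses
        CountDownLatch latch = new CountDownLatch(quorum);
        AtomicInteger successes = new AtomicInteger();
        hostChecks.forEach(check -> CompletableFuture.supplyAsync(check, service)
                .thenAccept(ok -> {
                    if (Boolean.TRUE.equals(ok)) {
                        successes.incrementAndGet();
                    }
                    latch.countDown();              // every response moves the latch, success or not
                }));
        latch.await(timeout, unit);                 // stop waiting after quorum responses or timeout
        return successes.get() >= quorum;
    }
}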