Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implementation of Redlock #271

Merged
merged 1 commit into from
Aug 5, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ subprojects {
apply plugin: 'java'
apply plugin: 'idea'
apply plugin: 'eclipse'
apply plugin: 'jacoco'
sourceCompatibility = 1.8

repositories {
Expand All @@ -38,6 +39,16 @@ subprojects {
options.addStringOption('Xdoclint:none', '-quiet')
}
}

jacocoTestReport {
reports {
xml.enabled false
csv.enabled false
html.enabled true
}
}

test.finalizedBy(project.tasks.jacocoTestReport)
}

project(':dyno-core') {
Expand Down Expand Up @@ -135,5 +146,10 @@ project(':dyno-recipes') {
compile project(':dyno-core')
compile project(':dyno-jedis')
compile 'com.google.code.gson:gson:2.8.5'
testCompile 'com.netflix.spinnaker.embedded-redis:embedded-redis:0.8.0'
}

test {
testLogging.showStandardStreams = true
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ public class ArchaiusConnectionPoolConfiguration extends ConnectionPoolConfigura
private final ErrorRateMonitorConfig errorRateConfig;
private final RetryPolicyFactory retryPolicyFactory;
private final DynamicBooleanProperty failOnStartupIfNoHosts;
private final DynamicIntProperty lockVotingSize;

private DynamicBooleanProperty isDualWriteEnabled;
private DynamicStringProperty dualWriteClusterName;
Expand All @@ -72,6 +73,7 @@ public ArchaiusConnectionPoolConfiguration(String name) {
configPublisherConfig = DynamicPropertyFactory.getInstance().getStringProperty(propertyPrefix + ".config.publisher.address", super.getConfigurationPublisherConfig());
failOnStartupIfNoHosts = DynamicPropertyFactory.getInstance().getBooleanProperty(propertyPrefix + ".config.startup.failIfNoHosts", super.getFailOnStartupIfNoHosts());
compressionThreshold = DynamicPropertyFactory.getInstance().getIntProperty(propertyPrefix + ".config.compressionThreshold", super.getValueCompressionThreshold());
lockVotingSize = DynamicPropertyFactory.getInstance().getIntProperty(propertyPrefix + ".config.lock.votingSize", super.getLockVotingSize());

loadBalanceStrategy = parseLBStrategy(propertyPrefix);
errorRateConfig = parseErrorRateMonitorConfig(propertyPrefix);
Expand Down Expand Up @@ -171,6 +173,11 @@ public int getDualWritePercentage() {
return dualWritePercentage.get();
}

@Override
public int getLockVotingSize() {
return lockVotingSize.get();
}

public void setIsDualWriteEnabled(DynamicBooleanProperty booleanProperty) {
this.isDualWriteEnabled = booleanProperty;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import java.util.ArrayList;
import java.util.List;

import com.netflix.dyno.connectionpool.HostBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -114,7 +115,7 @@ public Host apply(InstanceInfo info) {
if (rack == null) {
Logger.error("Rack wasn't found for host:" + info.getHostName() + " there may be issues matching it up to the token map");
}
Host host = new Host(info.getHostName(), info.getIPAddr(), rack, status);
Host host = new HostBuilder().setHostname(info.getHostName()).setIpAddress(info.getIPAddr()).setRack(rack).setStatus(status).createHost();
return host;
}
}));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,6 @@
*/
package com.netflix.dyno.contrib.consul;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.ecwid.consul.v1.ConsulClient;
import com.ecwid.consul.v1.QueryParams;
import com.ecwid.consul.v1.Response;
Expand All @@ -34,7 +26,15 @@
import com.google.common.collect.Lists;
import com.netflix.discovery.DiscoveryManager;
import com.netflix.dyno.connectionpool.Host;
import com.netflix.dyno.connectionpool.HostBuilder;
import com.netflix.dyno.connectionpool.HostSupplier;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/**
* Simple class that implements {@link Supplier}<{@link List}<{@link Host}>>. It provides a List<{@link Host}>
Expand Down Expand Up @@ -129,8 +129,13 @@ public Host apply(HealthService info) {
Logger.error("Rack wasn't found for host:" + info.getNode()
+ " there may be issues matching it up to the token map");
}
Host host = new Host(hostName, hostName, info.getService().getPort(), rack,
String.valueOf(metaData.get("datacenter")), status);
Host host = new HostBuilder().setHostname(hostName)
.setIpAddress(hostName)
.setPort(info.getService().getPort())
.setRack(rack)
.setDatacenter(String.valueOf(metaData.get("datacenter")))
.setStatus(status)
.createHost();
return host;
}
}));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,12 @@ public interface ConnectionFactory<CL> {
* Create a connection for this {@link HostConnectionPool}
*
* @param pool
* @param observor
* @return
* @throws DynoConnectException
* @throws ThrottledException
*/
public Connection<CL> createConnection(HostConnectionPool<CL> pool, ConnectionObservor observor) throws DynoConnectException, ThrottledException;
Connection<CL> createConnection(HostConnectionPool<CL> pool) throws DynoConnectException;

Connection<CL> createConnectionWithDataStore(HostConnectionPool<CL> pool)
throws DynoConnectException;
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,20 @@ enum CompressionStrategy {
THRESHOLD
}

/**
* Should connections in this pool connect to the datastore directly?
* @return
*/
boolean isConnectToDatastore();
rsrinivasanNetflix marked this conversation as resolved.
Show resolved Hide resolved

boolean isFallbackEnabled();

/**
* Returns the voting size for dyno lock
* @return
*/
int getLockVotingSize();

/**
* Returns the unique name assigned to this connection pool.
*/
Expand Down Expand Up @@ -224,4 +238,5 @@ enum CompressionStrategy {

String getHashtag();

ConnectionPoolConfiguration setLocalZoneAffinity(boolean condition);
}
81 changes: 29 additions & 52 deletions dyno-core/src/main/java/com/netflix/dyno/connectionpool/Host.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,11 @@
******************************************************************************/
package com.netflix.dyno.connectionpool;

import org.apache.commons.lang3.StringUtils;

import java.net.InetSocketAddress;
import java.util.Objects;

import com.netflix.dyno.connectionpool.impl.utils.ConfigUtils;
import org.apache.commons.lang3.StringUtils;

/**
* Class encapsulating information about a host.
* <p>
Expand All @@ -35,12 +34,15 @@
public class Host implements Comparable<Host> {

public static final int DEFAULT_PORT = 8102;
public static final Host NO_HOST = new Host("UNKNOWN", "UNKNOWN", 0, "UNKNOWN");
public static final int DEFAULT_DATASTORE_PORT = 22122;
public static final Host NO_HOST = new HostBuilder().setHostname("UNKNOWN").setIpAddress("UNKNOWN").setPort(0)
.setRack("UNKNOWN").createHost();

private final String hostname;
private final String ipAddress;
private final int port;
private final int securePort;
private final int datastorePort;
private final InetSocketAddress socketAddress;
private final String rack;
private final String datacenter;
Expand All @@ -52,56 +54,12 @@ public enum Status {
Up, Down;
}

public Host(String hostname, int port, String rack) {
this(hostname, null, port, port, rack, ConfigUtils.getDataCenterFromRack(rack), Status.Down, null);
}

public Host(String hostname, String rack, Status status) {
this(hostname, null, DEFAULT_PORT, DEFAULT_PORT, rack, ConfigUtils.getDataCenterFromRack(rack), status, null);
}

public Host(String hostname, int port, String rack, Status status) {
this(hostname, null, port, port, rack, ConfigUtils.getDataCenterFromRack(rack), status, null);
}

public Host(String hostname, int port, String rack, Status status, String hashtag) {
this(hostname, null, port, port, rack, ConfigUtils.getDataCenterFromRack(rack), status, hashtag);
}

public Host(String hostname, int port, String rack, Status status, String hashtag, String password) {
this(hostname, null, port, port, rack, ConfigUtils.getDataCenterFromRack(rack), status, hashtag, password);
}

public Host(String hostname, String ipAddress, int port, String rack) {
this(hostname, ipAddress, port, port, rack, ConfigUtils.getDataCenterFromRack(rack), Status.Down, null);
}

public Host(String hostname, String ipAddress, String rack, Status status) {
this(hostname, ipAddress, DEFAULT_PORT, DEFAULT_PORT, rack, ConfigUtils.getDataCenterFromRack(rack), status, null);
}

public Host(String hostname, String ipAddress, String rack, Status status, String hashtag) {
this(hostname, ipAddress, DEFAULT_PORT, DEFAULT_PORT, rack, ConfigUtils.getDataCenterFromRack(rack), status, hashtag);
}

public Host(String hostname, String ipAddress, int port, String rack, String datacenter, Status status) {
this(hostname, ipAddress, port, port, rack, datacenter, status, null);
}

public Host(String name, String ipAddress, int port, String rack, String datacenter, Status status,
String hashtag) {
this(name, ipAddress, port, port, rack, datacenter, status, hashtag);
}

public Host(String name, String ipAddress, int port, int securePort, String rack, String datacenter, Status status, String hashtag) {
this(name, ipAddress, port, port, rack, datacenter, status, hashtag, null);
}

public Host(String name, String ipAddress, int port, int securePort, String rack, String datacenter, Status status, String hashtag, String password) {
this.hostname = name;
public Host(String hostname, String ipAddress, int port, int securePort, int datastorePort, String rack, String datacenter, Status status, String hashtag, String password) {
this.hostname = hostname;
this.ipAddress = ipAddress;
this.port = port;
this.securePort = securePort;
this.datastorePort = datastorePort;
this.rack = rack;
this.status = status;
this.datacenter = datacenter;
Expand All @@ -110,7 +68,7 @@ public Host(String name, String ipAddress, int port, int securePort, String rack

// Used for the unit tests to prevent host name resolution
if (port != -1) {
this.socketAddress = new InetSocketAddress(name, port);
this.socketAddress = new InetSocketAddress(hostname, port);
} else {
this.socketAddress = null;
}
Expand Down Expand Up @@ -139,6 +97,10 @@ public int getSecurePort() {
return securePort;
}

public int getDatastorePort() {
return datastorePort;
}

public String getDatacenter() {
return datacenter;
}
Expand Down Expand Up @@ -168,6 +130,10 @@ public String getPassword() {
return password;
}

public Status getStatus() {
return status;
}

public InetSocketAddress getSocketAddress() {
return socketAddress;
}
Expand Down Expand Up @@ -223,4 +189,15 @@ public String toString() {
+ rack + ", datacenter: " + datacenter + ", status: " + status.name() + ", hashtag="
+ hashtag + ", password=" + (Objects.nonNull(password) ? "masked" : "null") + "]";
}

public static Host clone(Host host) {
return new HostBuilder().setHostname(host.getHostName())
.setIpAddress(host.getIpAddress()).setPort(host.getPort())
.setSecurePort(host.getSecurePort())
.setRack(host.getRack())
.setDatastorePort(host.getDatastorePort())
.setDatacenter(host.getDatacenter()).setStatus(host.getStatus())
.setHashtag(host.getHashtag())
.setPassword(host.getPassword()).createHost();
}
}
Loading