Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HTTP consistent hash routing #496

Merged
merged 7 commits into from
Feb 23, 2017
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions include/envoy/router/router.h
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,22 @@ class VirtualHost {
virtual const RateLimitPolicy& rateLimitPolicy() const PURE;
};

/**
* Route hash policy. I.e., if using a hashing load balancer, how the route should be hashed onto
* an upstream host.
*/
class HashPolicy {
public:
virtual ~HashPolicy() {}

/**
* @return Optional<uint64_t> an optional hash value to route on given a set of HTTP headers.
* A hash value might not be returned if for example the specified HTTP header does not
* exist. In the future we might add addtional support for hashing on origin address, etc.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

additional

*/
virtual Optional<uint64_t> generateHash(const Http::HeaderMap& headers) const PURE;
};

/**
* An individual resolved route entry.
*/
Expand All @@ -160,6 +176,11 @@ class RouteEntry {
*/
virtual void finalizeRequestHeaders(Http::HeaderMap& headers) const PURE;

/**
* @return const HashPolicy* the optional hash policy for the route.
*/
virtual const HashPolicy* hashPolicy() const PURE;

/**
* @return the priority of the route.
*/
Expand Down
4 changes: 3 additions & 1 deletion include/envoy/upstream/cluster_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include "envoy/http/async_client.h"
#include "envoy/http/conn_pool.h"
#include "envoy/json/json_object.h"
#include "envoy/upstream/load_balancer.h"
#include "envoy/upstream/upstream.h"

namespace Upstream {
Expand Down Expand Up @@ -53,7 +54,8 @@ class ClusterManager {
* exist.
*/
virtual Http::ConnectionPool::Instance* httpConnPoolForCluster(const std::string& cluster,
ResourcePriority priority) PURE;
ResourcePriority priority,
LoadBalancerContext* context) PURE;

/**
* Allocate a load balanced TCP connection for a cluster. The created connection is already
Expand Down
19 changes: 18 additions & 1 deletion include/envoy/upstream/load_balancer.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,20 @@

namespace Upstream {

/**
* Context passed to a load balancer to use when choosing a host. Not all load balancers make use
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: of all or any context information

* of all context information.
*/
class LoadBalancerContext {
public:
virtual ~LoadBalancerContext() {}

/**
* @return const Optional<uint64_t>& the optional hash key to use during load balancing.
*/
virtual const Optional<uint64_t>& hashKey() const PURE;
};

/**
* Abstract load balancing interface.
*/
Expand All @@ -14,8 +28,11 @@ class LoadBalancer {

/**
* Ask the load balancer for the next host to use depending on the underlying LB algorithm.
* @param context supplies the load balancer context. Not all load balancers make use of all
* context information. Load balancers should be written to assume that context information
* is missing and use sensible defaults.
*/
virtual ConstHostPtr chooseHost() PURE;
virtual ConstHostPtr chooseHost(const LoadBalancerContext* context) PURE;
};

typedef std::unique_ptr<LoadBalancer> LoadBalancerPtr;
Expand Down
2 changes: 1 addition & 1 deletion include/envoy/upstream/load_balancer_type.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,6 @@ namespace Upstream {
/**
* Type of load balancing to perform.
*/
enum class LoadBalancerType { RoundRobin, LeastRequest, Random };
enum class LoadBalancerType { RoundRobin, LeastRequest, Random, RingHash };

} // Upstream
1 change: 1 addition & 0 deletions source/common/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ add_library(
upstream/load_balancer_impl.cc
upstream/logical_dns_cluster.cc
upstream/outlier_detection_impl.cc
upstream/ring_hash_lb.cc
upstream/sds.cc
upstream/upstream_impl.cc
${gen_git_sha_target})
Expand Down
1 change: 1 addition & 0 deletions source/common/http/async_client_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ class AsyncStreamImpl : public AsyncClient::Stream,
// Router::RouteEntry
const std::string& clusterName() const override { return cluster_name_; }
void finalizeRequestHeaders(Http::HeaderMap&) const override {}
const Router::HashPolicy* hashPolicy() const override { return nullptr; }
Upstream::ResourcePriority priority() const override {
return Upstream::ResourcePriority::Default;
}
Expand Down
12 changes: 10 additions & 2 deletions source/common/json/config_schemas.cc
Original file line number Diff line number Diff line change
Expand Up @@ -535,7 +535,15 @@ const std::string Json::Schema::ROUTE_ENTRY_CONFIGURATION_SCHEMA(R"EOF(
"additionalProperties" : false
}
},
"rate_limits" : {"type" : "array"}
"rate_limits" : {"type" : "array"},
"hash_policy" : {
"type" : "object",
"properties" : {
"header_name" : {"type" : "string"}
},
"required" : ["header_name"],
"additionalProperties" : false
}
},
"additionalProperties" : false
}
Expand Down Expand Up @@ -983,7 +991,7 @@ const std::string Json::Schema::CLUSTER_SCHEMA(R"EOF(
},
"lb_type" : {
"type" : "string",
"enum" : ["round_robin", "least_request", "random"]
"enum" : ["round_robin", "least_request", "random", "ring_hash"]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be exposed as "ketama" (or at the very least "consistent_hash"), otherwise it's not obvious what the underlying mapping represents.

Also, we might want to add other consistent hashes in the future (like the less memory-hungry "jump" algorithm).

},
"hosts" : {
"type" : "array",
Expand Down
18 changes: 18 additions & 0 deletions source/common/router/config_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,20 @@ bool ConfigUtility::matchHeaders(const Http::HeaderMap& request_headers,
return matches;
}

HashPolicyImpl::HashPolicyImpl(const Json::Object& config)
: header_name_(config.getString("header_name")) {}

Optional<uint64_t> HashPolicyImpl::generateHash(const Http::HeaderMap& headers) const {
Optional<uint64_t> hash;
const Http::HeaderEntry* header = headers.get(header_name_);
if (header) {
// TODO: Compile in murmur3/city/etc. and potentially allow the user to choose so we know
// exactly what we are going to get.
hash.value(std::hash<std::string>()(header->value().c_str()));
}
return hash;
}

const uint64_t RouteEntryImplBase::WeightedClusterEntry::MAX_CLUSTER_WEIGHT = 100UL;

RouteEntryImplBase::RouteEntryImplBase(const VirtualHostImpl& vhost, const Json::Object& route,
Expand Down Expand Up @@ -151,6 +165,10 @@ RouteEntryImplBase::RouteEntryImplBase(const VirtualHostImpl& vhost, const Json:
header_map->getBoolean("regex", false));
}
}

if (route.hasObject("hash_policy")) {
hash_policy_.reset(new HashPolicyImpl(*route.getObject("hash_policy")));
}
}

bool RouteEntryImplBase::matchRoute(const Http::HeaderMap& headers, uint64_t random_value) const {
Expand Down
20 changes: 18 additions & 2 deletions source/common/router/config_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@
#include "envoy/runtime/runtime.h"
#include "envoy/upstream/cluster_manager.h"

#include "common/json/json_loader.h"

namespace Router {

/**
Expand Down Expand Up @@ -168,6 +166,21 @@ class ShadowPolicyImpl : public ShadowPolicy {
std::string runtime_key_;
};

/**
* Implementation of HashPolicy that reads from the JSON route config and only currently supports
* hashing on an HTTP header.
*/
class HashPolicyImpl : public HashPolicy {
public:
HashPolicyImpl(const Json::Object& config);

// Router::HashPolicy
Optional<uint64_t> generateHash(const Http::HeaderMap& headers) const override;

private:
const Http::LowerCaseString header_name_;
};

/**
* Base implementation for all route entries.
*/
Expand All @@ -189,6 +202,7 @@ class RouteEntryImplBase : public RouteEntry,
// Router::RouteEntry
const std::string& clusterName() const override;
void finalizeRequestHeaders(Http::HeaderMap& headers) const override;
const HashPolicy* hashPolicy() const override { return hash_policy_.get(); }
Upstream::ResourcePriority priority() const override { return priority_; }
const RateLimitPolicy& rateLimitPolicy() const override { return rate_limit_policy_; }
const RetryPolicy& retryPolicy() const override { return retry_policy_; }
Expand Down Expand Up @@ -232,6 +246,7 @@ class RouteEntryImplBase : public RouteEntry,
return parent_->finalizeRequestHeaders(headers);
}

const HashPolicy* hashPolicy() const override { return parent_->hashPolicy(); }
Upstream::ResourcePriority priority() const override { return parent_->priority(); }
const RateLimitPolicy& rateLimitPolicy() const override { return parent_->rateLimitPolicy(); }
const RetryPolicy& retryPolicy() const override { return parent_->retryPolicy(); }
Expand Down Expand Up @@ -299,6 +314,7 @@ class RouteEntryImplBase : public RouteEntry,
const Upstream::ResourcePriority priority_;
std::vector<ConfigUtility::HeaderData> config_headers_;
std::vector<WeightedClusterEntryPtr> weighted_clusters_;
std::unique_ptr<const HashPolicyImpl> hash_policy_;
};

/**
Expand Down
19 changes: 15 additions & 4 deletions source/common/router/router.cc
Original file line number Diff line number Diff line change
Expand Up @@ -191,9 +191,16 @@ Http::FilterHeadersStatus Filter::decodeHeaders(Http::HeaderMap& headers, bool e
return Http::FilterHeadersStatus::StopIteration;
}

// See if we need to set up for hashing.
if (route_entry_->hashPolicy()) {
Optional<uint64_t> hash = route_entry_->hashPolicy()->generateHash(headers);
if (hash.valid()) {
lb_context_.reset(new LoadBalancerContextImpl(hash));
}
}

// Fetch a connection pool for the upstream cluster.
Http::ConnectionPool::Instance* conn_pool =
config_.cm_.httpConnPoolForCluster(route_entry_->clusterName(), finalPriority());
Http::ConnectionPool::Instance* conn_pool = getConnPool();
if (!conn_pool) {
sendNoHealthyUpstreamResponse();
return Http::FilterHeadersStatus::StopIteration;
Expand Down Expand Up @@ -237,6 +244,11 @@ Http::FilterHeadersStatus Filter::decodeHeaders(Http::HeaderMap& headers, bool e
return Http::FilterHeadersStatus::StopIteration;
}

Http::ConnectionPool::Instance* Filter::getConnPool() {
return config_.cm_.httpConnPoolForCluster(route_entry_->clusterName(), finalPriority(),
lb_context_.get());
}

void Filter::sendNoHealthyUpstreamResponse() {
callbacks_->requestInfo().setResponseFlag(Http::AccessLog::ResponseFlag::NoHealthyUpstream);
chargeUpstreamCode(Http::Code::ServiceUnavailable, nullptr);
Expand Down Expand Up @@ -535,8 +547,7 @@ bool Filter::setupRetry(bool end_stream) {
}

void Filter::doRetry() {
Http::ConnectionPool::Instance* conn_pool =
config_.cm_.httpConnPoolForCluster(route_entry_->clusterName(), finalPriority());
Http::ConnectionPool::Instance* conn_pool = getConnPool();
if (!conn_pool) {
sendNoHealthyUpstreamResponse();
cleanup();
Expand Down
11 changes: 11 additions & 0 deletions source/common/router/router.h
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,15 @@ class Filter : Logger::Loggable<Logger::Id::router>, public Http::StreamDecoderF

typedef std::unique_ptr<UpstreamRequest> UpstreamRequestPtr;

struct LoadBalancerContextImpl : public Upstream::LoadBalancerContext {
LoadBalancerContextImpl(const Optional<uint64_t>& hash) : hash_(hash) {}

// Upstream::LoadBalancerContext
const Optional<uint64_t>& hashKey() const override { return hash_; }

Optional<uint64_t> hash_;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

const

};

enum class UpstreamResetType { Reset, GlobalTimeout, PerTryTimeout };

Http::AccessLog::ResponseFlag
Expand All @@ -185,6 +194,7 @@ class Filter : Logger::Loggable<Logger::Id::router>, public Http::StreamDecoderF
Event::Dispatcher& dispatcher,
Upstream::ResourcePriority priority) PURE;
Upstream::ResourcePriority finalPriority();
Http::ConnectionPool::Instance* getConnPool();
void maybeDoShadowing();
void onRequestComplete();
void onResetStream();
Expand Down Expand Up @@ -214,6 +224,7 @@ class Filter : Logger::Loggable<Logger::Id::router>, public Http::StreamDecoderF
Http::HeaderMap* downstream_headers_{};
Http::HeaderMap* downstream_trailers_{};
SystemTime downstream_request_complete_time_;
std::unique_ptr<LoadBalancerContextImpl> lb_context_;

bool downstream_response_started_ : 1;
bool downstream_end_stream_ : 1;
Expand Down
17 changes: 12 additions & 5 deletions source/common/upstream/cluster_manager_impl.cc
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#include "cds_api_impl.h"
#include "cluster_manager_impl.h"
#include "load_balancer_impl.h"
#include "ring_hash_lb.h"

#include "envoy/event/dispatcher.h"
#include "envoy/network/dns.h"
Expand Down Expand Up @@ -302,7 +303,8 @@ ClusterInfoPtr ClusterManagerImpl::get(const std::string& cluster) {
}

Http::ConnectionPool::Instance*
ClusterManagerImpl::httpConnPoolForCluster(const std::string& cluster, ResourcePriority priority) {
ClusterManagerImpl::httpConnPoolForCluster(const std::string& cluster, ResourcePriority priority,
LoadBalancerContext* context) {
ThreadLocalClusterManagerImpl& cluster_manager =
tls_.getTyped<ThreadLocalClusterManagerImpl>(thread_local_slot_);

Expand All @@ -312,7 +314,7 @@ ClusterManagerImpl::httpConnPoolForCluster(const std::string& cluster, ResourceP
return nullptr;
}

return entry->second->connPool(priority);
return entry->second->connPool(priority, context);
}

void ClusterManagerImpl::postThreadLocalClusterUpdate(const Cluster& primary_cluster,
Expand Down Expand Up @@ -343,7 +345,7 @@ Host::CreateConnectionData ClusterManagerImpl::tcpConnForCluster(const std::stri
throw EnvoyException(fmt::format("unknown cluster '{}'", cluster));
}

ConstHostPtr logical_host = entry->second->lb_->chooseHost();
ConstHostPtr logical_host = entry->second->lb_->chooseHost(nullptr);
if (logical_host) {
return logical_host->createConnection(cluster_manager.thread_local_dispatcher_);
} else {
Expand Down Expand Up @@ -476,6 +478,11 @@ ClusterManagerImpl::ThreadLocalClusterManagerImpl::ClusterEntry::ClusterEntry(
parent.parent_.runtime_, parent.parent_.random_));
break;
}
case LoadBalancerType::RingHash: {
lb_.reset(new RingHashLoadBalancer(host_set_, cluster->stats(), parent.parent_.runtime_,
parent.parent_.random_));
break;
}
}

host_set_.addMemberUpdateCb(
Expand All @@ -498,8 +505,8 @@ ClusterManagerImpl::ThreadLocalClusterManagerImpl::ClusterEntry::~ClusterEntry()

Http::ConnectionPool::Instance*
ClusterManagerImpl::ThreadLocalClusterManagerImpl::ClusterEntry::connPool(
ResourcePriority priority) {
ConstHostPtr host = lb_->chooseHost();
ResourcePriority priority, LoadBalancerContext* context) {
ConstHostPtr host = lb_->chooseHost(context);
if (!host) {
cluster_info_->stats().upstream_cx_none_healthy_.inc();
return nullptr;
Expand Down
6 changes: 4 additions & 2 deletions source/common/upstream/cluster_manager_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,8 @@ class ClusterManagerImpl : public ClusterManager {
}
ClusterInfoPtr get(const std::string& cluster) override;
Http::ConnectionPool::Instance* httpConnPoolForCluster(const std::string& cluster,
ResourcePriority priority) override;
ResourcePriority priority,
LoadBalancerContext* context) override;
Host::CreateConnectionData tcpConnForCluster(const std::string& cluster) override;
Http::AsyncClient& httpAsyncClientForCluster(const std::string& cluster) override;
bool removePrimaryCluster(const std::string& cluster) override;
Expand All @@ -150,7 +151,8 @@ class ClusterManagerImpl : public ClusterManager {
ClusterEntry(ThreadLocalClusterManagerImpl& parent, ClusterInfoPtr cluster);
~ClusterEntry();

Http::ConnectionPool::Instance* connPool(ResourcePriority priority);
Http::ConnectionPool::Instance* connPool(ResourcePriority priority,
LoadBalancerContext* context);

ThreadLocalClusterManagerImpl& parent_;
HostSetImpl host_set_;
Expand Down
Loading