Skip to content

Commit

Permalink
tcp: towards pluggable upstreams (#13331)
Browse files Browse the repository at this point in the history
Commit Message: Refactoring TCP code to match HTTP code in preparation for pluggable TCP upstreams.
Risk Level: High - fairly major refactor
Testing: existing tests pass
Docs Changes: n/a
Release Notes: n/a

Signed-off-by: Alyssa Wilk <[email protected]>
  • Loading branch information
alyssawilk authored Oct 13, 2020
1 parent cb7691c commit 82e7109
Show file tree
Hide file tree
Showing 4 changed files with 212 additions and 104 deletions.
95 changes: 30 additions & 65 deletions source/common/tcp_proxy/tcp_proxy.cc
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ Filter::~Filter() {
access_log->log(nullptr, nullptr, nullptr, getStreamInfo());
}

ASSERT(upstream_handle_ == nullptr);
ASSERT(generic_conn_pool_ == nullptr);
ASSERT(upstream_ == nullptr);
}

Expand Down Expand Up @@ -442,24 +442,17 @@ Network::FilterStatus Filter::initializeUpstreamConnection() {

bool Filter::maybeTunnel(const std::string& cluster_name) {
if (!config_->tunnelingConfig()) {
Tcp::ConnectionPool::Instance* conn_pool = cluster_manager_.tcpConnPoolForCluster(
cluster_name, Upstream::ResourcePriority::Default, this);
if (conn_pool) {
generic_conn_pool_ =
std::make_unique<TcpConnPool>(cluster_name, cluster_manager_, this, *upstream_callbacks_);
if (generic_conn_pool_->valid()) {
connecting_ = true;
connect_attempts_++;

// Given this function is reentrant, make sure we only reset the upstream_handle_ if given a
// valid connection handle. If newConnection fails inline it may result in attempting to
// select a new host, and a recursive call to initializeUpstreamConnection. In this case the
// first call to newConnection will return null and the inner call will persist.
Tcp::ConnectionPool::Cancellable* handle = conn_pool->newConnection(*this);
if (handle) {
ASSERT(upstream_handle_.get() == nullptr);
upstream_handle_ = std::make_shared<TcpConnectionHandle>(handle);
}
generic_conn_pool_->newStream(this);
// Because we never return open connections to the pool, this either has a handle waiting on
// connection completion, or onPoolFailure has been invoked. Either way, stop iteration.
return true;
} else {
generic_conn_pool_.reset();
}
} else {
auto* cluster = cluster_manager_.get(cluster_name);
Expand All @@ -474,28 +467,23 @@ bool Filter::maybeTunnel(const std::string& cluster_name) {
"http2_protocol_options on the cluster.");
return false;
}
Http::ConnectionPool::Instance* conn_pool = cluster_manager_.httpConnPoolForCluster(
cluster_name, Upstream::ResourcePriority::Default, absl::nullopt, this);
if (conn_pool) {
upstream_ = std::make_unique<HttpUpstream>(*upstream_callbacks_,
config_->tunnelingConfig()->hostname());
HttpUpstream* http_upstream = static_cast<HttpUpstream*>(upstream_.get());
Http::ConnectionPool::Cancellable* cancellable =
conn_pool->newStream(http_upstream->responseDecoder(), *this);
if (cancellable) {
ASSERT(upstream_handle_.get() == nullptr);
upstream_handle_ = std::make_shared<HttpConnectionHandle>(cancellable);
}

generic_conn_pool_ = std::make_unique<HttpConnPool>(cluster_name, cluster_manager_, this,
config_->tunnelingConfig()->hostname(),
*upstream_callbacks_);
if (generic_conn_pool_->valid()) {
generic_conn_pool_->newStream(this);
return true;
} else {
generic_conn_pool_.reset();
}
}

return false;
}
void Filter::onPoolFailure(ConnectionPool::PoolFailureReason reason,
Upstream::HostDescriptionConstSharedPtr host) {
upstream_handle_.reset();

void Filter::onGenericPoolFailure(ConnectionPool::PoolFailureReason reason,
Upstream::HostDescriptionConstSharedPtr host) {
generic_conn_pool_.reset();
read_callbacks_->upstreamHost(host);
getStreamInfo().onUpstreamHostSelected(host);

Expand All @@ -518,44 +506,22 @@ void Filter::onPoolFailure(ConnectionPool::PoolFailureReason reason,
}
}

void Filter::onPoolReadyBase(Upstream::HostDescriptionConstSharedPtr& host,
const Network::Address::InstanceConstSharedPtr& local_address,
Ssl::ConnectionInfoConstSharedPtr ssl_info) {
upstream_handle_.reset();
void Filter::onGenericPoolReady(StreamInfo::StreamInfo* info,
std::unique_ptr<GenericUpstream>&& upstream,
Upstream::HostDescriptionConstSharedPtr& host,
const Network::Address::InstanceConstSharedPtr& local_address,
Ssl::ConnectionInfoConstSharedPtr ssl_info) {
upstream_ = std::move(upstream);
generic_conn_pool_.reset();
read_callbacks_->upstreamHost(host);
getStreamInfo().onUpstreamHostSelected(host);
getStreamInfo().setUpstreamLocalAddress(local_address);
getStreamInfo().setUpstreamSslConnection(ssl_info);
onUpstreamConnection();
read_callbacks_->continueReading();
}

void Filter::onPoolReady(Tcp::ConnectionPool::ConnectionDataPtr&& conn_data,
Upstream::HostDescriptionConstSharedPtr host) {
Tcp::ConnectionPool::ConnectionData* latched_data = conn_data.get();

upstream_ = std::make_unique<TcpUpstream>(std::move(conn_data), *upstream_callbacks_);
onPoolReadyBase(host, latched_data->connection().localAddress(),
latched_data->connection().streamInfo().downstreamSslConnection());
read_callbacks_->connection().streamInfo().setUpstreamFilterState(
latched_data->connection().streamInfo().filterState());
}

void Filter::onPoolFailure(ConnectionPool::PoolFailureReason failure, absl::string_view,
Upstream::HostDescriptionConstSharedPtr host) {
onPoolFailure(failure, host);
}

void Filter::onPoolReady(Http::RequestEncoder& request_encoder,
Upstream::HostDescriptionConstSharedPtr host,
const StreamInfo::StreamInfo& info) {
Http::RequestEncoder* latched_encoder = &request_encoder;
HttpUpstream* http_upstream = static_cast<HttpUpstream*>(upstream_.get());
http_upstream->setRequestEncoder(request_encoder,
host->transportSocketFactory().implementsSecureTransport());

onPoolReadyBase(host, latched_encoder->getStream().connectionLocalAddress(),
info.downstreamSslConnection());
if (info) {
read_callbacks_->connection().streamInfo().setUpstreamFilterState(info->filterState());
}
}

const Router::MetadataMatchCriteria* Filter::metadataMatchCriteria() {
Expand Down Expand Up @@ -624,12 +590,11 @@ void Filter::onDownstreamEvent(Network::ConnectionEvent event) {
disableIdleTimer();
}
}
if (upstream_handle_) {
if (generic_conn_pool_) {
if (event == Network::ConnectionEvent::LocalClose ||
event == Network::ConnectionEvent::RemoteClose) {
// Cancel the conn pool request and close any excess pending requests.
upstream_handle_->cancel();
upstream_handle_.reset();
generic_conn_pool_.reset();
}
}
}
Expand Down
36 changes: 15 additions & 21 deletions source/common/tcp_proxy/tcp_proxy.h
Original file line number Diff line number Diff line change
Expand Up @@ -242,9 +242,8 @@ class PerConnectionCluster : public StreamInfo::FilterState::Object {
*/
class Filter : public Network::ReadFilter,
public Upstream::LoadBalancerContextBase,
Tcp::ConnectionPool::Callbacks,
public Http::ConnectionPool::Callbacks,
protected Logger::Loggable<Logger::Id::filter> {
protected Logger::Loggable<Logger::Id::filter>,
public GenericConnectionPoolCallbacks {
public:
Filter(ConfigSharedPtr config, Upstream::ClusterManager& cluster_manager);
~Filter() override;
Expand All @@ -254,23 +253,13 @@ class Filter : public Network::ReadFilter,
Network::FilterStatus onNewConnection() override;
void initializeReadFilterCallbacks(Network::ReadFilterCallbacks& callbacks) override;

// Tcp::ConnectionPool::Callbacks
void onPoolFailure(ConnectionPool::PoolFailureReason reason,
Upstream::HostDescriptionConstSharedPtr host) override;
void onPoolReady(Tcp::ConnectionPool::ConnectionDataPtr&& conn_data,
Upstream::HostDescriptionConstSharedPtr host) override;

// Http::ConnectionPool::Callbacks,
void onPoolFailure(ConnectionPool::PoolFailureReason reason,
absl::string_view transport_failure_reason,
Upstream::HostDescriptionConstSharedPtr host) override;
void onPoolReady(Http::RequestEncoder& request_encoder,
Upstream::HostDescriptionConstSharedPtr host,
const StreamInfo::StreamInfo& info) override;

void onPoolReadyBase(Upstream::HostDescriptionConstSharedPtr& host,
const Network::Address::InstanceConstSharedPtr& local_address,
Ssl::ConnectionInfoConstSharedPtr ssl_info);
// GenericConnectionPoolCallbacks
void onGenericPoolReady(StreamInfo::StreamInfo* info, std::unique_ptr<GenericUpstream>&& upstream,
Upstream::HostDescriptionConstSharedPtr& host,
const Network::Address::InstanceConstSharedPtr& local_address,
Ssl::ConnectionInfoConstSharedPtr ssl_info) override;
void onGenericPoolFailure(ConnectionPool::PoolFailureReason reason,
Upstream::HostDescriptionConstSharedPtr host) override;

// Upstream::LoadBalancerContext
const Router::MetadataMatchCriteria* metadataMatchCriteria() override;
Expand Down Expand Up @@ -375,10 +364,15 @@ class Filter : public Network::ReadFilter,
Event::TimerPtr idle_timer_;
Event::TimerPtr connection_duration_timer_;

std::shared_ptr<ConnectionHandle> upstream_handle_;
std::shared_ptr<UpstreamCallbacks> upstream_callbacks_; // shared_ptr required for passing as a
// read filter.
// The upstream handle (either TCP or HTTP). This is set in onGenericPoolReady and should persist
// until either the upstream or downstream connection is terminated.
std::unique_ptr<GenericUpstream> upstream_;
// The connection pool used to set up |upstream_|.
// This will be non-null from when an upstream connection is attempted until
// it either succeeds or fails.
std::unique_ptr<GenericConnPool> generic_conn_pool_;
RouteConstSharedPtr route_;
Router::MetadataMatchCriteriaConstPtr metadata_match_criteria_;
Network::TransportSocketOptionsSharedPtr transport_socket_options_;
Expand Down
96 changes: 96 additions & 0 deletions source/common/tcp_proxy/upstream.cc
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
#include "common/tcp_proxy/upstream.h"

#include "envoy/upstream/cluster_manager.h"

#include "common/http/header_map_impl.h"
#include "common/http/headers.h"
#include "common/http/utility.h"
Expand Down Expand Up @@ -152,5 +154,99 @@ void HttpUpstream::doneWriting() {
}
}

TcpConnPool::TcpConnPool(const std::string& cluster_name, Upstream::ClusterManager& cluster_manager,
Upstream::LoadBalancerContext* context,
Tcp::ConnectionPool::UpstreamCallbacks& upstream_callbacks)
: upstream_callbacks_(upstream_callbacks) {
conn_pool_ = cluster_manager.tcpConnPoolForCluster(cluster_name,
Upstream::ResourcePriority::Default, context);
}

TcpConnPool::~TcpConnPool() {
if (upstream_handle_ != nullptr) {
upstream_handle_->cancel(ConnectionPool::CancelPolicy::CloseExcess);
}
}

bool TcpConnPool::valid() const { return conn_pool_ != nullptr; }

void TcpConnPool::newStream(GenericConnectionPoolCallbacks* callbacks) {
callbacks_ = callbacks;
// Given this function is reentrant, make sure we only reset the upstream_handle_ if given a
// valid connection handle. If newConnection fails inline it may result in attempting to
// select a new host, and a recursive call to initializeUpstreamConnection. In this case the
// first call to newConnection will return null and the inner call will persist.
Tcp::ConnectionPool::Cancellable* handle = conn_pool_->newConnection(*this);
if (handle) {
ASSERT(upstream_handle_ == nullptr);
upstream_handle_ = handle;
}
}

void TcpConnPool::onPoolFailure(ConnectionPool::PoolFailureReason reason,
Upstream::HostDescriptionConstSharedPtr host) {
upstream_handle_ = nullptr;
callbacks_->onGenericPoolFailure(reason, host);
}

void TcpConnPool::onPoolReady(Tcp::ConnectionPool::ConnectionDataPtr&& conn_data,
Upstream::HostDescriptionConstSharedPtr host) {
upstream_handle_ = nullptr;
Tcp::ConnectionPool::ConnectionData* latched_data = conn_data.get();
Network::Connection& connection = conn_data->connection();

auto upstream = std::make_unique<TcpUpstream>(std::move(conn_data), upstream_callbacks_);
callbacks_->onGenericPoolReady(&connection.streamInfo(), std::move(upstream), host,
latched_data->connection().localAddress(),
latched_data->connection().streamInfo().downstreamSslConnection());
}

HttpConnPool::HttpConnPool(const std::string& cluster_name,
Upstream::ClusterManager& cluster_manager,
Upstream::LoadBalancerContext* context, std::string hostname,
Tcp::ConnectionPool::UpstreamCallbacks& upstream_callbacks)
: hostname_(hostname), upstream_callbacks_(upstream_callbacks) {
conn_pool_ = cluster_manager.httpConnPoolForCluster(
cluster_name, Upstream::ResourcePriority::Default, absl::nullopt, context);
}

HttpConnPool::~HttpConnPool() {
if (upstream_handle_ != nullptr) {
// Because HTTP connections are generally shorter lived and have a higher probability of use
// before going idle, they are closed with Default rather than CloseExcess.
upstream_handle_->cancel(ConnectionPool::CancelPolicy::Default);
}
}

bool HttpConnPool::valid() const { return conn_pool_ != nullptr; }

void HttpConnPool::newStream(GenericConnectionPoolCallbacks* callbacks) {
callbacks_ = callbacks;
upstream_ = std::make_unique<HttpUpstream>(upstream_callbacks_, hostname_);
Tcp::ConnectionPool::Cancellable* handle =
conn_pool_->newStream(upstream_->responseDecoder(), *this);
if (handle != nullptr) {
upstream_handle_ = handle;
}
}

void HttpConnPool::onPoolFailure(ConnectionPool::PoolFailureReason reason, absl::string_view,
Upstream::HostDescriptionConstSharedPtr host) {
upstream_handle_ = nullptr;
callbacks_->onGenericPoolFailure(reason, host);
}

void HttpConnPool::onPoolReady(Http::RequestEncoder& request_encoder,
Upstream::HostDescriptionConstSharedPtr host,
const StreamInfo::StreamInfo& info) {
upstream_handle_ = nullptr;
Http::RequestEncoder* latched_encoder = &request_encoder;
upstream_->setRequestEncoder(request_encoder,
host->transportSocketFactory().implementsSecureTransport());
callbacks_->onGenericPoolReady(nullptr, std::move(upstream_), host,
latched_encoder->getStream().connectionLocalAddress(),
info.downstreamSslConnection());
}

} // namespace TcpProxy
} // namespace Envoy
Loading

0 comments on commit 82e7109

Please sign in to comment.