Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

tcp: towards pluggable upstreams #13331

Merged
merged 7 commits into from
Oct 13, 2020
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
95 changes: 30 additions & 65 deletions source/common/tcp_proxy/tcp_proxy.cc
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ Filter::~Filter() {
access_log->log(nullptr, nullptr, nullptr, getStreamInfo());
}

ASSERT(upstream_handle_ == nullptr);
ASSERT(generic_conn_pool_ == nullptr);
ASSERT(upstream_ == nullptr);
}

Expand Down Expand Up @@ -442,24 +442,17 @@ Network::FilterStatus Filter::initializeUpstreamConnection() {

bool Filter::maybeTunnel(const std::string& cluster_name) {
if (!config_->tunnelingConfig()) {
Tcp::ConnectionPool::Instance* conn_pool = cluster_manager_.tcpConnPoolForCluster(
cluster_name, Upstream::ResourcePriority::Default, this);
if (conn_pool) {
generic_conn_pool_ =
std::make_unique<TcpConnPool>(cluster_name, cluster_manager_, this, *upstream_callbacks_);
if (generic_conn_pool_->valid()) {
connecting_ = true;
connect_attempts_++;

// Given this function is reentrant, make sure we only reset the upstream_handle_ if given a
// valid connection handle. If newConnection fails inline it may result in attempting to
// select a new host, and a recursive call to initializeUpstreamConnection. In this case the
// first call to newConnection will return null and the inner call will persist.
Tcp::ConnectionPool::Cancellable* handle = conn_pool->newConnection(*this);
if (handle) {
ASSERT(upstream_handle_.get() == nullptr);
upstream_handle_ = std::make_shared<TcpConnectionHandle>(handle);
}
generic_conn_pool_->newStream(this);
// Because we never return open connections to the pool, this either has a handle waiting on
// connection completion, or onPoolFailure has been invoked. Either way, stop iteration.
return true;
} else {
generic_conn_pool_.reset();
}
} else {
auto* cluster = cluster_manager_.get(cluster_name);
Expand All @@ -474,28 +467,23 @@ bool Filter::maybeTunnel(const std::string& cluster_name) {
"http2_protocol_options on the cluster.");
return false;
}
Http::ConnectionPool::Instance* conn_pool = cluster_manager_.httpConnPoolForCluster(
cluster_name, Upstream::ResourcePriority::Default, absl::nullopt, this);
if (conn_pool) {
upstream_ = std::make_unique<HttpUpstream>(*upstream_callbacks_,
config_->tunnelingConfig()->hostname());
HttpUpstream* http_upstream = static_cast<HttpUpstream*>(upstream_.get());
Http::ConnectionPool::Cancellable* cancellable =
conn_pool->newStream(http_upstream->responseDecoder(), *this);
if (cancellable) {
ASSERT(upstream_handle_.get() == nullptr);
upstream_handle_ = std::make_shared<HttpConnectionHandle>(cancellable);
}

generic_conn_pool_ = std::make_unique<HttpConnPool>(cluster_name, cluster_manager_, this,
config_->tunnelingConfig()->hostname(),
*upstream_callbacks_);
if (generic_conn_pool_->valid()) {
generic_conn_pool_->newStream(this);
return true;
} else {
generic_conn_pool_.reset();
}
}

return false;
}
void Filter::onPoolFailure(ConnectionPool::PoolFailureReason reason,
Upstream::HostDescriptionConstSharedPtr host) {
upstream_handle_.reset();

void Filter::onGenericPoolFailure(ConnectionPool::PoolFailureReason reason,
Upstream::HostDescriptionConstSharedPtr host) {
generic_conn_pool_.reset();
read_callbacks_->upstreamHost(host);
getStreamInfo().onUpstreamHostSelected(host);

Expand All @@ -518,44 +506,22 @@ void Filter::onPoolFailure(ConnectionPool::PoolFailureReason reason,
}
}

void Filter::onPoolReadyBase(Upstream::HostDescriptionConstSharedPtr& host,
const Network::Address::InstanceConstSharedPtr& local_address,
Ssl::ConnectionInfoConstSharedPtr ssl_info) {
upstream_handle_.reset();
void Filter::onGenericPoolReady(StreamInfo::StreamInfo* info,
std::unique_ptr<GenericUpstream>&& upstream,
Upstream::HostDescriptionConstSharedPtr& host,
const Network::Address::InstanceConstSharedPtr& local_address,
Ssl::ConnectionInfoConstSharedPtr ssl_info) {
upstream_ = std::move(upstream);
generic_conn_pool_.reset();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would be good to say something about the invariants on generic_conn_pool_ and upstream_. It seems that upstream_ should be null as we enter this method and when we create a generic_conn_pool_, and generic_conn_pool_ should be null when upstream_ is not null.

read_callbacks_->upstreamHost(host);
getStreamInfo().onUpstreamHostSelected(host);
getStreamInfo().setUpstreamLocalAddress(local_address);
getStreamInfo().setUpstreamSslConnection(ssl_info);
onUpstreamConnection();
read_callbacks_->continueReading();
}

void Filter::onPoolReady(Tcp::ConnectionPool::ConnectionDataPtr&& conn_data,
Upstream::HostDescriptionConstSharedPtr host) {
Tcp::ConnectionPool::ConnectionData* latched_data = conn_data.get();

upstream_ = std::make_unique<TcpUpstream>(std::move(conn_data), *upstream_callbacks_);
onPoolReadyBase(host, latched_data->connection().localAddress(),
latched_data->connection().streamInfo().downstreamSslConnection());
read_callbacks_->connection().streamInfo().setUpstreamFilterState(
latched_data->connection().streamInfo().filterState());
}

void Filter::onPoolFailure(ConnectionPool::PoolFailureReason failure, absl::string_view,
Upstream::HostDescriptionConstSharedPtr host) {
onPoolFailure(failure, host);
}

void Filter::onPoolReady(Http::RequestEncoder& request_encoder,
Upstream::HostDescriptionConstSharedPtr host,
const StreamInfo::StreamInfo& info) {
Http::RequestEncoder* latched_encoder = &request_encoder;
HttpUpstream* http_upstream = static_cast<HttpUpstream*>(upstream_.get());
http_upstream->setRequestEncoder(request_encoder,
host->transportSocketFactory().implementsSecureTransport());

onPoolReadyBase(host, latched_encoder->getStream().connectionLocalAddress(),
info.downstreamSslConnection());
if (info) {
read_callbacks_->connection().streamInfo().setUpstreamFilterState(info->filterState());
}
}

const Router::MetadataMatchCriteria* Filter::metadataMatchCriteria() {
Expand Down Expand Up @@ -624,12 +590,11 @@ void Filter::onDownstreamEvent(Network::ConnectionEvent event) {
disableIdleTimer();
}
}
if (upstream_handle_) {
if (generic_conn_pool_) {
if (event == Network::ConnectionEvent::LocalClose ||
event == Network::ConnectionEvent::RemoteClose) {
// Cancel the conn pool request and close any excess pending requests.
upstream_handle_->cancel();
upstream_handle_.reset();
generic_conn_pool_.reset();
}
}
}
Expand Down
36 changes: 15 additions & 21 deletions source/common/tcp_proxy/tcp_proxy.h
Original file line number Diff line number Diff line change
Expand Up @@ -242,9 +242,8 @@ class PerConnectionCluster : public StreamInfo::FilterState::Object {
*/
class Filter : public Network::ReadFilter,
public Upstream::LoadBalancerContextBase,
Tcp::ConnectionPool::Callbacks,
public Http::ConnectionPool::Callbacks,
protected Logger::Loggable<Logger::Id::filter> {
protected Logger::Loggable<Logger::Id::filter>,
public GenericConnectionPoolCallbacks {
public:
Filter(ConfigSharedPtr config, Upstream::ClusterManager& cluster_manager);
~Filter() override;
Expand All @@ -254,23 +253,13 @@ class Filter : public Network::ReadFilter,
Network::FilterStatus onNewConnection() override;
void initializeReadFilterCallbacks(Network::ReadFilterCallbacks& callbacks) override;

// Tcp::ConnectionPool::Callbacks
void onPoolFailure(ConnectionPool::PoolFailureReason reason,
Upstream::HostDescriptionConstSharedPtr host) override;
void onPoolReady(Tcp::ConnectionPool::ConnectionDataPtr&& conn_data,
Upstream::HostDescriptionConstSharedPtr host) override;

// Http::ConnectionPool::Callbacks,
void onPoolFailure(ConnectionPool::PoolFailureReason reason,
absl::string_view transport_failure_reason,
Upstream::HostDescriptionConstSharedPtr host) override;
void onPoolReady(Http::RequestEncoder& request_encoder,
Upstream::HostDescriptionConstSharedPtr host,
const StreamInfo::StreamInfo& info) override;

void onPoolReadyBase(Upstream::HostDescriptionConstSharedPtr& host,
const Network::Address::InstanceConstSharedPtr& local_address,
Ssl::ConnectionInfoConstSharedPtr ssl_info);
// GenericConnectionPoolCallbacks
void onGenericPoolReady(StreamInfo::StreamInfo* info, std::unique_ptr<GenericUpstream>&& upstream,
Upstream::HostDescriptionConstSharedPtr& host,
const Network::Address::InstanceConstSharedPtr& local_address,
Ssl::ConnectionInfoConstSharedPtr ssl_info) override;
void onGenericPoolFailure(ConnectionPool::PoolFailureReason reason,
Upstream::HostDescriptionConstSharedPtr host) override;

// Upstream::LoadBalancerContext
const Router::MetadataMatchCriteria* metadataMatchCriteria() override;
Expand Down Expand Up @@ -375,10 +364,15 @@ class Filter : public Network::ReadFilter,
Event::TimerPtr idle_timer_;
Event::TimerPtr connection_duration_timer_;

std::shared_ptr<ConnectionHandle> upstream_handle_;
std::shared_ptr<UpstreamCallbacks> upstream_callbacks_; // shared_ptr required for passing as a
// read filter.
// The upstream handle (either TCP or HTTP). This is set in onGenericPoolReady and should persist
// until either the upstream or downstream connection is terminated.
std::unique_ptr<GenericUpstream> upstream_;
// The connection pool used to set up |upstream_|.
// This will be non-null from when an upstream connection is attempted until
// it either succeeds or fails.
std::unique_ptr<GenericConnPool> generic_conn_pool_;
RouteConstSharedPtr route_;
Router::MetadataMatchCriteriaConstPtr metadata_match_criteria_;
Network::TransportSocketOptionsSharedPtr transport_socket_options_;
Expand Down
96 changes: 96 additions & 0 deletions source/common/tcp_proxy/upstream.cc
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
#include "common/tcp_proxy/upstream.h"

#include "envoy/upstream/cluster_manager.h"

#include "common/http/header_map_impl.h"
#include "common/http/headers.h"
#include "common/http/utility.h"
Expand Down Expand Up @@ -152,5 +154,99 @@ void HttpUpstream::doneWriting() {
}
}

// Constructs the generic wrapper around a raw-TCP upstream connection pool.
// Looks up the cluster's TCP connection pool from the cluster manager; the
// result may be null (e.g. unknown cluster) — callers must check valid()
// before using this object.
TcpConnPool::TcpConnPool(const std::string& cluster_name, Upstream::ClusterManager& cluster_manager,
                         Upstream::LoadBalancerContext* context,
                         Tcp::ConnectionPool::UpstreamCallbacks& upstream_callbacks)
    : upstream_callbacks_(upstream_callbacks) {
  conn_pool_ = cluster_manager.tcpConnPoolForCluster(cluster_name,
                                                     Upstream::ResourcePriority::Default, context);
}

// Cancels any in-flight connection request on destruction. CloseExcess closes
// the now-unwanted upstream connection rather than parking it, since TCP proxy
// never returns open connections to the pool.
TcpConnPool::~TcpConnPool() {
  if (upstream_handle_ != nullptr) {
    upstream_handle_->cancel(Tcp::ConnectionPool::CancelPolicy::CloseExcess);
  }
}

// True iff the cluster lookup in the constructor produced a usable pool.
bool TcpConnPool::valid() const { return conn_pool_ != nullptr; }

// Kicks off a new upstream TCP connection. The outcome is delivered to
// |callbacks| via onGenericPoolReady()/onGenericPoolFailure() (through this
// object's onPoolReady()/onPoolFailure()).
void TcpConnPool::newStream(GenericConnectionPoolCallbacks* callbacks) {
  callbacks_ = callbacks;
  // Given this function is reentrant, make sure we only reset the upstream_handle_ if given a
  // valid connection handle. If newConnection fails inline it may result in attempting to
  // select a new host, and a recursive call to initializeUpstreamConnection. In this case the
  // first call to newConnection will return null and the inner call will persist.
  Tcp::ConnectionPool::Cancellable* handle = conn_pool_->newConnection(*this);
  if (handle) {
    ASSERT(upstream_handle_ == nullptr);
    upstream_handle_ = handle;
  }
}

// Tcp::ConnectionPool::Callbacks: the connection attempt failed. Clear the
// pending handle (the pool has already released it) and propagate the failure
// to the generic callbacks.
void TcpConnPool::onPoolFailure(ConnectionPool::PoolFailureReason reason,
                                Upstream::HostDescriptionConstSharedPtr host) {
  upstream_handle_ = nullptr;
  callbacks_->onGenericPoolFailure(reason, host);
}

// Tcp::ConnectionPool::Callbacks: a raw TCP connection is ready. Wraps it in a
// TcpUpstream and hands ownership to the generic callbacks.
void TcpConnPool::onPoolReady(Tcp::ConnectionPool::ConnectionDataPtr&& conn_data,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: missing newline above.

                              Upstream::HostDescriptionConstSharedPtr host) {
  upstream_handle_ = nullptr;
  // Latch a raw pointer before conn_data is moved below, so the connection's
  // address/SSL info can still be read when invoking the callback.
  Tcp::ConnectionPool::ConnectionData* latched_data = conn_data.get();
  Network::Connection& connection = conn_data->connection();

  auto upstream = std::make_unique<TcpUpstream>(std::move(conn_data), upstream_callbacks_);
  // NOTE(review): this passes the upstream connection's
  // downstreamSslConnection() as the SSL info — confirm that is intended.
  callbacks_->onGenericPoolReady(&connection.streamInfo(), std::move(upstream), host,
                                 latched_data->connection().localAddress(),
                                 latched_data->connection().streamInfo().downstreamSslConnection());
}

// Constructs the generic wrapper around an HTTP (tunneling/CONNECT-style)
// upstream connection pool. |hostname| is the tunneling hostname from the
// filter's tunneling config. The pool lookup may return null — callers must
// check valid() before use.
HttpConnPool::HttpConnPool(const std::string& cluster_name,
                           Upstream::ClusterManager& cluster_manager,
                           Upstream::LoadBalancerContext* context, std::string hostname,
                           Tcp::ConnectionPool::UpstreamCallbacks& upstream_callbacks)
    : hostname_(hostname), upstream_callbacks_(upstream_callbacks) {
  conn_pool_ = cluster_manager.httpConnPoolForCluster(
      cluster_name, Upstream::ResourcePriority::Default, absl::nullopt, context);
}

// Cancels any in-flight stream request on destruction. Uses Default (not
// CloseExcess) cancel policy, matching the legacy behavior: the HTTP pool
// historically always used the default policy, while the TCP pool always
// closed excess connections.
HttpConnPool::~HttpConnPool() {
  if (upstream_handle_ != nullptr) {
    upstream_handle_->cancel(Tcp::ConnectionPool::CancelPolicy::Default);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This cancel uses Default, but the Tcp one above uses CloseExcess. Why are they different?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This would read better if you used Http::ConnectionPool::.... They're the same type, but it looks weird this way.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

legacy - the TCP pool always used close excess, and HTTP always used default.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add comment explaining that please.

  }
}

// True iff the cluster lookup in the constructor produced a usable pool.
bool HttpConnPool::valid() const { return conn_pool_ != nullptr; }

// Kicks off a new HTTP stream for tunneling. Creates the HttpUpstream (which
// owns the response decoder) up front, then requests a stream from the pool;
// the outcome is delivered to |callbacks| via this object's
// onPoolReady()/onPoolFailure().
void HttpConnPool::newStream(GenericConnectionPoolCallbacks* callbacks) {
  callbacks_ = callbacks;
  upstream_ = std::make_unique<HttpUpstream>(upstream_callbacks_, hostname_);
  HttpUpstream* http_upstream = static_cast<HttpUpstream*>(upstream_.get());
  // Only latch the handle if newStream did not complete inline (mirrors the
  // reentrancy handling in TcpConnPool::newStream).
  Tcp::ConnectionPool::Cancellable* handle =
      conn_pool_->newStream(http_upstream->responseDecoder(), *this);
  if (handle != nullptr) {
    upstream_handle_ = handle;
  }
}

// Http::ConnectionPool::Callbacks: the stream attempt failed. The transport
// failure reason is dropped; clear the pending handle and propagate the
// failure to the generic callbacks.
void HttpConnPool::onPoolFailure(ConnectionPool::PoolFailureReason reason, absl::string_view,
                                 Upstream::HostDescriptionConstSharedPtr host) {
  upstream_handle_ = nullptr;
  callbacks_->onGenericPoolFailure(reason, host);
}

// Http::ConnectionPool::Callbacks: an HTTP stream is ready. Attaches the
// request encoder to the HttpUpstream created in newStream() and hands
// ownership of the upstream to the generic callbacks.
void HttpConnPool::onPoolReady(Http::RequestEncoder& request_encoder,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: missing newline above.

                               Upstream::HostDescriptionConstSharedPtr host,
                               const StreamInfo::StreamInfo& info) {
  upstream_handle_ = nullptr;
  // Latch a pointer to the encoder before it is consumed below, so the
  // connection-local address can still be queried for the callback.
  Http::RequestEncoder* latched_encoder = &request_encoder;
  HttpUpstream* http_upstream = static_cast<HttpUpstream*>(upstream_.get());
  http_upstream->setRequestEncoder(request_encoder,
                                   host->transportSocketFactory().implementsSecureTransport());
  // Passes nullptr for the upstream StreamInfo — unlike the TCP path, there is
  // no dedicated upstream connection stream info to propagate here.
  callbacks_->onGenericPoolReady(nullptr, std::move(upstream_), host,
                                 latched_encoder->getStream().connectionLocalAddress(),
                                 info.downstreamSslConnection());
}

} // namespace TcpProxy
} // namespace Envoy
Loading