diff --git a/source/common/tcp_proxy/tcp_proxy.cc b/source/common/tcp_proxy/tcp_proxy.cc index 5ca0ea661d83..3688634f5305 100644 --- a/source/common/tcp_proxy/tcp_proxy.cc +++ b/source/common/tcp_proxy/tcp_proxy.cc @@ -225,7 +225,7 @@ Filter::~Filter() { access_log->log(nullptr, nullptr, nullptr, getStreamInfo()); } - ASSERT(upstream_handle_ == nullptr); + ASSERT(generic_conn_pool_ == nullptr); ASSERT(upstream_ == nullptr); } @@ -442,24 +442,17 @@ Network::FilterStatus Filter::initializeUpstreamConnection() { bool Filter::maybeTunnel(const std::string& cluster_name) { if (!config_->tunnelingConfig()) { - Tcp::ConnectionPool::Instance* conn_pool = cluster_manager_.tcpConnPoolForCluster( - cluster_name, Upstream::ResourcePriority::Default, this); - if (conn_pool) { + generic_conn_pool_ = + std::make_unique(cluster_name, cluster_manager_, this, *upstream_callbacks_); + if (generic_conn_pool_->valid()) { connecting_ = true; connect_attempts_++; - - // Given this function is reentrant, make sure we only reset the upstream_handle_ if given a - // valid connection handle. If newConnection fails inline it may result in attempting to - // select a new host, and a recursive call to initializeUpstreamConnection. In this case the - // first call to newConnection will return null and the inner call will persist. - Tcp::ConnectionPool::Cancellable* handle = conn_pool->newConnection(*this); - if (handle) { - ASSERT(upstream_handle_.get() == nullptr); - upstream_handle_ = std::make_shared(handle); - } + generic_conn_pool_->newStream(this); // Because we never return open connections to the pool, this either has a handle waiting on // connection completion, or onPoolFailure has been invoked. Either way, stop iteration. return true; + } else { + generic_conn_pool_.reset(); } } else { auto* cluster = cluster_manager_.get(cluster_name); @@ -474,28 +467,23 @@ bool Filter::maybeTunnel(const std::string& cluster_name) { "http2_protocol_options on the cluster."); return false; } - Http::ConnectionPool::Instance* conn_pool = cluster_manager_.httpConnPoolForCluster( - cluster_name, Upstream::ResourcePriority::Default, absl::nullopt, this); - if (conn_pool) { - upstream_ = std::make_unique(*upstream_callbacks_, - config_->tunnelingConfig()->hostname()); - HttpUpstream* http_upstream = static_cast(upstream_.get()); - Http::ConnectionPool::Cancellable* cancellable = - conn_pool->newStream(http_upstream->responseDecoder(), *this); - if (cancellable) { - ASSERT(upstream_handle_.get() == nullptr); - upstream_handle_ = std::make_shared(cancellable); - } + + generic_conn_pool_ = std::make_unique(cluster_name, cluster_manager_, this, + config_->tunnelingConfig()->hostname(), + *upstream_callbacks_); + if (generic_conn_pool_->valid()) { + generic_conn_pool_->newStream(this); return true; + } else { + generic_conn_pool_.reset(); } } return false; } -void Filter::onPoolFailure(ConnectionPool::PoolFailureReason reason, - Upstream::HostDescriptionConstSharedPtr host) { - upstream_handle_.reset(); - +void Filter::onGenericPoolFailure(ConnectionPool::PoolFailureReason reason, + Upstream::HostDescriptionConstSharedPtr host) { + generic_conn_pool_.reset(); read_callbacks_->upstreamHost(host); getStreamInfo().onUpstreamHostSelected(host); @@ -518,44 +506,22 @@ void Filter::onPoolFailure(ConnectionPool::PoolFailureReason reason, } } -void Filter::onPoolReadyBase(Upstream::HostDescriptionConstSharedPtr& host, - const Network::Address::InstanceConstSharedPtr& local_address, - Ssl::ConnectionInfoConstSharedPtr ssl_info) { - upstream_handle_.reset(); +void Filter::onGenericPoolReady(StreamInfo::StreamInfo* info, + std::unique_ptr&& upstream, + Upstream::HostDescriptionConstSharedPtr& host, + const Network::Address::InstanceConstSharedPtr& local_address, + Ssl::ConnectionInfoConstSharedPtr ssl_info) { + upstream_ = std::move(upstream); + generic_conn_pool_.reset(); read_callbacks_->upstreamHost(host); getStreamInfo().onUpstreamHostSelected(host); getStreamInfo().setUpstreamLocalAddress(local_address); getStreamInfo().setUpstreamSslConnection(ssl_info); onUpstreamConnection(); read_callbacks_->continueReading(); -} - -void Filter::onPoolReady(Tcp::ConnectionPool::ConnectionDataPtr&& conn_data, - Upstream::HostDescriptionConstSharedPtr host) { - Tcp::ConnectionPool::ConnectionData* latched_data = conn_data.get(); - - upstream_ = std::make_unique(std::move(conn_data), *upstream_callbacks_); - onPoolReadyBase(host, latched_data->connection().localAddress(), - latched_data->connection().streamInfo().downstreamSslConnection()); - read_callbacks_->connection().streamInfo().setUpstreamFilterState( - latched_data->connection().streamInfo().filterState()); -} - -void Filter::onPoolFailure(ConnectionPool::PoolFailureReason failure, absl::string_view, - Upstream::HostDescriptionConstSharedPtr host) { - onPoolFailure(failure, host); -} - -void Filter::onPoolReady(Http::RequestEncoder& request_encoder, - Upstream::HostDescriptionConstSharedPtr host, - const StreamInfo::StreamInfo& info) { - Http::RequestEncoder* latched_encoder = &request_encoder; - HttpUpstream* http_upstream = static_cast(upstream_.get()); - http_upstream->setRequestEncoder(request_encoder, - host->transportSocketFactory().implementsSecureTransport()); - - onPoolReadyBase(host, latched_encoder->getStream().connectionLocalAddress(), - info.downstreamSslConnection()); + if (info) { + read_callbacks_->connection().streamInfo().setUpstreamFilterState(info->filterState()); + } } const Router::MetadataMatchCriteria* Filter::metadataMatchCriteria() { @@ -624,12 +590,11 @@ void Filter::onDownstreamEvent(Network::ConnectionEvent event) { disableIdleTimer(); } } - if (upstream_handle_) { + if (generic_conn_pool_) { if (event == Network::ConnectionEvent::LocalClose || event == Network::ConnectionEvent::RemoteClose) { // Cancel the conn pool request and close any excess pending requests. - upstream_handle_->cancel(); - upstream_handle_.reset(); + generic_conn_pool_.reset(); } } } diff --git a/source/common/tcp_proxy/tcp_proxy.h b/source/common/tcp_proxy/tcp_proxy.h index c7c91f2c85c2..12b482e01690 100644 --- a/source/common/tcp_proxy/tcp_proxy.h +++ b/source/common/tcp_proxy/tcp_proxy.h @@ -242,9 +242,8 @@ class PerConnectionCluster : public StreamInfo::FilterState::Object { */ class Filter : public Network::ReadFilter, public Upstream::LoadBalancerContextBase, - Tcp::ConnectionPool::Callbacks, - public Http::ConnectionPool::Callbacks, - protected Logger::Loggable { + protected Logger::Loggable, + public GenericConnectionPoolCallbacks { public: Filter(ConfigSharedPtr config, Upstream::ClusterManager& cluster_manager); ~Filter() override; @@ -254,23 +253,13 @@ class Filter : public Network::ReadFilter, Network::FilterStatus onNewConnection() override; void initializeReadFilterCallbacks(Network::ReadFilterCallbacks& callbacks) override; - // Tcp::ConnectionPool::Callbacks - void onPoolFailure(ConnectionPool::PoolFailureReason reason, - Upstream::HostDescriptionConstSharedPtr host) override; - void onPoolReady(Tcp::ConnectionPool::ConnectionDataPtr&& conn_data, - Upstream::HostDescriptionConstSharedPtr host) override; - - // Http::ConnectionPool::Callbacks, - void onPoolFailure(ConnectionPool::PoolFailureReason reason, - absl::string_view transport_failure_reason, - Upstream::HostDescriptionConstSharedPtr host) override; - void onPoolReady(Http::RequestEncoder& request_encoder, - Upstream::HostDescriptionConstSharedPtr host, - const StreamInfo::StreamInfo& info) override; - - void onPoolReadyBase(Upstream::HostDescriptionConstSharedPtr& host, - const Network::Address::InstanceConstSharedPtr& local_address, - Ssl::ConnectionInfoConstSharedPtr ssl_info); + // GenericConnectionPoolCallbacks + void onGenericPoolReady(StreamInfo::StreamInfo* info, std::unique_ptr&& upstream, + Upstream::HostDescriptionConstSharedPtr& host, + const Network::Address::InstanceConstSharedPtr& local_address, + Ssl::ConnectionInfoConstSharedPtr ssl_info) override; + void onGenericPoolFailure(ConnectionPool::PoolFailureReason reason, + Upstream::HostDescriptionConstSharedPtr host) override; // Upstream::LoadBalancerContext const Router::MetadataMatchCriteria* metadataMatchCriteria() override; @@ -375,10 +364,15 @@ class Filter : public Network::ReadFilter, Event::TimerPtr idle_timer_; Event::TimerPtr connection_duration_timer_; - std::shared_ptr upstream_handle_; std::shared_ptr upstream_callbacks_; // shared_ptr required for passing as a // read filter. + // The upstream handle (either TCP or HTTP). This is set in onGenericPoolReady and should persist + // until either the upstream or downstream connection is terminated. std::unique_ptr upstream_; + // The connection pool used to set up |upstream_|. + // This will be non-null from when an upstream connection is attempted until + // it either succeeds or fails. + std::unique_ptr generic_conn_pool_; RouteConstSharedPtr route_; Router::MetadataMatchCriteriaConstPtr metadata_match_criteria_; Network::TransportSocketOptionsSharedPtr transport_socket_options_; diff --git a/source/common/tcp_proxy/upstream.cc b/source/common/tcp_proxy/upstream.cc index 451a277e0865..1da6eb915797 100644 --- a/source/common/tcp_proxy/upstream.cc +++ b/source/common/tcp_proxy/upstream.cc @@ -1,5 +1,7 @@ #include "common/tcp_proxy/upstream.h" +#include "envoy/upstream/cluster_manager.h" + #include "common/http/header_map_impl.h" #include "common/http/headers.h" #include "common/http/utility.h" @@ -152,5 +154,99 @@ void HttpUpstream::doneWriting() { } } +TcpConnPool::TcpConnPool(const std::string& cluster_name, Upstream::ClusterManager& cluster_manager, + Upstream::LoadBalancerContext* context, + Tcp::ConnectionPool::UpstreamCallbacks& upstream_callbacks) + : upstream_callbacks_(upstream_callbacks) { + conn_pool_ = cluster_manager.tcpConnPoolForCluster(cluster_name, + Upstream::ResourcePriority::Default, context); +} + +TcpConnPool::~TcpConnPool() { + if (upstream_handle_ != nullptr) { + upstream_handle_->cancel(ConnectionPool::CancelPolicy::CloseExcess); + } +} + +bool TcpConnPool::valid() const { return conn_pool_ != nullptr; } + +void TcpConnPool::newStream(GenericConnectionPoolCallbacks* callbacks) { + callbacks_ = callbacks; + // Given this function is reentrant, make sure we only reset the upstream_handle_ if given a + // valid connection handle. If newConnection fails inline it may result in attempting to + // select a new host, and a recursive call to initializeUpstreamConnection. In this case the + // first call to newConnection will return null and the inner call will persist. + Tcp::ConnectionPool::Cancellable* handle = conn_pool_->newConnection(*this); + if (handle) { + ASSERT(upstream_handle_ == nullptr); + upstream_handle_ = handle; + } +} + +void TcpConnPool::onPoolFailure(ConnectionPool::PoolFailureReason reason, + Upstream::HostDescriptionConstSharedPtr host) { + upstream_handle_ = nullptr; + callbacks_->onGenericPoolFailure(reason, host); +} + +void TcpConnPool::onPoolReady(Tcp::ConnectionPool::ConnectionDataPtr&& conn_data, + Upstream::HostDescriptionConstSharedPtr host) { + upstream_handle_ = nullptr; + Tcp::ConnectionPool::ConnectionData* latched_data = conn_data.get(); + Network::Connection& connection = conn_data->connection(); + + auto upstream = std::make_unique(std::move(conn_data), upstream_callbacks_); + callbacks_->onGenericPoolReady(&connection.streamInfo(), std::move(upstream), host, + latched_data->connection().localAddress(), + latched_data->connection().streamInfo().downstreamSslConnection()); +} + +HttpConnPool::HttpConnPool(const std::string& cluster_name, + Upstream::ClusterManager& cluster_manager, + Upstream::LoadBalancerContext* context, std::string hostname, + Tcp::ConnectionPool::UpstreamCallbacks& upstream_callbacks) + : hostname_(hostname), upstream_callbacks_(upstream_callbacks) { + conn_pool_ = cluster_manager.httpConnPoolForCluster( + cluster_name, Upstream::ResourcePriority::Default, absl::nullopt, context); +} + +HttpConnPool::~HttpConnPool() { + if (upstream_handle_ != nullptr) { + // Because HTTP connections are generally shorter lived and have a higher probability of use + // before going idle, they are closed with Default rather than CloseExcess. + upstream_handle_->cancel(ConnectionPool::CancelPolicy::Default); + } +} + +bool HttpConnPool::valid() const { return conn_pool_ != nullptr; } + +void HttpConnPool::newStream(GenericConnectionPoolCallbacks* callbacks) { + callbacks_ = callbacks; + upstream_ = std::make_unique(upstream_callbacks_, hostname_); + Tcp::ConnectionPool::Cancellable* handle = + conn_pool_->newStream(upstream_->responseDecoder(), *this); + if (handle != nullptr) { + upstream_handle_ = handle; + } +} + +void HttpConnPool::onPoolFailure(ConnectionPool::PoolFailureReason reason, absl::string_view, + Upstream::HostDescriptionConstSharedPtr host) { + upstream_handle_ = nullptr; + callbacks_->onGenericPoolFailure(reason, host); +} + +void HttpConnPool::onPoolReady(Http::RequestEncoder& request_encoder, + Upstream::HostDescriptionConstSharedPtr host, + const StreamInfo::StreamInfo& info) { + upstream_handle_ = nullptr; + Http::RequestEncoder* latched_encoder = &request_encoder; + upstream_->setRequestEncoder(request_encoder, + host->transportSocketFactory().implementsSecureTransport()); + callbacks_->onGenericPoolReady(nullptr, std::move(upstream_), host, + latched_encoder->getStream().connectionLocalAddress(), + info.downstreamSslConnection()); +} + } // namespace TcpProxy } // namespace Envoy diff --git a/source/common/tcp_proxy/upstream.h b/source/common/tcp_proxy/upstream.h index 8d2a301d7137..33943e70b982 100644 --- a/source/common/tcp_proxy/upstream.h +++ b/source/common/tcp_proxy/upstream.h @@ -3,42 +3,95 @@ #include "envoy/http/conn_pool.h" #include "envoy/network/connection.h" #include "envoy/tcp/conn_pool.h" +#include "envoy/upstream/load_balancer.h" #include "envoy/upstream/upstream.h" namespace Envoy { namespace TcpProxy { -// Interface for a generic ConnectionHandle, which can wrap a TcpConnectionHandle -// or an HttpConnectionHandle -class ConnectionHandle { +class GenericConnectionPoolCallbacks; +class GenericUpstream; + +// An API for wrapping either an HTTP or a TCP connection pool. +class GenericConnPool : public Logger::Loggable { public: - virtual ~ConnectionHandle() = default; - // Cancel the conn pool request and close any excess pending requests. - virtual void cancel() PURE; + virtual ~GenericConnPool() = default; + + // Called to create a new HTTP stream or TCP connection. The implementation + // is then responsible for calling either onPoolReady or onPoolFailure on the + // supplied GenericConnectionPoolCallbacks. + virtual void newStream(GenericConnectionPoolCallbacks* callbacks) PURE; + // Returns true if there was a valid connection pool, false otherwise. + virtual bool valid() const PURE; }; -// An implementation of ConnectionHandle which works with the Tcp::ConnectionPool. -class TcpConnectionHandle : public ConnectionHandle { +class TcpConnPool : public GenericConnPool, public Tcp::ConnectionPool::Callbacks { public: - TcpConnectionHandle(Tcp::ConnectionPool::Cancellable* handle) : upstream_handle_(handle) {} + TcpConnPool(const std::string& cluster_name, Upstream::ClusterManager& cluster_manager, + Upstream::LoadBalancerContext* context, + Tcp::ConnectionPool::UpstreamCallbacks& upstream_callbacks); + ~TcpConnPool() override; + + // GenericConnPool + bool valid() const override; + void newStream(GenericConnectionPoolCallbacks* callbacks) override; - void cancel() override { - upstream_handle_->cancel(Tcp::ConnectionPool::CancelPolicy::CloseExcess); - } + // Tcp::ConnectionPool::Callbacks + void onPoolFailure(ConnectionPool::PoolFailureReason reason, + Upstream::HostDescriptionConstSharedPtr host) override; + void onPoolReady(Tcp::ConnectionPool::ConnectionDataPtr&& conn_data, + Upstream::HostDescriptionConstSharedPtr host) override; private: + Tcp::ConnectionPool::Instance* conn_pool_{}; Tcp::ConnectionPool::Cancellable* upstream_handle_{}; + GenericConnectionPoolCallbacks* callbacks_{}; + Tcp::ConnectionPool::UpstreamCallbacks& upstream_callbacks_; }; -class HttpConnectionHandle : public ConnectionHandle { +class HttpUpstream; + +class HttpConnPool : public GenericConnPool, public Http::ConnectionPool::Callbacks { public: - HttpConnectionHandle(Http::ConnectionPool::Cancellable* handle) : upstream_http_handle_(handle) {} - void cancel() override { - upstream_http_handle_->cancel(Tcp::ConnectionPool::CancelPolicy::Default); - } + HttpConnPool(const std::string& cluster_name, Upstream::ClusterManager& cluster_manager, + Upstream::LoadBalancerContext* context, std::string hostname, + Tcp::ConnectionPool::UpstreamCallbacks& upstream_callbacks); + ~HttpConnPool() override; + + // GenericConnPool + bool valid() const override; + void newStream(GenericConnectionPoolCallbacks* callbacks) override; + + // Http::ConnectionPool::Callbacks, + void onPoolFailure(ConnectionPool::PoolFailureReason reason, + absl::string_view transport_failure_reason, + Upstream::HostDescriptionConstSharedPtr host) override; + void onPoolReady(Http::RequestEncoder& request_encoder, + Upstream::HostDescriptionConstSharedPtr host, + const StreamInfo::StreamInfo& info) override; private: - Http::ConnectionPool::Cancellable* upstream_http_handle_{}; + const std::string hostname_; + Http::ConnectionPool::Instance* conn_pool_{}; + Http::ConnectionPool::Cancellable* upstream_handle_{}; + GenericConnectionPoolCallbacks* callbacks_{}; + Tcp::ConnectionPool::UpstreamCallbacks& upstream_callbacks_; + std::unique_ptr upstream_; +}; + +// An API for the UpstreamRequest to get callbacks from either an HTTP or TCP +// connection pool. +class GenericConnectionPoolCallbacks { +public: + virtual ~GenericConnectionPoolCallbacks() = default; + + virtual void onGenericPoolReady(StreamInfo::StreamInfo* info, + std::unique_ptr&& upstream, + Upstream::HostDescriptionConstSharedPtr& host, + const Network::Address::InstanceConstSharedPtr& local_address, + Ssl::ConnectionInfoConstSharedPtr ssl_info) PURE; + virtual void onGenericPoolFailure(ConnectionPool::PoolFailureReason reason, + Upstream::HostDescriptionConstSharedPtr host) PURE; }; // Interface for a generic Upstream, which can communicate with a TCP or HTTP