Skip to content

Commit

Permalink
conn_pool: track streams across the pool (#13684)
Browse files Browse the repository at this point in the history
Tracking active, pending, and available capacity for each thread local cluster, for eventual use in cluster-wide prefetch

Risk Level: low
Testing: new unit tests
Docs Changes: n/a
Release Notes: n/a

Signed-off-by: Alyssa Wilk <[email protected]>
  • Loading branch information
alyssawilk authored Nov 18, 2020
1 parent 6e5227e commit 8d62990
Show file tree
Hide file tree
Showing 24 changed files with 321 additions and 151 deletions.
47 changes: 45 additions & 2 deletions include/envoy/upstream/cluster_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,47 @@ using ClusterUpdateCallbacksHandlePtr = std::unique_ptr<ClusterUpdateCallbacksHa

class ClusterManagerFactory;

// These are per-cluster per-thread, so not "global" stats.
struct ClusterConnectivityState {
~ClusterConnectivityState() {
ASSERT(pending_streams_ == 0);
ASSERT(active_streams_ == 0);
ASSERT(connecting_stream_capacity_ == 0);
}

template <class T> void checkAndDecrement(T& value, uint32_t delta) {
ASSERT(delta <= value);
value -= delta;
}

template <class T> void checkAndIncrement(T& value, uint32_t delta) {
ASSERT(std::numeric_limits<T>::max() - value > delta);
value += delta;
}

void incrPendingStreams(uint32_t delta) { checkAndIncrement<uint32_t>(pending_streams_, delta); }
void decrPendingStreams(uint32_t delta) { checkAndDecrement<uint32_t>(pending_streams_, delta); }
void incrConnectingStreamCapacity(uint32_t delta) {
checkAndIncrement<uint64_t>(connecting_stream_capacity_, delta);
}
void decrConnectingStreamCapacity(uint32_t delta) {
checkAndDecrement<uint64_t>(connecting_stream_capacity_, delta);
}
void incrActiveStreams(uint32_t delta) { checkAndIncrement<uint32_t>(active_streams_, delta); }
void decrActiveStreams(uint32_t delta) { checkAndDecrement<uint32_t>(active_streams_, delta); }

// Tracks the number of pending streams for this ClusterManager.
uint32_t pending_streams_{};
// Tracks the number of active streams for this ClusterManager.
uint32_t active_streams_{};
// Tracks the available stream capacity if all connecting connections were connected.
//
// For example, if an H2 connection is started with concurrent stream limit of 100, this
// goes up by 100. If the connection is established and 2 streams are in use, it
// would be reduced to 98 (as 2 of the 100 are not available).
uint64_t connecting_stream_capacity_{};
};

/**
* Manages connection pools and load balancing for upstream clusters. The cluster manager is
* persistent and shared among multiple ongoing requests/connections.
Expand Down Expand Up @@ -321,7 +362,8 @@ class ClusterManagerFactory {
allocateConnPool(Event::Dispatcher& dispatcher, HostConstSharedPtr host,
ResourcePriority priority, Http::Protocol protocol,
const Network::ConnectionSocket::OptionsSharedPtr& options,
const Network::TransportSocketOptionsSharedPtr& transport_socket_options) PURE;
const Network::TransportSocketOptionsSharedPtr& transport_socket_options,
ClusterConnectivityState& state) PURE;

/**
* Allocate a TCP connection pool for the host. Pools are separated by 'priority' and
Expand All @@ -331,7 +373,8 @@ class ClusterManagerFactory {
allocateTcpConnPool(Event::Dispatcher& dispatcher, HostConstSharedPtr host,
ResourcePriority priority,
const Network::ConnectionSocket::OptionsSharedPtr& options,
Network::TransportSocketOptionsSharedPtr transport_socket_options) PURE;
Network::TransportSocketOptionsSharedPtr transport_socket_options,
ClusterConnectivityState& state) PURE;

/**
* Allocate a cluster from configuration proto.
Expand Down
35 changes: 28 additions & 7 deletions source/common/conn_pool/conn_pool_base.cc
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,10 @@ namespace ConnectionPool {
ConnPoolImplBase::ConnPoolImplBase(
Upstream::HostConstSharedPtr host, Upstream::ResourcePriority priority,
Event::Dispatcher& dispatcher, const Network::ConnectionSocket::OptionsSharedPtr& options,
const Network::TransportSocketOptionsSharedPtr& transport_socket_options)
: host_(host), priority_(priority), dispatcher_(dispatcher), socket_options_(options),
transport_socket_options_(transport_socket_options) {}
const Network::TransportSocketOptionsSharedPtr& transport_socket_options,
Upstream::ClusterConnectivityState& state)
: state_(state), host_(host), priority_(priority), dispatcher_(dispatcher),
socket_options_(options), transport_socket_options_(transport_socket_options) {}

ConnPoolImplBase::~ConnPoolImplBase() {
ASSERT(ready_clients_.empty());
Expand Down Expand Up @@ -107,6 +108,8 @@ bool ConnPoolImplBase::tryCreateNewConnection(float global_prefetch_ratio) {
ASSERT(std::numeric_limits<uint64_t>::max() - connecting_stream_capacity_ >=
client->effectiveConcurrentStreamLimit());
ASSERT(client->real_host_description_);
// Increase the connecting capacity to reflect the streams this connection can serve.
state_.incrConnectingStreamCapacity(client->effectiveConcurrentStreamLimit());
connecting_stream_capacity_ += client->effectiveConcurrentStreamLimit();
LinkedList::moveIntoList(std::move(client), owningList(client->state_));
}
Expand Down Expand Up @@ -135,6 +138,10 @@ void ConnPoolImplBase::attachStreamToClient(Envoy::ConnectionPool::ActiveClient&
transitionActiveClientState(client, Envoy::ConnectionPool::ActiveClient::State::BUSY);
}

// Decrement the capacity, as there's one less stream available for serving.
state_.decrConnectingStreamCapacity(1);
// Track the new active stream.
state_.incrActiveStreams(1);
num_active_streams_++;
host_->stats().rq_total_.inc();
host_->stats().rq_active_.inc();
Expand All @@ -150,10 +157,18 @@ void ConnPoolImplBase::onStreamClosed(Envoy::ConnectionPool::ActiveClient& clien
bool delay_attaching_stream) {
ENVOY_CONN_LOG(debug, "destroying stream: {} remaining", client, client.numActiveStreams());
ASSERT(num_active_streams_ > 0);
// Reflect there's one less stream in flight.
state_.decrActiveStreams(1);
num_active_streams_--;
host_->stats().rq_active_.dec();
host_->cluster().stats().upstream_rq_active_.dec();
host_->cluster().resourceManager(priority_).requests().dec();
// If the effective client capacity was limited by concurrency, increase connecting capacity.
// If the effective client capacity was limited by max total streams, this will not result in an
// increment as no capacity is freed up.
if (client.remaining_streams_ > client.concurrent_stream_limit_ - client.numActiveStreams() - 1) {
state_.incrConnectingStreamCapacity(1);
}
if (client.state_ == ActiveClient::State::DRAINING && client.numActiveStreams() == 0) {
// Close out the draining client if we no longer have active streams.
client.close();
Expand Down Expand Up @@ -205,6 +220,7 @@ void ConnPoolImplBase::onUpstreamReady() {
ENVOY_CONN_LOG(debug, "attaching to next stream", *client);
// Pending streams are pushed onto the front, so pull from the back.
attachStreamToClient(*client, pending_streams_.back()->context());
state_.decrPendingStreams(1);
pending_streams_.pop_back();
}
}
Expand Down Expand Up @@ -310,6 +326,9 @@ void ConnPoolImplBase::onConnectionEvent(ActiveClient& client, absl::string_view

if (event == Network::ConnectionEvent::RemoteClose ||
event == Network::ConnectionEvent::LocalClose) {
state_.decrConnectingStreamCapacity(client.currentUnusedCapacity());
// Make sure that onStreamClosed won't double count.
client.remaining_streams_ = 0;
// The client died.
ENVOY_CONN_LOG(debug, "client disconnected, failure reason: {}", client, failure_reason);

Expand Down Expand Up @@ -397,6 +416,7 @@ void ConnPoolImplBase::purgePendingStreams(
absl::string_view failure_reason, ConnectionPool::PoolFailureReason reason) {
// NOTE: We move the existing pending streams to a temporary list. This is done so that
// if retry logic submits a new stream to the pool, we don't fail it inline.
state_.decrPendingStreams(pending_streams_.size());
pending_streams_to_purge_ = std::move(pending_streams_);
while (!pending_streams_to_purge_.empty()) {
PendingStreamPtr stream =
Expand Down Expand Up @@ -431,6 +451,7 @@ void ConnPoolImplBase::onPendingStreamCancel(PendingStream& stream,
// and there is no need to call its onPoolFailure callback.
stream.removeFromList(pending_streams_to_purge_);
} else {
state_.decrPendingStreams(1);
stream.removeFromList(pending_streams_);
}
if (policy == Envoy::ConnectionPool::CancelPolicy::CloseExcess && !connecting_clients_.empty() &&
Expand All @@ -447,13 +468,13 @@ void ConnPoolImplBase::onPendingStreamCancel(PendingStream& stream,
namespace {
// Translate zero to UINT64_MAX so that the zero/unlimited case doesn't
// have to be handled specially.
uint64_t translateZeroToUnlimited(uint64_t limit) {
return (limit != 0) ? limit : std::numeric_limits<uint64_t>::max();
uint32_t translateZeroToUnlimited(uint32_t limit) {
return (limit != 0) ? limit : std::numeric_limits<uint32_t>::max();
}
} // namespace

ActiveClient::ActiveClient(ConnPoolImplBase& parent, uint64_t lifetime_stream_limit,
uint64_t concurrent_stream_limit)
ActiveClient::ActiveClient(ConnPoolImplBase& parent, uint32_t lifetime_stream_limit,
uint32_t concurrent_stream_limit)
: parent_(parent), remaining_streams_(translateZeroToUnlimited(lifetime_stream_limit)),
concurrent_stream_limit_(translateZeroToUnlimited(concurrent_stream_limit)),
connect_timer_(parent_.dispatcher().createTimer([this]() -> void { onConnectTimeout(); })) {
Expand Down
44 changes: 34 additions & 10 deletions source/common/conn_pool/conn_pool_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#include "envoy/event/dispatcher.h"
#include "envoy/network/connection.h"
#include "envoy/stats/timespan.h"
#include "envoy/upstream/cluster_manager.h"

#include "common/common/linked_object.h"

Expand All @@ -28,8 +29,8 @@ class ActiveClient : public LinkedObject<ActiveClient>,
public Event::DeferredDeletable,
protected Logger::Loggable<Logger::Id::pool> {
public:
ActiveClient(ConnPoolImplBase& parent, uint64_t lifetime_stream_limit,
uint64_t concurrent_stream_limit);
ActiveClient(ConnPoolImplBase& parent, uint32_t lifetime_stream_limit,
uint32_t concurrent_stream_limit);
~ActiveClient() override;

void releaseResources();
Expand All @@ -44,18 +45,22 @@ class ActiveClient : public LinkedObject<ActiveClient>,

// Returns the concurrent stream limit, accounting for if the total stream limit
// is less than the concurrent stream limit.
uint64_t effectiveConcurrentStreamLimit() const {
uint32_t effectiveConcurrentStreamLimit() const {
return std::min(remaining_streams_, concurrent_stream_limit_);
}

uint32_t currentUnusedCapacity() const {
return std::min(remaining_streams_, concurrent_stream_limit_ - numActiveStreams());
}

// Closes the underlying connection.
virtual void close() PURE;
// Returns the ID of the underlying connection.
virtual uint64_t id() const PURE;
// Returns true if this closed with an incomplete stream, for stats tracking/ purposes.
virtual bool closingWithIncompleteStream() const PURE;
// Returns the number of active streams on this connection.
virtual size_t numActiveStreams() const PURE;
virtual uint32_t numActiveStreams() const PURE;

enum class State {
CONNECTING, // Connection is not yet established.
Expand All @@ -67,8 +72,8 @@ class ActiveClient : public LinkedObject<ActiveClient>,
};

ConnPoolImplBase& parent_;
uint64_t remaining_streams_;
const uint64_t concurrent_stream_limit_;
uint32_t remaining_streams_;
const uint32_t concurrent_stream_limit_;
State state_{State::CONNECTING};
Upstream::HostDescriptionConstSharedPtr real_host_description_;
Stats::TimespanPtr conn_connect_ms_;
Expand Down Expand Up @@ -105,7 +110,8 @@ class ConnPoolImplBase : protected Logger::Loggable<Logger::Id::pool> {
ConnPoolImplBase(Upstream::HostConstSharedPtr host, Upstream::ResourcePriority priority,
Event::Dispatcher& dispatcher,
const Network::ConnectionSocket::OptionsSharedPtr& options,
const Network::TransportSocketOptionsSharedPtr& transport_socket_options);
const Network::TransportSocketOptionsSharedPtr& transport_socket_options,
Upstream::ClusterConnectivityState& state);
virtual ~ConnPoolImplBase();

// A helper function to get the specific context type from the base class context.
Expand Down Expand Up @@ -196,6 +202,22 @@ class ConnPoolImplBase : protected Logger::Loggable<Logger::Id::pool> {

float perUpstreamPrefetchRatio() const;

ConnectionPool::Cancellable*
addPendingStream(Envoy::ConnectionPool::PendingStreamPtr&& pending_stream) {
LinkedList::moveIntoList(std::move(pending_stream), pending_streams_);
state_.incrPendingStreams(1);
return pending_streams_.front().get();
}

bool hasActiveStreams() const { return num_active_streams_ > 0; }

void decrConnectingStreamCapacity(int32_t delta) {
state_.decrConnectingStreamCapacity(delta);
connecting_stream_capacity_ -= delta;
}

Upstream::ClusterConnectivityState& state_;

const Upstream::HostConstSharedPtr host_;
const Upstream::ResourcePriority priority_;

Expand All @@ -204,7 +226,6 @@ class ConnPoolImplBase : protected Logger::Loggable<Logger::Id::pool> {
const Network::TransportSocketOptionsSharedPtr transport_socket_options_;

std::list<Instance::DrainedCb> drained_callbacks_;
std::list<PendingStreamPtr> pending_streams_;

// When calling purgePendingStreams, this list will be used to hold the streams we are about
// to purge. We need this if one cancelled streams cancels a different pending stream
Expand All @@ -220,12 +241,15 @@ class ConnPoolImplBase : protected Logger::Loggable<Logger::Id::pool> {
// Clients that are not ready to handle additional streams because they are CONNECTING.
std::list<ActiveClientPtr> connecting_clients_;

private:
std::list<PendingStreamPtr> pending_streams_;

// The number of streams currently attached to clients.
uint64_t num_active_streams_{0};
uint32_t num_active_streams_{0};

// The number of streams that can be immediately dispatched
// if all CONNECTING connections become connected.
uint64_t connecting_stream_capacity_{0};
uint32_t connecting_stream_capacity_{0};
};

} // namespace ConnectionPool
Expand Down
5 changes: 4 additions & 1 deletion source/common/http/codec_client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -105,8 +105,11 @@ void CodecClient::onEvent(Network::ConnectionEvent event) {
}
}

void CodecClient::responseDecodeComplete(ActiveRequest& request) {
void CodecClient::responsePreDecodeComplete(ActiveRequest& request) {
ENVOY_CONN_LOG(debug, "response complete", *connection_);
if (codec_client_callbacks_) {
codec_client_callbacks_->onStreamPreDecodeComplete();
}
deleteRequest(request);

// HTTP/2 can send us a reset after a complete response if the request was not complete. Users
Expand Down
7 changes: 5 additions & 2 deletions source/common/http/codec_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@ class CodecClientCallbacks {
public:
virtual ~CodecClientCallbacks() = default;

// Called in onPreDecodeComplete
virtual void onStreamPreDecodeComplete() {}

/**
* Called every time an owned stream is destroyed, whether complete or not.
*/
Expand Down Expand Up @@ -201,7 +204,7 @@ class CodecClient : Logger::Loggable<Logger::Id::client>,
void onBelowWriteBufferLowWatermark() override {}

// StreamDecoderWrapper
void onPreDecodeComplete() override { parent_.responseDecodeComplete(*this); }
void onPreDecodeComplete() override { parent_.responsePreDecodeComplete(*this); }
void onDecodeComplete() override {}

RequestEncoder* encoder_{};
Expand All @@ -214,7 +217,7 @@ class CodecClient : Logger::Loggable<Logger::Id::client>,
* Called when a response finishes decoding. This is called *before* forwarding on to the
* wrapped decoder.
*/
void responseDecodeComplete(ActiveRequest& request);
void responsePreDecodeComplete(ActiveRequest& request);

void deleteRequest(ActiveRequest& request);
void onReset(ActiveRequest& request, StreamResetReason reason);
Expand Down
10 changes: 5 additions & 5 deletions source/common/http/conn_pool_base.cc
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,11 @@ HttpConnPoolImplBase::HttpConnPoolImplBase(
Upstream::HostConstSharedPtr host, Upstream::ResourcePriority priority,
Event::Dispatcher& dispatcher, const Network::ConnectionSocket::OptionsSharedPtr& options,
const Network::TransportSocketOptionsSharedPtr& transport_socket_options,
Random::RandomGenerator& random_generator, std::vector<Http::Protocol> protocols)
Random::RandomGenerator& random_generator, Upstream::ClusterConnectivityState& state,
std::vector<Http::Protocol> protocols)
: Envoy::ConnectionPool::ConnPoolImplBase(
host, priority, dispatcher, options,
wrapTransportSocketOptions(transport_socket_options, protocols)),
wrapTransportSocketOptions(transport_socket_options, protocols), state),
random_generator_(random_generator) {
ASSERT(!protocols.empty());
// TODO(alyssawilk) the protocol function should probably be an optional and
Expand All @@ -73,7 +74,7 @@ HttpConnPoolImplBase::newStream(Http::ResponseDecoder& response_decoder,
}

bool HttpConnPoolImplBase::hasActiveConnections() const {
return (!pending_streams_.empty() || (num_active_streams_ > 0));
return (hasPendingStreams() || (hasActiveStreams()));
}

ConnectionPool::Cancellable*
Expand All @@ -83,8 +84,7 @@ HttpConnPoolImplBase::newPendingStream(Envoy::ConnectionPool::AttachContext& con
ENVOY_LOG(debug, "queueing stream due to no available connections");
Envoy::ConnectionPool::PendingStreamPtr pending_stream(
new HttpPendingStream(*this, decoder, callbacks));
LinkedList::moveIntoList(std::move(pending_stream), pending_streams_);
return pending_streams_.front().get();
return addPendingStream(std::move(pending_stream));
}

void HttpConnPoolImplBase::onPoolReady(Envoy::ConnectionPool::ActiveClient& client,
Expand Down
Loading

0 comments on commit 8d62990

Please sign in to comment.