[native] Share HTTP connection pools between HTTP clients
Currently each exchange source owns one unique `SessionPool`, so a connection
is only reused for one specific point-to-point communication. This creates many
connections between the same upstream and downstream nodes, decreasing
performance, increasing the chance of network errors, and wasting precious
ephemeral TCP ports (only about 32K are available per host) on the client side.

Fix this by pooling the connections, which decreases the open socket count from
52K to 15K at peak and keeps socket usage consistently lower at all times
(see attached image).

Also set a connect timeout and increase the error retry duration so that
connection errors can be retried.
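
The change boils down to keying connection pools by endpoint and handing the same pool to every client that targets that endpoint. Below is a minimal standalone sketch of that idea; `EndpointKey`, `FakeSessionPool`, and `SharedConnectionPools` are illustrative stand-ins only (the actual change uses `proxygen::SessionPool` and a `folly::Synchronized` map keyed by `proxygen::Endpoint`, as seen in the diff below).

```cpp
// Minimal standalone sketch of the sharing idea; the types here are
// simplified stand-ins, not the proxygen/folly types used in the real change.
#include <cstdint>
#include <map>
#include <memory>
#include <mutex>
#include <string>
#include <tuple>

// Key identifying an upstream endpoint: host, port, and whether TLS is used.
using EndpointKey = std::tuple<std::string, uint16_t, bool>;

// Stand-in for a session pool holding reusable connections to one endpoint.
struct FakeSessionPool {
  int maxIdleSessions;
};

class SharedConnectionPools {
 public:
  // Returns the pool for 'key', creating it on first use. Every client
  // talking to the same endpoint gets the same pool, so TCP connections are
  // reused instead of being opened per exchange source.
  FakeSessionPool& get(const EndpointKey& key) {
    std::lock_guard<std::mutex> lock(mutex_);
    auto& pool = pools_[key];
    if (!pool) {
      pool = std::make_unique<FakeSessionPool>(FakeSessionPool{10});
    }
    return *pool;
  }

 private:
  std::mutex mutex_;
  std::map<EndpointKey, std::unique_ptr<FakeSessionPool>> pools_;
};

int main() {
  SharedConnectionPools pools;
  auto& a = pools.get({"worker-1", 8080, false});
  auto& b = pools.get({"worker-1", 8080, false});
  // Both lookups return the same pool, so both clients share connections.
  return &a == &b ? 0 : 1;
}
```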
Yuhta authored and spershin committed Nov 14, 2023
1 parent 1e2a5ae commit 279cd59
Showing 12 changed files with 169 additions and 84 deletions.
@@ -36,9 +36,10 @@ PeriodicServiceInventoryManager::PeriodicServiceInventoryManager(

void PeriodicServiceInventoryManager::start() {
eventBaseThread_.start(id_);
sessionPool_ = std::make_unique<proxygen::SessionPool>(nullptr, 10);
stopped_ = false;
auto* eventBase = eventBaseThread_.getEventBase();
eventBase->runOnDestruction([this] { client_.reset(); });
eventBase->runOnDestruction([this] { sessionPool_.reset(); });
eventBase->schedule([this]() { return sendRequest(); });
}

@@ -66,6 +67,7 @@ void PeriodicServiceInventoryManager::sendRequest() {
std::swap(serviceAddress_, newAddress);
client_ = std::make_shared<http::HttpClient>(
eventBaseThread_.getEventBase(),
sessionPool_.get(),
serviceAddress_,
std::chrono::milliseconds(10'000),
std::chrono::milliseconds(0),
@@ -137,4 +139,4 @@ void PeriodicServiceInventoryManager::scheduleNext() {
std::chrono::steady_clock::now() +
std::chrono::milliseconds(getDelayMs()));
}
} // namespace facebook::presto
} // namespace facebook::presto
@@ -67,6 +67,7 @@ class PeriodicServiceInventoryManager {
const double backOffjitterParam_{0.1};

folly::EventBaseThread eventBaseThread_;
std::unique_ptr<proxygen::SessionPool> sessionPool_;
folly::SocketAddress serviceAddress_;
std::shared_ptr<http::HttpClient> client_;
std::atomic_bool stopped_{true};
94 changes: 68 additions & 26 deletions presto-native-execution/presto_cpp/main/PrestoExchangeSource.cpp
@@ -74,7 +74,8 @@ PrestoExchangeSource::PrestoExchangeSource(
const std::shared_ptr<exec::ExchangeQueue>& queue,
memory::MemoryPool* pool,
folly::CPUThreadPoolExecutor* driverExecutor,
folly::IOThreadPoolExecutor* httpExecutor,
folly::EventBase* ioEventBase,
proxygen::SessionPool* sessionPool,
const std::string& clientCertAndKeyPath,
const std::string& ciphers)
: ExchangeSource(extractTaskId(baseUri.path()), destination, queue, pool),
@@ -85,8 +86,7 @@ PrestoExchangeSource::PrestoExchangeSource(
ciphers_(ciphers),
immediateBufferTransfer_(
SystemConfig::instance()->exchangeImmediateBufferTransfer()),
driverExecutor_(driverExecutor),
httpExecutor_(httpExecutor) {
driverExecutor_(driverExecutor) {
folly::SocketAddress address;
if (folly::IPAddress::validate(host_)) {
address = folly::SocketAddress(folly::IPAddress(host_), port_);
@@ -100,11 +100,11 @@ PrestoExchangeSource::PrestoExchangeSource(
std::chrono::duration_cast<std::chrono::milliseconds>(
SystemConfig::instance()->exchangeConnectTimeoutMs());
VELOX_CHECK_NOT_NULL(driverExecutor_);
VELOX_CHECK_NOT_NULL(httpExecutor_);
VELOX_CHECK_NOT_NULL(ioEventBase);
VELOX_CHECK_NOT_NULL(pool_);
auto* ioEventBase = httpExecutor_->getEventBase();
httpClient_ = std::make_shared<http::HttpClient>(
ioEventBase,
sessionPool,
address,
requestTimeoutMs,
connectTimeoutMs,
@@ -338,24 +338,25 @@ void PrestoExchangeSource::processDataError(
const std::string& path,
uint32_t maxBytes,
uint32_t maxWaitSeconds,
const std::string& error,
bool retry) {
const std::string& error) {
++failedAttempts_;
if (retry && !dataRequestRetryState_.isExhausted()) {
if (!dataRequestRetryState_.isExhausted()) {
VLOG(1) << "Failed to fetch data from " << host_ << ":" << port_ << " "
<< path << " - Retrying: " << error;
<< path << ", duration: " << dataRequestRetryState_.durationMs()
<< "ms - Retrying: " << error;

doRequest(dataRequestRetryState_.nextDelayMs(), maxBytes, maxWaitSeconds);
return;
}

onFinalFailure(
fmt::format(
"Failed to fetch data from {}:{} {} - Exhausted after {} retries: {}",
"Failed to fetch data from {}:{} {} - Exhausted after {} retries, duration {}ms: {}",
host_,
port_,
path,
failedAttempts_,
dataRequestRetryState_.durationMs(),
error),
queue_);

@@ -457,34 +458,75 @@ std::shared_ptr<PrestoExchangeSource> PrestoExchangeSource::getSelfPtr() {
return std::dynamic_pointer_cast<PrestoExchangeSource>(shared_from_this());
}

const ConnectionPool& ConnectionPools::get(
const proxygen::Endpoint& endpoint,
folly::IOThreadPoolExecutor* ioExecutor) {
return *pools_.withULockPtr([&](auto ulock) -> const ConnectionPool* {
auto it = ulock->find(endpoint);
if (it != ulock->end()) {
return it->second.get();
}
auto wlock = ulock.moveFromUpgradeToWrite();
auto& pool = (*wlock)[endpoint];
if (!pool) {
pool = std::make_unique<ConnectionPool>();
pool->eventBase = ioExecutor->getEventBase();
pool->sessionPool = std::make_unique<proxygen::SessionPool>(nullptr, 10);
// Creation of the timer is not thread safe, so we do it here instead of
// in the constructor of HttpClient.
pool->eventBase->timer();
}
return pool.get();
});
}

void ConnectionPools::destroy() {
pools_.withWLock([](auto& pools) {
for (auto& [_, pool] : pools) {
pool->eventBase->runInEventBaseThread(
[sessionPool = std::move(pool->sessionPool)] {});
}
pools.clear();
});
}

// static
std::shared_ptr<exec::ExchangeSource> PrestoExchangeSource::create(
std::shared_ptr<PrestoExchangeSource> PrestoExchangeSource::create(
const std::string& url,
int destination,
const std::shared_ptr<exec::ExchangeQueue>& queue,
memory::MemoryPool* pool,
folly::CPUThreadPoolExecutor* driverExecutor,
folly::IOThreadPoolExecutor* httpExecutor) {
if (strncmp(url.c_str(), "http://", 7) == 0) {
const std::shared_ptr<velox::exec::ExchangeQueue>& queue,
velox::memory::MemoryPool* memoryPool,
folly::CPUThreadPoolExecutor* cpuExecutor,
folly::IOThreadPoolExecutor* ioExecutor,
ConnectionPools& connectionPools) {
folly::Uri uri(url);
if (uri.scheme() == "http") {
proxygen::Endpoint ep(uri.host(), uri.port(), false);
auto& connPool = connectionPools.get(ep, ioExecutor);
return std::make_shared<PrestoExchangeSource>(
folly::Uri(url),
uri,
destination,
queue,
pool,
driverExecutor,
httpExecutor);
} else if (strncmp(url.c_str(), "https://", 8) == 0) {
const auto systemConfig = SystemConfig::instance();
memoryPool,
cpuExecutor,
connPool.eventBase,
connPool.sessionPool.get());
}
if (uri.scheme() == "https") {
proxygen::Endpoint ep(uri.host(), uri.port(), true);
auto& connPool = connectionPools.get(ep, ioExecutor);
const auto* systemConfig = SystemConfig::instance();
const auto clientCertAndKeyPath =
systemConfig->httpsClientCertAndKeyPath().value_or("");
const auto ciphers = systemConfig->httpsSupportedCiphers();
return std::make_shared<PrestoExchangeSource>(
folly::Uri(url),
uri,
destination,
queue,
pool,
driverExecutor,
httpExecutor,
memoryPool,
cpuExecutor,
connPool.eventBase,
connPool.sessionPool.get(),
clientCertAndKeyPath,
ciphers);
}
53 changes: 45 additions & 8 deletions presto-native-execution/presto_cpp/main/PrestoExchangeSource.h
@@ -24,6 +24,38 @@

namespace facebook::presto {

// HTTP connection pool for a specific endpoint with its associated event base.
// All the operations on the SessionPool must be performed on the corresponding
// EventBase.
struct ConnectionPool {
folly::EventBase* eventBase;
std::unique_ptr<proxygen::SessionPool> sessionPool;
};

// Connection pools used by the HTTP clients in PrestoExchangeSource. It must
// outlive all the PrestoExchangeSources and is passed in when the exchange
// sources are created.
class ConnectionPools {
public:
~ConnectionPools() {
destroy();
}

const ConnectionPool& get(
const proxygen::Endpoint& endpoint,
folly::IOThreadPoolExecutor* ioExecutor);

void destroy();

private:
folly::Synchronized<folly::F14FastMap<
proxygen::Endpoint,
std::unique_ptr<ConnectionPool>,
proxygen::EndpointHash,
proxygen::EndpointEqual>>
pools_;
};

class PrestoExchangeSource : public velox::exec::ExchangeSource {
public:
class RetryState {
@@ -48,10 +80,14 @@ class PrestoExchangeSource : public velox::exec::ExchangeSource {
.count();
}

int64_t durationMs() const {
return velox::getCurrentTimeMs() - startMs_;
}

// Returns whether we have exhausted all retries. We only retry if we spent
// less than maxWaitMs_ time after we first started.
bool isExhausted() const {
return velox::getCurrentTimeMs() - startMs_ > maxWaitMs_;
return durationMs() > maxWaitMs_;
}

private:
@@ -70,7 +106,8 @@ class PrestoExchangeSource : public velox::exec::ExchangeSource {
const std::shared_ptr<velox::exec::ExchangeQueue>& queue,
velox::memory::MemoryPool* pool,
folly::CPUThreadPoolExecutor* driverExecutor,
folly::IOThreadPoolExecutor* httpExecutor,
folly::EventBase* ioEventBase,
proxygen::SessionPool* sessionPool,
const std::string& clientCertAndKeyPath_ = "",
const std::string& ciphers_ = "");

@@ -96,13 +133,15 @@ class PrestoExchangeSource : public velox::exec::ExchangeSource {
uint32_t maxBytes,
uint32_t maxWaitSeconds) override;

static std::shared_ptr<ExchangeSource> create(
// Creates an exchange source using pooled connections.
static std::shared_ptr<PrestoExchangeSource> create(
const std::string& url,
int destination,
const std::shared_ptr<velox::exec::ExchangeQueue>& queue,
velox::memory::MemoryPool* pool,
velox::memory::MemoryPool* memoryPool,
folly::CPUThreadPoolExecutor* cpuExecutor,
folly::IOThreadPoolExecutor* ioExecutor);
folly::IOThreadPoolExecutor* ioExecutor,
ConnectionPools& connectionPools);

/// Completes the future returned by 'request()' if it hasn't completed
/// already.
@@ -175,8 +214,7 @@ class PrestoExchangeSource : public velox::exec::ExchangeSource {
const std::string& path,
uint32_t maxBytes,
uint32_t maxWaitSeconds,
const std::string& error,
bool retry = true);
const std::string& error);

void acknowledgeResults(int64_t ackSequence);

@@ -214,7 +252,6 @@ class PrestoExchangeSource : public velox::exec::ExchangeSource {
const bool immediateBufferTransfer_;

folly::CPUThreadPoolExecutor* const driverExecutor_;
folly::IOThreadPoolExecutor* const httpExecutor_;

std::shared_ptr<http::HttpClient> httpClient_;
RetryState dataRequestRetryState_;
7 changes: 5 additions & 2 deletions presto-native-execution/presto_cpp/main/PrestoServer.cpp
@@ -20,7 +20,6 @@
#include "CoordinatorDiscoverer.h"
#include "presto_cpp/main/Announcer.h"
#include "presto_cpp/main/PeriodicTaskManager.h"
#include "presto_cpp/main/PrestoExchangeSource.h"
#include "presto_cpp/main/SignalHandler.h"
#include "presto_cpp/main/TaskResource.h"
#include "presto_cpp/main/common/ConfigReader.h"
@@ -328,7 +327,8 @@ void PrestoServer::run() {
queue,
pool,
driverExecutor_.get(),
exchangeHttpExecutor_.get());
exchangeHttpExecutor_.get(),
exchangeSourceConnectionPools_);
});

facebook::velox::exec::ExchangeSource::registerFactory(
@@ -452,6 +452,9 @@ void PrestoServer::run() {

unregisterConnectors();

PRESTO_SHUTDOWN_LOG(INFO) << "Releasing HTTP connection pools";
exchangeSourceConnectionPools_.destroy();

PRESTO_SHUTDOWN_LOG(INFO)
<< "Joining driver CPU Executor '" << driverExecutor_->getName()
<< "': threads: " << driverExecutor_->numActiveThreads() << "/"
3 changes: 3 additions & 0 deletions presto-native-execution/presto_cpp/main/PrestoServer.h
@@ -22,6 +22,7 @@
#include "presto_cpp/main/CPUMon.h"
#include "presto_cpp/main/CoordinatorDiscoverer.h"
#include "presto_cpp/main/PeriodicHeartbeatManager.h"
#include "presto_cpp/main/PrestoExchangeSource.h"
#include "presto_cpp/main/PrestoServerOperations.h"
#include "velox/common/caching/AsyncDataCache.h"
#include "velox/common/memory/MemoryAllocator.h"
@@ -188,6 +189,8 @@ class PrestoServer {
// Executor for spilling.
std::shared_ptr<folly::CPUThreadPoolExecutor> spillerExecutor_;

ConnectionPools exchangeSourceConnectionPools_;

// Instance of MemoryAllocator used for all query memory allocations.
std::shared_ptr<velox::memory::MemoryAllocator> allocator_;
